一、Spark 运行架构
Spark 运行架构如下图:
各个RDD之间存在着依赖关系,这些依赖关系形成有向无环图DAG,DAGScheduler对这些依赖关系形成的DAG,进行Stage划分,划分的规则很简单,从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。完成了Stage的划分,DAGScheduler基于每个Stage生成TaskSet,并将TaskSet提交给TaskScheduler。TaskScheduler 负责具体的task调度,在Worker节点上启动task。
二、源码解析:DAGScheduler中的DAG划分
当RDD触发一个Action操作(如:colllect)后,导致SparkContext.runJob的执行。而在SparkContext的run方法中会调用DAGScheduler的run方法最终调用了DAGScheduler的submit方法:
def submitJob[T, U]( rdd: RDD[T], func:(TaskContext,Iterator[T])=> U, partitions:Seq[Int], callSite:CallSite, resultHandler:(Int, U)=>Unit, properties:Properties):JobWaiter[U]={ // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p <0).foreach { p => thrownewIllegalArgumentException( "Attempting to access a non-existent partition: "+ p +". "+ "Total number of partitions: "+ maxPartitions) } val jobId = nextJobId.getAndIncrement() if(partitions.size ==0){ // Return immediately if the job is running 0 tasks returnnewJobWaiter[U](this, jobId,0, resultHandler) } assert(partitions.size >0) val func2 = func.asInstanceOf[(TaskContext,Iterator[_])=> _] val waiter =newJobWaiter(this, jobId, partitions.size, resultHandler) //给eventProcessLoop发送JobSubmitted消息 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
DAGScheduler的submit方法中,像eventProcessLoop对象发送了JobSubmitted消息。eventProcessLoop是DAGSchedulerEventProcessLoop类的对象
private[scheduler] val eventProcessLoop =newDAGSchedulerEventProcessLoop(this)
DAGSchedulerEventProcessLoop,接收各种消息并进行处理,处理的逻辑在其doOnReceive方法中:
private def doOnReceive(event:DAGSchedulerEvent):Unit= event match { //Job提交 caseJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)=> dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) caseMapStageSubmitted(jobId, dependency, callSite, listener, properties)=> dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) caseStageCancelled(stageId)=> dagScheduler.handleStageCancellation(stageId) caseJobCancelled(jobId)=> dagScheduler.handleJobCancellation(jobId) caseJobGroupCancelled(groupId)=> dagScheduler.handleJobGroupCancelled(groupId) caseAllJobsCancelled=> dagScheduler.doCancelAllJobs() caseExecutorAdded(execId, host)=> dagScheduler.handleExecutorAdded(execId, host) caseExecutorLost(execId)=> dagScheduler.handleExecutorLost(execId, fetchFailed =false) caseBeginEvent(task, taskInfo)=> dagScheduler.handleBeginEvent(task, taskInfo) caseGettingResultEvent(taskInfo)=> dagScheduler.handleGetTaskResult(taskInfo) case completion:CompletionEvent=> dagScheduler.handleTaskCompletion(completion) caseTaskSetFailed(taskSet, reason, exception)=> dagScheduler.handleTaskSetFailed(taskSet, reason, exception) caseResubmitFailedStages=> dagScheduler.resubmitFailedStages() }
可以把DAGSchedulerEventProcessLoop理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节。无论是内部还是外部消息,DAGScheduler可以共用同一消息处理代码,逻辑清晰,处理方式统一。
接下来分析DAGScheduler的Stage划分,handleJobSubmitted方法首先创建ResultStage
try{ //创建新stage可能出现异常,比如job运行依赖hdfs文文件被删除 finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) }catch{ case e:Exception=> logWarning("Creating new stage failed due to exception - job: "+ jobId, e) listener.jobFailed(e) return }
然后调用submitStage方法,进行stage的划分。
首先由finalRDD获取它的父RDD依赖,判断依赖类型,如果是窄依赖,则将父RDD压入栈中,如果是宽依赖,则作为父Stage。
看一下源码的具体过程:
private def getMissingParentStages(stage:Stage):List[Stage]={ val missing =newHashSet[Stage] //存储需要返回的父Stage val visited =newHashSet[RDD[_]] //存储访问过的RDD //自己建立栈,以免函数的递归调用导致 val waitingForVisit =newStack[RDD[_]] def visit(rdd: RDD[_]){ if(!visited(rdd)){ visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if(rddHasUncachedPartitions){ for(dep <- rdd.dependencies){ dep match { case shufDep:ShuffleDependency[_, _, _]=> val mapStage = getShuffleMapStage(shufDep, stage.firstJobId) if(!mapStage.isAvailable){ missing += mapStage //遇到宽依赖,加入父stage } case narrowDep:NarrowDependency[_]=> waitingForVisit.push(narrowDep.rdd) //窄依赖入栈, } } } } } //回溯的起始RDD入栈 waitingForVisit.push(stage.rdd) while(waitingForVisit.nonEmpty){ visit(waitingForVisit.pop()) } missing.toList }
getMissingParentStages方法是由当前stage,返回他的父stage,父stage的创建由getShuffleMapStage返回,最终会调用newOrUsedShuffleStage方法返回ShuffleMapStage
private def newOrUsedShuffleStage( shuffleDep:ShuffleDependency[_, _, _], firstJobId:Int):ShuffleMapStage={ val rdd = shuffleDep.rdd val numTasks = rdd.partitions.length val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) if(mapOutputTracker.containsShuffle(shuffleDep.shuffleId)){ //Stage已经被计算过,从MapOutputTracker中获取计算结果 val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) val locs =MapOutputTracker.deserializeMapStatuses(serLocs) (0 until locs.length).foreach { i => if(locs(i) ne null){ // locs(i) will be null if missing stage.addOutputLoc(i, locs(i)) } } }else{ // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown logInfo("Registering RDD "+ rdd.id +" ("+ rdd.getCreationSite +")") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) } stage }
现在父Stage已经划分好,下面看看你Stage的提交逻辑
/** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage:Stage){ val jobId = activeJobForStage(stage) if(jobId.isDefined){ logDebug("submitStage("+ stage +")") if(!waitingStages(stage)&&!runningStages(stage)&&!failedStages(stage)){ val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: "+ missing) if(missing.isEmpty){ logInfo("Submitting "+ stage +" ("+ stage.rdd +"), which has no missing parents") //如果没有父stage,则提交当前stage submitMissingTasks(stage, jobId.get) }else{ for(parent <- missing){ //如果有父stage,则递归提交父stage submitStage(parent) } waitingStages += stage } } }else{ abortStage(stage,"No active job for stage "+ stage.id,None) } }提交的过程很简单,首先当前stage获取父stage,如果父stage为空,则当前Stage为起始stage,交给submitMissingTasks处理,如果当前stage不为空,则递归调用submitStage进行提交。
到这里,DAGScheduler中的DAG划分与提交就讲完了,下次解析这些stage是如果封装成TaskSet交给TaskScheduler以及TaskSchedule的调度过程。
相关推荐
DAGScheduler 解析:DAG 的实例化 、DAGScheduler 划分Stage 的原理、DAGScheduler 划分Stage 的具体算法、Stage 内部Task 获取位置的算法;TaskScheduler 解析:TaskScheduler 原理剖析、TaskScheduler 源码解析;...
spark源码分析,RDD、Iterator、Job、DAG、Stage、Taskset、task等
算法实验:计算DAG图最长路径并输出。
java8集合源码第1章:介绍 火花RDD Spark SQL 和数据帧 星火机器学习 Spark Streaming + Apache Kafka (Java8) Hadoop - 限制 Map & Reduce 不适用于所有情况 一个的输出作为另一个的输入 火花 还使用 M&R 磁盘速度...
解析Spark 中DAG 逻辑视图;对RDD 内部的计算机制及计算过程进行深度解析;讲解Spark RDD 容错原理及其四大核心要点解析对Spark RDD 中Runtime 流程进行解析;通过一个WordCount 实例,解析Spark RDD内部机制; ...
与 Hadoop 的 MapReduce 相⽐, Spark 基于内存的运算是 MapReduce 的 100 倍.基于硬盘的运算也 要快 10 倍以上. Spark 实现了⾼效的 DAG 执⾏引擎, 可以通过基于内存来⾼效处理数据流 易⽤ 通⽤ 可融合性
Prolog-DAG-Scheduler 用SWI-Prolog编写的DAG调度程序。 这个项目是为我的声明式编程课程而设计的。 它允许用户在异构体系结构上并行调度任务。 任务表示为有向无环图(DAG),其中顶点表示(原子)子任务,边表示子...
气流维护包 一系列DAG /工作流程可帮助维持Airflow的运行 ...您可以部署到Airflow中的维护工作流,以定期取消后台运行的任务,这些任务与数据库中正在运行的任务不对应。 这很有用,因为当您通过Airflow
基于复合哈希树的Java库(DAG) 兼容OSGi 可嵌入 占地面积小 这有什么好处 区块链和超级账本技术 根据哈希和元数据索引任意(嵌套)数据(在此称为选择器和标签) 我该如何使用? 在此项目测试文件夹中可以找到...
在Go中实现DAG。 用法 假设您要表示以下DAG(表示为JSON字典): { " 1 " : [ " 2 " ], " 2 " : [ " 3 " , " 4 " ], " 4 " : [ " 3 " ] } 您应该执行以下操作: // Initialize the graph graph := dag . ...
在Airflow环境中安装dag-factory之后,有两个步骤来创建DAG。 首先,我们需要创建一个YAML配置文件。 例如: example_dag1 : default_args : owner : ' example_owner ' start_date : 2018-01-01 # or '2 days' ...
大量连续计算的需求 允许在对数据处理时 经由许多步算子 按序计算来实现处理 这些处理 是一个图的结构 但是要注意的是 图有向但是不能形成环 防止死循环 这样的有向无环的处理过程就称之为Spark的DAG有向无环图。
具体来说,它们应具有DAG-PB模式中显示的非可选字段: type PBNode struct { Links [PBLink] Data optional Bytes } type PBLink struct { Hash Link Name optional String Tsize optional Int } 直接使用...
RDD的Action算子触发job的提交,提交到Spark的Job生成RDD DAG,由DAGScheduler转换为Stage DAG,每个Stage中产生相应的Task集合,TaskScheduler将任务分发到Executor执行。每个任务对应的数据块,使用用户定义的函数...
腾讯高级工程师王联辉在OpenCloud 2015大会Spark专场的演讲PPT:腾讯在Spark上的应用与实践优化,主要介绍Spark在腾讯的当前现状、典型应用及效果,以及腾讯在Spark上的实践和优化。其中,典型应用在三个方面:预测...
airflow触发器可以一个dag中同时触发一个或者多个子dag,也可在子触发了的dag中在触发一个或者多个dag。
版本+ dag = versidag动机在分布式系统中,副本的时钟不可靠,无法获得版本的总顺序。 在p2p网络中,这尤其如此,因为其中的时钟差异可能会加剧。 有一些方法可以通过保留因果关系来获得版本的部分顺序,例如使用,...
功能性可视化您的气流DAG之间的依赖关系支持3种类型的依赖项: 触发器TriggerDagRunOperator A中的TriggerDagRunOperator触发DAG B 传感器-DAG A中的ExternalTaskSensor等待DAG B中的任务隐式-提供DAG所依赖的DAG的...
DAG技术与狭义的区块链技术相比,有其创新之处,理论上在不考虑作恶情况下可实现高扩展性和高去中心化,因此存在一些安全隐患。
拓扑群拓扑排序和划分一个有向 acylic 图为互连顶点的几乎不相交的子集。例子 var nodes = [ 'y' , 'j' , 'k' , 'x' ] ;var edges = [ [ 'x' , 'j' ] , [ 'x' , 'y' ] , [ 'k' , 'j' ] , [ 'y' , 'j' ] ,] ;var ...