Spark Source Code Analysis: TaskScheduler Task Submission and the Task Preferred-Location Algorithm
The previous article, "Spark Source Code Analysis: DAG Splitting and Submission in DAGScheduler", introduced DAGScheduler's stage-splitting algorithm.
This article continues from there and analyzes how a Stage is wrapped into a TaskSet and how that TaskSet is submitted to the cluster's Executors for execution.
In DAGScheduler's submitStage method, once the stage graph has been built, a stage that has no parent stages left to run is submitted by calling DAGScheduler's submitMissingTasks method, which submits the tasks contained in that stage.
Let's start with DAGScheduler's submitMissingTasks method.
1. Compute the preferred locations for each task:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
The core of this step is the getPreferredLocs method, which uses the RDD's data placement information to compute the best location for each task and thus obtain good data locality. Its details are skipped here and will be analyzed in a later article.
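As a rough mental model, the lookup can be pictured like this (a simplified, self-contained sketch with hypothetical types; the real logic lives in DAGScheduler.getPreferredLocsInternal, which also walks narrow dependencies recursively):

case class SimpleRdd(
    cachedLocs: Map[Int, Seq[String]],        // partition index -> hosts where the partition is cached
    preferredLocs: Map[Int, Seq[String]],     // partition index -> hosts the RDD itself declares (e.g. HDFS block hosts)
    narrowParent: Option[SimpleRdd] = None)   // a single narrow dependency, for illustration

def preferredLocations(rdd: SimpleRdd, partition: Int): Seq[String] = {
  val cached = rdd.cachedLocs.getOrElse(partition, Nil)
  if (cached.nonEmpty) return cached                        // 1. cached copies win
  val declared = rdd.preferredLocs.getOrElse(partition, Nil)
  if (declared.nonEmpty) return declared                    // 2. then the RDD's own preferred locations
  rdd.narrowParent                                          // 3. otherwise reuse a narrow parent's locations
    .map(parent => preferredLocations(parent, partition))
    .getOrElse(Nil)
}

For a plain map over an uncached HDFS-backed RDD, this falls through to step 3 and ends up on the parent's HDFS block hosts, which is exactly the data locality the DAGScheduler is after.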
2. Serialize the task binary and broadcast it. When an Executor runs a task, it deserializes this binary.
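For context, this step looks roughly like the following in submitMissingTasks (a trimmed sketch of the Spark 1.x-era code this article quotes; exception handling omitted). What gets serialized depends on the stage type, and the bytes are broadcast so every Executor can fetch and deserialize them:

// Sketch: serialize (rdd, shuffleDep) for a ShuffleMapStage, (rdd, func) for a ResultStage
val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
  case stage: ResultStage =>
    closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
}
// broadcast once; executors deserialize it when they run the task
val taskBinary = sc.broadcast(taskBinaryBytes)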
3. Create one task per partition of the stage, according to the stage type, and wrap them all into a TaskSet. There are two stage types: a ShuffleMapStage produces ShuffleMapTasks, and a ResultStage produces ResultTasks.
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }

    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
4. Call TaskScheduler's submitTasks to submit the TaskSet:
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
The submitTasks method is implemented in TaskSchedulerImpl, the implementation class of TaskScheduler.
4.1 TaskSchedulerImpl's submitTasks method first creates a TaskSetManager.
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
  taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
A TaskSetManager manages a single TaskSet within TaskSchedulerImpl. It tracks every task, retries failed tasks up to the maximum number of allowed failures, and performs locality-aware scheduling of the tasks via delay scheduling.
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,            // the TaskSchedulerImpl this manager is bound to
    val taskSet: TaskSet,
    val maxTaskFailures: Int,            // maximum number of retries for a failed task
    clock: Clock = new SystemClock())
  extends Schedulable with Logging
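The delay scheduling mentioned above is controlled by the spark.locality.wait settings: how long a TaskSetManager keeps waiting at one locality level before falling back to the next. A minimal configuration sketch (the values shown are the defaults, listed only for illustration):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")           // base wait shared by the level-specific settings below
  .set("spark.locality.wait.process", "3s")   // wait before giving up on PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")      // wait before giving up on NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")      // wait before giving up on RACK_LOCAL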
4.2 Add the TaskSetManager to the schedulableBuilder.
// add the TaskSetManager to the rootPool scheduling pool; the schedulableBuilder decides the scheduling order
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
schedulableBuilder is of type SchedulableBuilder. SchedulableBuilder is a trait with two implementations, FIFOSchedulableBuilder and FairSchedulableBuilder, and the FIFO builder is used by default.
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
The schedulableBuilder itself is created when SparkContext, after constructing the SchedulerBackend and the TaskScheduler, calls TaskSchedulerImpl's initialize method:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
schedulableBuilder is an important member of TaskSchedulerImpl: the scheduling policy it implements determines the order in which TaskSetManagers are scheduled.
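For completeness, switching from the default FIFO pool to fair scheduling only requires setting the property read in the snippet above. A minimal sketch (application name and master are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")      // hypothetical application name
  .setMaster("local[*]")                  // hypothetical master, for a quick local test
  .set("spark.scheduler.mode", "FAIR")    // "FIFO" is the default, as shown above
val sc = new SparkContext(conf)           // TaskSchedulerImpl.initialize will now pick FairSchedulableBuilder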
4.3 Next, the SchedulerBackend's reviveOffers method is called to schedule the tasks, i.e. to decide which Executor each task will run on.
CoarseGrainedSchedulerBackend's reviveOffers method sends a ReviveOffers message to driverEndpoint:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
When driverEndpoint receives the ReviveOffers message, it calls the makeOffers method:
// Make fake resource offers on all executors
private def makeOffers() {
  // keep only the executors that are currently alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // wrap each executor into a WorkerOffer
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq

  launchTasks(scheduler.resourceOffers(workOffers))
}
Note the executorDataMap used above: when the client registers the Application with the Master, the Master allocates and starts Executors for the Application; those Executors then register with CoarseGrainedSchedulerBackend, and their registration information is stored in the executorDataMap structure.
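For reference, each entry of executorDataMap can be pictured roughly like this (a simplified, hypothetical shape, not the actual ExecutorData class; only executorHost and freeCores are used by makeOffers above):

import scala.collection.mutable

// executorId -> per-executor bookkeeping kept by CoarseGrainedSchedulerBackend (simplified)
case class ExecutorDataSketch(
    executorHost: String,   // host the executor runs on
    freeCores: Int,         // cores currently free for new tasks
    totalCores: Int)        // cores the executor registered with

val executorDataMapSketch = new mutable.HashMap[String, ExecutorDataSketch]()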
With these compute resources in hand, TaskSchedulerImpl then assigns Executors to the tasks. Let's look at TaskSchedulerImpl's resourceOffers method:
// shuffle the offers randomly
val shuffledOffers = Random.shuffle(offers)

// one ArrayBuffer per executor, holding the tasks that will be assigned to it
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray

// sort the TaskSetManagers according to the SchedulableBuilder's scheduling algorithm
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}

// double loop: take each TaskSet in scheduling order and try to launch its tasks,
// walking the locality levels in order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}

if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
Now let's look at the resourceOfferSingleTaskSet method: given the current locality level, it calls TaskSetManager's resourceOffer method to assign tasks to the current executor.
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // only offer this executor if it has at least CPUS_PER_TASK (1 by default) free cores
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      }
After resources have been assigned to the tasks, DriverEndpoint calls the launchTasks method to start the tasks on their Executors. How tasks are launched and executed on the Executor side will be analyzed in a follow-up article.
To summarize the call chain:
TaskSchedulerImpl#submitTasks
  CoarseGrainedSchedulerBackend#reviveOffers
    CoarseGrainedSchedulerBackend$DriverEndpoint#makeOffers
      |- TaskSchedulerImpl#resourceOffers(offers)   // assign tasks to the offers
      |- TaskSchedulerImpl#resourceOfferSingleTaskSet
    CoarseGrainedSchedulerBackend$DriverEndpoint#launchTasks
Original article: http://www.cnblogs.com/zhouyf/p/5743382.html