Spark内核源码解析六:worker原理解析和源码解析
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Spark内核源码解析六:worker原理解析和源码解析,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2310字,纯文字阅读大概需要4分钟。
内容图文
![Spark内核源码解析六:worker原理解析和源码解析](/upload/InfoBanner/zyjiaocheng/633/34d792d181e04fe8bb4609b6fe33a287.jpg)
1、worker里面先找到launchDriver
case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") // 创建DriverRunner线程,包括在driver失败时自动重启driver val driver = new DriverRunner( conf, driverId, workDir, sparkHome, driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, akkaUrl) drivers(driverId) = driver driver.start() coresUsed += driverDesc.cores memoryUsed += driverDesc.mem }
DriverRunner里面的start方法
def start() = { new Thread("DriverRunner for " + driverId) { override def run() { try { // 创建工作目录 val driverDir = createWorkingDirectory() // 下载用户的jar包,下载用户jar包到工作目录,然后返回在worker中的路径 val localJarFilename = downloadUserJar(driverDir) def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl case "{{USER_JAR}}" => localJarFilename case other => other } // TODO: If we add ability to submit multiple jars they should also be added here // 构建processBuilder,传入Driver启动命令和需要的cpu和内存信息 val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) launchDriver(builder, driverDir, driverDesc.supervise) } catch { case e: Exception => finalException = Some(e) } val state = if (killed) { DriverState.KILLED } else if (finalException.isDefined) { DriverState.ERROR } else { finalExitCode match { case Some(0) => DriverState.FINISHED case _ => DriverState.FAILED } } finalState = Some(state) // driver启动后向worker发送driver启动的消息 worker ! DriverStateChanged(driverId, state, finalException) } }.start() }
worker接收到driver启动消息后会将消息发送给master
case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") case DriverState.FAILED => logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => logInfo(s"Driver $driverId was killed by user") case _ => logDebug(s"Driver $driverId changed state to $state") } master ! DriverStateChanged(driverId, state, exception) val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver memoryUsed -= driver.driverDesc.mem coresUsed -= driver.driverDesc.cores
内容总结
以上是互联网集市为您收集整理的Spark内核源码解析六:worker原理解析和源码解析全部内容,希望文章能够帮你解决Spark内核源码解析六:worker原理解析和源码解析所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。