• linkedu视频
  • 平面设计
  • 电脑入门
  • 操作系统
  • 办公应用
  • 电脑硬件
  • 动画设计
  • 3D设计
  • 网页设计
  • CAD设计
  • 影音处理
  • 数据库
  • 程序设计
  • 认证考试
  • 信息管理
  • 信息安全
菜单
linkedu.com
  • 网页制作
  • 数据库
  • 程序设计
  • 操作系统
  • CMS教程
  • 游戏攻略
  • 脚本语言
  • 平面设计
  • 软件教程
  • 网络安全
  • 电脑知识
  • 服务器
  • 视频教程
  • JavaScript
  • ASP.NET
  • PHP
  • 正则表达式
  • AJAX
  • JSP
  • ASP
  • Flex
  • XML
  • 编程技巧
  • Android
  • swift
  • C#教程
  • vb
  • vb.net
  • C语言
  • Java
  • Delphi
  • 易语言
  • vc/mfc
  • 嵌入式开发
  • 游戏开发
  • ios
  • 编程问答
  • 汇编语言
  • 微信小程序
  • 数据结构
  • OpenGL
  • 架构设计
  • qt
  • 微信公众号
您的位置:首页 > 程序设计 >编程技巧 > Flink运行时之客户端提交作业图-下

Flink运行时之客户端提交作业图-下

作者:VinoYang的专栏 字体:[增加 减小] 来源:互联网 时间:2017-07-23

VinoYang的专栏通过本文主要向大家介绍了Flink等相关知识,希望对您有所帮助,也希望大家支持linkedu.com www.linkedu.com

submitJob方法分析

JobClientActor通过向JobManager的Actor发送SubmitJob消息来提交Job,JobManager接收到消息对象之后,构建一个JobInfo对象以封装Job的基本信息,然后将这两个对象传递给submitJob方法:

case SubmitJob(jobGraph, listeningBehaviour) =>  
    val client = sender()  
    val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),    
        jobGraph.getSessionTimeout)  
    submitJob(jobGraph, jobInfo)

我们会以submitJob的关键方法调用来串讲其主要逻辑。首先判断jobGraph参数,如果为空则直接回应JobResultFailure消息:

if (jobGraph == null) {  
    jobInfo.client ! decorateMessage(JobResultFailure(    
        new SerializedThrowable(      
            new JobSubmissionException(null, "JobGraph must not be null.")    
        )  
    ))
}

接着,向类库缓存管理器注册该Job相关的库文件、类路径:

libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,  
                                jobGraph.getClasspaths)

必须确保该步骤率先成功执行,因为一旦后续产生任何异常才可以确保上传的类库和Jar等被成功从类库缓存管理器中移除。从这开始的整个代码段都被包裹在try语句块中,一旦捕获到任何异常,会通过libraryCacheManager的unregisterJob方法将相关Jar文件删除:

catch {  case t: Throwable =>    
    libraryCacheManager.unregisterJob(jobId)
    //...
}

接下来是获得用户代码的类加载器classLoader以及发生失败时的重启策略restartStrategy:

val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration())  
    .map(RestartStrategyFactory.createRestartStrategy(_)) match {    
        case Some(strategy) => strategy    
        case None => defaultRestartStrategy  
}

接着,获得执行图ExecutionGraph对象的实例。首先尝试从缓存中查找,如果缓存中存在则直接返回,否则直接创建然后加入缓存:

executionGraph = currentJobs.get(jobGraph.getJobID) match {  
    case Some((graph, currentJobInfo)) =>    
        currentJobInfo.setLastActive()    
        graph  
    case None =>    
        val graph = new ExecutionGraph(      
            executionContext,      
            jobGraph.getJobID,      
            jobGraph.getName,      
            jobGraph.getJobConfiguration,      
            timeout,      
            restartStrategy,      
            jobGraph.getUserJarBlobKeys,      
            jobGraph.getClasspaths,      
            userCodeLoader)    
        currentJobs.put(jobGraph.getJobID, (graph, jobInfo))    
        graph
}

获得了executionGraph之后会对其相关属性进行设置,这些属性包括调度模式、是否允许被加入调度队列、计划的Json格式表示。

executionGraph.setScheduleMode(jobGraph.getScheduleMode())
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))

接下来初始化JobVertex的一些属性:

val numSlots = scheduler.getTotalNumberOfSlots()
for (vertex <- jobGraph.getVertices.asScala) {  
    val executableClass = vertex.getInvokableClassName 
    if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {    
        vertex.setParallelism(numSlots)  
    }  
    vertex.initializeOnMaster(userCodeLoader)
}

获得JobGraph中从source开始的按照拓扑顺序排序的顶点集合,然后将该集合附加到ExecutionGraph上,附加的过程完成了很多事情,我们后续进行分析:

val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
executionGraph.attachJobGraph(sortedTopology)

接下来将快照配置和检查点配置的信息写入ExecutionGraph:

val snapshotSettings = jobGraph.getSnapshotSettings
if (snapshotSettings != null) {  
    val jobId = jobGraph.getJobID()  
    val idToVertex: JobVertexID => ExecutionJobVertex = id => {    
        val vertex = executionGraph.getJobVertex(id)      
        vertex  
    }  
    val triggerVertices: java.util.List[ExecutionJobVertex] =    
        snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava  
    val ackVertices: java.util.List[ExecutionJobVertex] =    
        snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava  
    val confirmVertices: java.util.List[ExecutionJobVertex] =    
        snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava  
    val completedCheckpoints = checkpointRecoveryFactory    
        .createCompletedCheckpoints(jobId, userCodeLoader)  
    val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)  
    executionGraph.enableSnapshotCheckpointing(    
        snapshotSettings.getCheckpointInterval,    
        snapshotSettings.getCheckpointTimeout,    
        snapshotSettings.getMinPauseBetweenCheckpoints,    
        snapshotSettings.getMaxConcurrentCheckpoints,    
        triggerVertices,    
        ackVertices,    
        confirmVertices,    
        context.system,    
        leaderSessionID.orNull,    
        checkpointIdCounter,    
        completedCheckpoints,    
        recoveryMode,    
        savepointStore)
}

JobManager自身会注册Job状态变更的事件回调:

executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID.orNull))

如果Client也需要感知到执行结果以及Job状态的变更,那么也会为Client注册事件回调:

if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {    
    val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)  
    executionGraph.registerExecutionListener(gateway)  
    executionGraph.registerJobStatusListener(gateway)
}

以上这些代码从将Job相关的Jar加入到类库缓存管理器开始,都被包裹在try块中,如果产生异常将进入catch代码块中进行异常处理:

catch {  
    case t: Throwable =>    
        log.error(s"Failed to submit job $jobId ($jobName)", t)    
        libraryCacheManager.unregisterJob(jobId)    
        currentJobs.remove(jobId)    
        if (executionGraph != null) {      
            executionGraph.fail(t)    
        }    
        val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {      
            t    
        } else {      
            new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)    
        }    
        jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))    
        return
}

异常处理时首先根据jobID移除类库缓存中跟当前Job有关的类库,接着从currentJobsMap中移除job对应的ExecutionGraph,JobInfo元组信息。然后调用ExecutionGraph的fail方法,促使其失败。最后,将产生的异常以JobResultFailure消息告知客户端并结束方法调用。

从当前开始直到最后的这段代码可能会造成阻塞,将会被包裹在future块中并以异步的方式执行。先判断当前的是否是恢复模式,如果是恢复模式则从最近的检查点恢复:

if (isRecovery) {  
    executionGraph.restoreLatestCheckpointedState()
}

如果不是恢复模式,但快照配置中存在保存点路径,也将基于保存点来重置状态:

executionGraph.restoreSavepoint(savepointPath)  

然后会把当前的JobGraph信息写入SubmittedJobGraphStore,它主要用于恢复的目的

submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))

执行到这一步,就可以向Client回复JobSubmitSuccess消息了:

jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))

接下来会基于ExecutionGraph触发Job的调度,这是Task被执行的前提:

if 



 
分享到:QQ空间新浪微博腾讯微博微信百度贴吧QQ好友复制网址打印

您可能想查找下面的文章:

相关文章

  • 2017-05-12Web开发/设计人员应当知道的15个网站
  • 2017-05-12Git基本常用命令
  • 2018-01-022018,想成为软件测试师的你准备好了吗?
  • 2017-05-1212种实现301网页重定向方法的代码实例(含Web编程语言和Web服务器)
  • 2017-05-12Git 教程之创建仓库详解
  • 2017-05-12php和asp利用Shell.Application来执行程序的代码
  • 2017-05-12ibatis简单实现与配置
  • 2017-05-12Git 教程之服务器搭建详解
  • 2017-05-12几款开源的中文分词系统
  • 2017-05-12256种编程语言大汇总

文章分类

  • JavaScript
  • ASP.NET
  • PHP
  • 正则表达式
  • AJAX
  • JSP
  • ASP
  • Flex
  • XML
  • 编程技巧
  • Android
  • swift
  • C#教程
  • vb
  • vb.net
  • C语言
  • Java
  • Delphi
  • 易语言
  • vc/mfc
  • 嵌入式开发
  • 游戏开发
  • ios
  • 编程问答
  • 汇编语言
  • 微信小程序
  • 数据结构
  • OpenGL
  • 架构设计
  • qt
  • 微信公众号

最近更新的内容

    • Web开发人员常用速查手册 英文集合推荐
    • 编程之显示/隐式声明
    • 绑定/约束 (binding)指两个东西之间的关联
    • HTTP 2.0 详细介绍
    • 五个最佳编程文本编辑器分享
    • 关于代码阅读问题的小技巧 脚本之家原创(适合所有网站)不定时更新
    • 分享10个免费超棒的编程用等宽字体
    • asp php 清空access mysql mssql数据库的代码
    • springcloud: 路由网关(zuul)(Finchley版本)
    • git分支的创建、切换、合并及删除操作小结

关于我们 - 联系我们 - 免责声明 - 网站地图

©2020-2025 All Rights Reserved. linkedu.com 版权所有