从5分钟到60秒 Kangaroo Cloud Stack热重启技术提升效率的探索

大数据 2023-08-12 22:17:47
5阅读

更好地提高效率一直是Kangaroo Cloud Stack产品的主要目标之一。目前DataStack客户的实时任务都是基于Per-Job模式。修改部分任务参数后,客户只能先取消当前任务,然后选择CheckPoint恢复或重新运行。整个过程需要3-5分钟。比较浪费时间。为了达到提高效率的目的,我们对Per-Job任务的整体流程分析进行了相关探索。

接下来我就和大家聊聊Datastack在热重启技术方面的探索。

热重启是什么?

热重启技术旨在复用当前Per-Job集群的相关资源,减少重新创建集群和申请资源的时间,同时通过检查点机制。

Flink 的Per-Job 模式意味着每个任务都会对应一个独立的Flink 集群。当任务提交后,会创建一个Flink集群来运行该任务,整个集群只会服务于这个任务。同时Flink集群是不允许继续提交任务的,所以任务修改后,只能Cancel当前任务。重新提交修改后的作业以创建新的Flink 集群来运行。

经过分析,耗时主要有以下两个原因:

• 客户端需要在Yarn上启动Flink集群,这是客户端最耗时的部分,因为这部分包括上传jar、上传文件到Hdfs、申请资源启动Flink集群,这些都是耗时的步骤

• 集群运行时申请资源等操作比较耗时

我们认为,如果只是修改一些任务参数或者Sql逻辑,不涉及代码的修改,那么PerJob任务是否可以像Session模式一样进行改造,支持JobGraph的重新提交,解决解决了Client需要启动Flink集群的耗时问题,大大提高了提交效率。

同时,整个Flink集群的资源得到复用。如果并行度发生变化,只需申请新增资源即可。现有的资源不需要重复应用到Yarn的Resourcemanager中。

热重启改造后的流程

Flink中Per-Job任务运行的整体流程大致如下:

客户端流程

• 客户端创建JobGraph

• 将JobGraph上传到hdfs

• 通过YarnClient 提交YarnApplication 来运行Flink 任务

• 获得结果

Flink 集群流程

• 启动Flink集群,启动WebMonitor、ResourceManager、Dispatcher组件

• 客户端上传到远程文件服务的JobGraph将被反序列化并由DISpatcher持有;

• DISpatcher 会基于这个JobGraph 创建一个JobManagerRunner 对象来运行;

• JobManagerRunner会交给内部的ScheduleNg来调度和运行任务:

A。构建ScheduleNg时,JobGraph将转换为ExecutionGraph

b. ScheduleNg根据ExecutionGraph调度并运行任务

• 任务运行,等待任务结束,并进行相应的回调处理

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_复用

从上图我们可以看到,Per-Job任务的操作主要包括两部分:一部分是客户端上传文件jar等操作,然后直接将任务上传到Yarn中启动Flink任务;第二部分是Flink集群启动,然后处理客户端上传的JobGraph到远程文件。因此,为了优化Per-Job下的效率,我们对这两部分进行了修改。

思路逻辑是,集群首先转型支持JobGraph的重新提交,然后当DIspatcher处理JobGraph时,并不是创建新的JobMaster,而是将当前JobGraph中的一些信息填充到新的JobGraph中,这样作为当前任务。 CheckPoint信息等。任务最终的调度操作是JobMaster中的ScheduleNg对象。所以我们认为只需要重建ScheduleNg,其余组件都可以复用。

下图是我们热重启技术改造后的大致流程:

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_重启_02

热重启技术改造后流程

• WebMonitor 支持任务提交

• DISpatcher 缓存新的JobGraph

• 取消当前任务并等待异步回调

• 将结果返回给客户端

• 任务取消的异步回调中,主要是热重启的关键改造部分:

A。判断是否有新的JobGraph缓存,如果有则进入热重启逻辑,如果没有则进入当前已有逻辑

b.获取被取消任务的CheckPoint信息,填充到新的JobGraph中

C。将jobGrap更新为JobMaster,并清除之前JobGraph的缓存信息

d.释放JobMaster中SlotPool管理的资源

e. JobMaster重新创建ScheduleNg并调度运行,至此新的JobGraph已经成功调度运行

热重启改造部分详解

JobGraph 介绍

在上面的过程中,JobGraph是整体流程的主要对象,后续的所有操作都是围绕JobGraph进行处理的,所以这里先介绍一下JobGraph。

JobGraph 是Flink 作业的内部表示。它是一种有向无环图(DAG),主要将一些可以优化的算子节点组合成一个节点。从下图可以看出,一个完整的JobGraph图包括Source Sink Transform节点、节点的输出IntermrdiateDataset和输入边JobEdge。除了Application模式外,其他提交模式都是在Client上创建JobGraph,然后通过Rest请求提交到Flink集群处理。

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_复用_03

读完JobGraph的结构,可以得到以下信息:

taskVertices:上图中的每个顶点对应一个jobVertex,taskVertices维护了jobGraph图中的每个jobVertex

snapshotSettings:与checkponit相关的配置信息,如CheckPoint的间隔时间等。

savepointRestoreSettings:任务恢复的检查点文件信息。热启动时,新的jobGraph会用上一个任务的Checkpoint信息填充该参数,新任务会在CheckPoint站点恢复运行

jobConfiguration:整个作业的相关配置信息

userJars calsspath:任务运行时需要的一些jar和classpath相关信息

其中,JobVertex是jobGraph中非常重要的一个对象,再看一下这一类结构:JobVertex主要存储了JobEdge、IntermediateDataSet以及并行度等相关信息。对于JobVertex,IntermediateDataSet是JobVertex的输出,JobEdge是其输入。

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_重启_04

WebMonitor 改造

WebMonitor 组件是Flink 的Web 端点。可以通过Rest Api查询Flink集群的状态、任务、指标等。还支持任务提交、取消、触发SavePoint等操作。

Per-Job 模式下,Flink 集群不支持客户端继续提交任务运行。因此,需要对WebMonitor进行修改。与Session类似,同一个Flink集群可以持续提交并运行JobGraph。

从下图可以看出,当WebMonitor组件启动时,其本质就是一个以Netty为核心的Web端点。启动时的主要流程如下:

• 创建一个Router 来管理http 请求和处理器处理程序之间的映射关系

•initializeHandlers 初始化所有处理程序。不同集群对应的WebMonitor提供的API函数不同,因此handlers也不同

• 向router注册handlers,完成URL和请求方法(GET、POST、DELETE、PUT)与Handler的映射关系

创建一个Netty处理程序,包装路由器,并将其注册到Netty管道中

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_重启_05

WebMonitor支持的各种Rest请求实际上都是交给各个handler来处理的。这些handler是通过Router来维护的,Router内部维护着url和Rest请求方法与handler之间的映射关系。 Router收到客户端的Rest请求后,找到对应的处理器handler,handler进行最终处理并返回结果。

由于Per-job集群不支持客户端继续提交任务,因此initializeHandlers方法初始化的handler中不包含处理任务提交的handler,导致router找不到对应的handler而报错。因此,提交的处理任务需要在initializeHandlers中注册handler。

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_缓存_06

JobSubmitHandler处理请求的主要逻辑如下图所示。核心是从Rest请求的Body中反序列化JobGraph,反序列化后的JobGraph通过DIspatcherGateway发送到Dispatcher进行后续的提交处理。

从5分钟到60秒,袋鼠云数栈在热重启技术上的提效探索之路_重启_07

这样客户端只需要重新生成JobGraph然后提交即可,避免了重新上传jar到hdfs,也避免了浪费时间从yarn集群重新申请资源来启动AppMaster。

Dispatcher 改造

DisPatcher,顾名思义,就是一个分发器。其主要功能是Flink集群接收作业提交、取消、触发SavePoint等操作,并将其分发给相应的JobMaster进行处理,或者创建新的JobMaster来执行任务运行。

DisPatcher处理任务提交的核心流程是根据JobGraph创建并启动一个JobManagerRunner对象,然后包装成一个DispatcherJob并在内部缓存。任务的具体调度和执行是由创建的JobManagerRunner异步处理的。

JobManagerRunner内部具体操作其实是JobMasterService,主要实现类是JobMaster。 JobMaster内部有两个主要对象:

ScheduleNg:负责将JobGraph转换为ExecutionGraph,然后调度并运行Job

SlotPool:负责Slot资源的申请和管理

以上就是Dispatcher处理的主要流程。目前改造后,只支持任务的重新提交,但新任务仍然对应一个新的JobMaster,这实际上是一个类似会话的过程,所以为了达到热重启的效果,需要进行如下改造。

主流程的转换逻辑如下:

• 添加了hotRestartJobGraph字段,将新的JobGraph对象分配给该字段

• Dispatcher会取消缓存的运行任务并对异步返回结果进行回调处理

• 直接返回客户端结果

由于Flink整体是异步处理的,所以源码中充满了CompletableFuture的回调处理。主进程只对提交的JobGraph进行缓存处理。热重启的主要步骤在任务取消的回调中处理:

• 判断hotRestartJobGraph是否为空,如果不为空,则进行热重启处理,如果为空,则使用之前的逻辑,关闭整个Per-job集群

• 获取已取消任务的最后一个CheckPoint

• 将CheckPoint 信息填充到新的Jobgraph 中

• 反射将之前Jobgraph生成的JobManagerRunner和jobMaster对象的JobGraph字段替换为新的JobGraph

• jobMaster对象根据jobGraph重新生成scheduleNg进行调度运行

• jobMaster的slotPool会缓存心跳周期内已经释放的slot,这部分缓存需要清除

• MiniDispatcher的close方法修改下,如果hotRestartJobGraph不为空,则不会关闭集群

• hotRestartJobGraph 为空

注意,以上只是部分主要修改,其他边的细节不再重复。

所以在热重启时,DISpatcher不会为每个JobGraph创建一个新的JobMaster对象。通过将新的JobGraph更新为JobMaster,内部仅重新构建了ScheduleNg,其余组件均被复用,例如SlotPool。

之所以需要重建ScheduleNg,是因为JobGraph到ExecutionGraph的转换需要在构建时创建ScheduleNg,所以需要重建一个ScheduleNg来调度和执行任务,这样就实现了整个资源的复用性,大大提高了。效率。

Slot 资源的复用

Flink中对资源的抽象主要是Slot,各个组件对Slot的管理是由不同的组件来处理的:

Flink的ResourceManager由SlotManager管理,主要用于资源申请和任务的管理

JobMaster中的slot管理是SlotPool,主要管理当前任务申请的slot

TaskExecutor中,SlotTable管理Slot,维护JobId和Slot的关系

热重启时,上一个任务取消后,JobMaster中SlotPool管理的slot状态由alted变为available。这样,当JobMaster通过新的ScheduleNg重新调度时,SlotPool中缓存的slot就会被复用,这时候其实就是一个问题。当TaskExecutor接收到任务时,会报错,并且在其内部JobTable中找不到新任务的JobId,因为TaskExecutor维护的Jobid仍然是之前的任务。

因此,JobMaster的SlotPool需要释放其内部缓存信息。请注意,它仅清除内部缓存。此时TaskManager的Slot资源还没有被释放,仍然由Resourcemanager的SlotManager管理。这样,SlotPool如果发现内部没有可用的Slot槽,就会向ResourceManager的SlotManager申请资源,而SlotManager仍然会复用之前的Slot槽,并通过RPC 请求。这样就实现了slot资源的复用,减少了Flink集群的ResourceManager向Yarn的ResourceManager重新申请资源的需要。

总结

Per-job模式下,为了尽快看到任务修改的效果,在业务允许的情况下,采用热重启技术复用相关资源,减少了大量时间,大大提高效率。在开发验证中,上一个任务等待任务结束并重新提交的总运行流程超过4分钟,但在热重启的情况下,可以在1分钟内调度执行。

未来我们将进一步丰富热重启场景,在更多场景下支持热重启技术,比如jar代码修改、如何更新环境中的jar、支持k8s场景等。

袋鼠云一直非常重视产品升级和用户体验,真诚倾听用户需求。新的一年,我们将继续保持产品升级的节奏,以提升效率为目标,满足不同行业用户的更多需求。为了更好的产品、更好的用户体验,Data Stack一直在路上。

《数据治理行业实践白皮书》下载地址:https://fs80.cn/380a4b

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=sz51cto

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack

the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。