IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

记录一种工作流心跳机制的设计

四火的唠叨 2016-05-05 22:43:15 累计浏览 1,697 次
本机暂存

   最近工作中一直和SWF(Amazon的Simple Work Flow)打交道,在一个基于SWF的工作流框架上面开发和修bug。SWF的activity超时时间是5分钟,在activity task开始执行以后,activity worker需要主动发送心跳请求告知service端:“我还活着,我还在干活”,如果出现超过5分钟(可以配置)没有心跳,SWF的service端就认为,你已经挂了,我需要把这个activity安排到别的activity worker上来执行了。借用AWS官网的一张图:

   记录一种工作流心跳机制的设计

   可以看到,在activity任务启动起来以后,需要用不断的心跳来告知service端任务还在进行,activity worker还活着。这个“汇报”需要activity worker所在的host主动进行,这也是SWF的service端无状态(几年前写过一点东西介绍它)的基本要求之一。任务都是由worker端去pull的,这些行为也都是worker端主动触发的。

   这个机制描述起来很简单,但是实际在相关设计实现的时候,有许多有趣和值得琢磨的地方。

   在我手头的这个workflow里面,心跳机制是这样实现的:

  • 有两个queue,它们都是dequeue(双端队列),一个是main queue,一个是backup queue,都是用来存放需要发送心跳的activity信息(heartbeatable对象);

  • 每秒钟都尝试执行这样一个方法A:从main queue里面poll一个heartbeatable对象(如果queue为空就忽略本次执行),检查该心跳所代表的activity task是否还在工作,如果是,就发送一个心跳。发送成功以后,就把这个heartbeatable对象扔到backup queue里面去。这样,一秒一个,逐渐地,main queue的heartbeatable对象全部慢慢被转移到backup queue里去了。

  • 每隔两分钟(称为一个cycle)执行方法B:把backup queue里面所有的heartbeatable对象全部转移到main queue里去,于是就又可以继续执行上面一步的逐个心跳逻辑。

   这个机制的基本好处是,所有activity task的心跳统一管理,通常情况下保证了心跳不会过快(默认配置下是一秒一个,或者不发送),同时保证了没有谁会被遗漏:

   记录一种工作流心跳机制的设计

   但是,这里又会浮现好多好多问题:

   为什么要使用两个queue?

   首先,有这样一个事实:方法A在执行的时候,理论上每秒钟会执行一次,但是这里并没有强制的保证,使得前一秒的A执行一定会在这一秒的A开始之前完成。换言之,它们的理论启动时间是按序的,但是实际启动时间和实际的心跳执行时间是不定的,需要处理并发的情形。而到底最多可能存在多少个执行A的线程并行,取决于用于此心跳功能的线程池的配置。因此,在执行和判断的过程中,需要对当前poll出来的heartbeatable对象加锁。

   使用两个queue,这主要是为了记录在本次cycle里面,能够很容易判断某一个heartbeatable对象是否已经完成心跳行为。还没有完成心跳的,都在main queue里;完成了的,都放到backup queue里。

   如果使用一个queue,那么也是有解决方案的:

  • 有一个公共计数器,每个cycle+1。

  • 每一个heartbeatable对象自身需要携带一个私有计数器,用以标识当前这轮cycle的心跳是否已经完成。

  • 每次完成的heartbeatable对象给自己的计数器+1以后扔到队尾;每次A取新的heartbeatable对象的时候从队首取。

  • 如果取到的对象自己的计数器已经等于公共计数器的数值,说明整个queue里面的对象心跳都已经完成了。

   当然,这种方法的弊端在于,判断是否还需要发送心跳这件事情,不仅需要从queue里取对象,还要判断对象的计数器数值,明显比两个queue的解决方案复杂和开销大。因为两个queue的解决方案下,只需要尝试从main queue里面取对象就好,取不到了就说明本次cycle里没有需要发送心跳的对象了。看起来是多了一个queue,但是方案其实还是简单一些。

   心跳的频率保持在多久为好?

   显然不是越高越好,不只是成本,因为心跳也是需要消耗资源的,比如CPU资源;而且,心跳在service端也有throttling,当前activity worker发起太频繁的心跳,当前心跳可能被拒,还可能会让别的activity worker的正常心跳被拒了。

   我们要解决的最核心问题是,正常情况下,必须保持上限5分钟内能发起一次成功心跳就好。

   要这么说,尽量增大cycle,那我设计一个每隔5分钟就执行一次的定时器就好了。但是问题没那么简单,首先要考虑心跳的发起不一定成功。如果在接近5分钟的时候才去尝试发起心跳,一旦失败了,也没有时间重试了。因此,要trade-off。比如,配置cycle为120秒,这样的好处是,5分钟的超时时间内,可以覆盖1~2个完整的cycle。如果cycle配置为3分钟,那么5分钟无法严格保证一定覆盖有一个完整的cycle。

   确定心跳频率的有两个重要参数,一个是方法A的执行频率,一个则是一个cycle的时间长度。例如,前者为1 per second,后者为2分钟,那么在理想情况下,一个cycle 120秒,可以处理120个activity task,换言之,极限是120个activity task在这台机器上一起执行。超过了这个数,就意味着在一个cycle内,无法完成所有的心跳发送任务。

   当然,实际情况没有那么理想,考虑到暂时性的网络问题,线程、CPU资源的竞争等等,实际可以并行的activity task要比这个数低不少。

   异常处理和重试

   在上图中,步骤③有三个箭头,表示了心跳出现不同种情形的处理:

  • 有一些常规异常,比如表示资源不存在,或者任务已经cancel了,这种情况发生的时候,要把相应的activity task给cancel掉,同时,把自己这个heartbeatable对象永久移除出queue。

  • 重试机制1:throttling导致的异常,这种异常发生的时候,把当前heartbeatable对象再addFirst回main queue,因为这不是当前有什么不可解决的或者不明原因的问题造成的,只需要简单重试即可。

  • 重试机制2:其它未知原因的异常,这种情况当然需要重试(之前我们缺少这样的重试机制,导致下一次该activity task能够得到心跳的机会被推到了下一个cycle,这显然是不够合理的),但是,可以把heartbeatable对象放到queue尾部去重试(addLast),并且附上一个私有计数器,如果重试超过一定次数,就挪到下一个cycle(backup queue)去。这个放到queue尾部的办法,使得重试可以在当前cycle里进行,又可以使得这个重试能够尽量不影响其他heartbeatable对象的心跳及时发送。整个重试过程其实就是把当前失败对象再放回queue的过程,没有线程阻塞。

   曾经遇到过一些这方面的问题,经过改进才有了上述的机制:

   在CPU或者load达到一定程度的时候(比如这个时候有一个进程在call service,占用了大量的CPU资源),就很容易发生心跳无法及时进行的问题,比如有时候线程已经初始化了,但是会stuck若干时间,因为没有足够的资源去进行。等到某一时刻,资源被释放(比如这个call service 的进程结束),这个时候之前积攒的心跳任务会一下子爆发出来。不但这些心跳的顺序无法保证,而且严重的情况下会导致throttling。如果没有当前cycle内的重试机制,那么下一次该对象的心跳需要等到下一个cycle,很容易造成activity task的timeout。

   下面再说一个和心跳异常有关的问题。

   有这样一个例子,在这个工作流框架内,我们需要管理EMR资源,有一个activity把EMR cluster初始化完成,另一个activity把实际执行的steps提交上去。但是发现在实际运行时有如下的问题:EMR cluster已经初始化完成,但是steps迟迟没有办法提交上去,导致了这个cluster空闲太长时间,被框架内的monitor认为已经没有人使用了,需要回收,于是EMR cluster就被terminate了。但是这之后,steps才被提交上去,但是这时候cluster已经处于terminating状态了,自然这个step提交就失败了。而经过分析,造成这个EMR cluster非预期的termination,包括这样几个原因:

  • decision task timeout。在EMR cluster创建好之后,SWF会问decider下一步该干嘛,这时候如果因为CPU高负荷等各种原因,导致decision task timeout,SWF就会一直等在那里,而如果这个timeout的时间配得太长,这段超时就足以让上面的这个EMR cluster空闲过长时间导致被误回收了。

  • 判断EMR cluster空闲到一定时间就要回收的逻辑有问题。我们以前的实现是,每隔2分钟执行一次“EMR资源操作”,包括检查资源状态,进行资源操作,然后如果发现该EMR资源创建后经过了4次资源操作,依然没有step提交上去,就认为空闲时间过长,需要回收(2 x 4 = 8分钟)。但是问题在于,实际由于种种原因(和心跳的执行间隔实际时间不确定的原理一样),间隔执行EMR资源操作并不能严格保证每隔2分钟一次,有时一段时间都得不到执行,而有时候会迎来一次集中爆发,这个时候就可能实际EMR资源空闲了远远不到8分钟就被回收了。因此,这个逻辑最好是能够用绝对的“空闲时间”来判断,例如EMR资源创建时记录好时间,之后每次检查时都用当前时间去和创建时间比较,空闲超过8分钟再回收。

  • 由于之前提到过的心跳无法按时完成导致activity task timeout,于是这个EMR cluster创建的任务实际已经完成了,但是被当做超时给无视了。

   最后,我想说的是。设计一个好的工作流框架,还是有很多困难的地方,需要尤其考虑周全的地方。即便是基于SWF这样现有的workflow来搭积木和叠加功能,也有很多不易和有趣的地方。

同分类推荐文章

  1. 等了十年的 Go 链式管道,终于来了:seq 让你像写 Scala 一样写 Go (2026-06-25 18:38:18)
  2. Go 实验特性详解 (2026-06-21 10:05:27)
  3. amd64 微架构级别对 Go 程序性能提升多少? (2026-06-21 09:38:49)

查看更多 后端 文章 →

建议继续学习

  1. 如何设置一个严格30分钟过期的Session (累计阅读 5,200)
  2. Heartbeat+DRBD+MySQL Replication故障处理 (累计阅读 3,395)
  3. 数据库HA方案 (累计阅读 3,065)
  4. 关于DRBD与Heartbeat的一些思考 (累计阅读 3,068)
  5. 一次连接超时问题排查的历程 (累计阅读 2,910)
  6. MySQL Timeout解析 (累计阅读 1,972)
  7. 提高工作效率的方法 (累计阅读 1,854)
  8. 如何在 Linux 中的特定时间运行命令 (累计阅读 1,564)