1. 什么是状态一致性

有状态的流处理,内部每个算子任务都可以有自己的状态,对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。一条数据不应该丢失,也不应该重复计算。在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。

在流处理中,一致性分为三个级别:

  • at-most-once(最多一次):
    • 这其实是没有正确性保障的委婉说法——故障发生之后, 计数结果可能丢失,类似的还有 udp
  • at-least-once(至少一次):
    • 这表示计数结果可能大于正确值, 但绝不会小于正确值。也就是说, 计数程序在发生故障后可能多算, 但是绝不会少算
    • 它意味着所有事件最终都会处理,虽然有些可能会处理多次
  • exactly-once(精确一次):
    • 这指的是系统保证在发生故障后得到的计数结果与正确值一致
    • 它不但能够保证事件没有丢失,而且每个事件对于内部状态的更新都只有一次
    • Flink利用Checkpoints机制来保证精确一次语义

曾经, at-least-once 非常流行。第一代流处理器(如 Storm 和 Samza)刚问世时只保证 at-least-once, 原因有二:

  • 保证 exactly-once 的系统实现起来更复杂。这在基础架构层(决定什么代表正确, 以及 exactly-once 的范围是什么)和实现层都很有挑战性
  • 流处理系统的早期用户愿意接受框架的局限性, 并在应用层想办法弥补(例如使应用程序具有幂等性, 或者用批量计算层再做一遍计算)

最先保证 exactly-once 的系统(Storm Trident 和 Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证 exactly-once,这些系统无法单独地对每条记录运用应用逻辑, 而是同时处理多条(一批)记录, 保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此, 用户经常不得不使用两个流处理框架(一个用来保证 exactly-once, 另一个用来对每个元素做低延迟处理), 结果使基础设施更加复杂。曾经, 用户不得不在保证exactly-once 与获得低延迟和效率之间权衡利弊。Flink 避免了这种权衡。

Flink 使用了一种轻量级快照机制,检查点(checkpoint)来保证 exactly-once语义。有状态流应用的一致性检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。应用状态的一致性检查点,是 Flink故障恢复机制的核心。
Flink 的一个重大价值在于,它既保证了 exactly-once, 也具有低延迟和高吞吐的处理能力

2. 端到端的状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的; 而在真实应用中, 流处理应用除了流处理器以外还包含了数据源( 例如 Kafka) 和输出到持久化系统。
端到端的一致性保证, 意味着结果的正确性贯穿了整个流处理应用的始终; 每一个组件都保证了它自己的一致性, 整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:

  • 内部保证 —— 依赖 checkpoint
  • source 端 —— 需要外部源可重设数据的读取位置
  • sink 端 —— 需要保证从故障恢复时, 数据不会重复写入外部系统

其中前两种在上文已经介绍过了,下面说说 Sink 如何提供端到端的精确一次性保障。
应用若是想提供端到端的精确一次性保障,就需要一些特殊的Sink连接器,根据情况不同,这些连接器可以使用两种技术来实现精确一次保障:

  • 幂等性写入(idempotent write)
    • 幂等操作的含义就是可以多次执行,但是只会引起一次改变,也就是说, 后面再重复执行就不起作用了
    • 例如我们将相同的键值对插入一个哈希结构中就是一个幂等操作, 因为由于该键值对已存在后,无论插入多少次都不会改变结果
    • 由于可以在不改变结果的前提下多次执行,因此幂等性写操作在一定程度上减轻Flink检查点机制所带来的重复结果的影响
  • 事务性写入(transactional write)
    • 事务性写其实就是原子性写,即只有在上次成功的检查点之前计算的结果才会被写入外部Sink系统
    • 事务性写虽然不会像幂等性写那样出现重放过程中的不一致现象,但是会增加一定延迟,因为结果只有在检查点完成后才对外可见
    • 实现思想:构建的事务对应着Checkpoints,待Checkpoints真正完成的时候,才把所有对应的结果写入Sink系统中

对于事务性写入, 具体又有两种实现方式: 预写日志( WAL)两阶段提交( 2PC)
预写日志

  • 把结果数据先当成状态保存,然后在收到Checkpoints完成的通知时,一次性写入Sink系统
  • 简单易于实现,由于数据提前在状态后端做了缓存,所以无论什么Sink系统都能用这种方式一批搞定
  • 但同时它也存在问题,写入数据时出现故障则会导致一部分数据成功一部分失败
  • DataStream API提供了一个模板类GenericWriteAheadSink,来实现这种事务性Sink

两阶段提交

  • 对于每个Checkpoints,Sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里
  • 然后将这些数据写入外部 Sink,但不提交它们,这时只是“预提交”
  • 当它收到Checkpoints完成的通知时,它才正式提交事务,实现结果的真正写入
  • 这种方式真正实现了精确一次,它需要一个提供事务支持的外部Sink系统,Flink提供了TwoPhaseCommitSinkFunction接口
  • 对外部Sink系统的要求
    • 外部Sink系统必须提供事务支持,或者Sink任务必须能够模拟外部系统上的事务
    • 在Checkpoints的隔离期间里,必须能够开启一个事务并接受数据写入
    • 在收到Checkpoints完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候 Sink系统关闭事务(例如超时了),那么未提交的数据就会丢失
    • Sink任务必须能够在进程失败后恢复事务
    • 提交事务必须是幂等操作

3. 检查点(checkpoint)

Flink 具体如何保证 exactly-once 呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点 的作用。
假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一 遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。

于是,你想了一个更好的办法:在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开;当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手, 让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink 检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是:对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
Flink的检查点算法:
Flink 检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。记住 这一基本点之后,我们用一个例子来看检查点是如何运行的。Flink 为用户提供了用 来定义状态的工具。例如,以下这个 Scala 程序按照输入记录的第一个字段(一个字 符串)进行分组并维护第二个字段的计数状态。

1
2
3
4
5
6
7
8
val stream: DataStream[(String, Int)] = ... 
val counts: DataStream[(String, Int)] = stream
.keyBy(record => record._1)
.mapWithState( (in: (String, Int), state: Option[Int]) =>
state match {
case Some(c) => ( (in._1, c + in._2), Some(c + in._2) )
case None => ( (in._1, in._2), Some(in._2) )
})

该程序有两个算子: keyBy 算子用来将记录按照第一个元素(一个字符串)进行分 组,根据该 key 将数据进行重新分区,然后将记录再发送给下一个算子: 有状态的 map 算子(mapWithState)。map 算子在接收到每个元素后,将输入记录的第二个字段 的数据加到现有总数中,再将更新过的元素发射出去。下图表示程序的初始状态: 输 入流中的 6 条记录被检查点分割线(checkpoint barrier)隔开,所有的 map 算子状态均为 0(计数还未开始)。所有 key 为 a 的记录将被顶层的 map 算子处理,所有 key 为 b 的记录将被中间层的 map 算子处理,所有 key 为 c 的记录则将被底层的 map 算子处理。
按 key 累加计数程序初始状态
上图是程序的初始状态。注意,a、b、c 三组的初始计数状态都是 0,即三个圆 柱上的值。ckpt 表示检查点分割线(checkpoint barriers)。每条记录在处理顺序上 严格地遵守在检查点之前或之后的规定,例如[“b”,2]在检查点之前被处理,[“a”,2] 则在检查点之后被处理。
当该程序处理输入流中的 6 条记录时,涉及的操作遍布 3 个并行实例(节点、CPU 内核等)。那么,检查点该如何保证 exactly-once 呢?
检查点分割线和普通数据记录类似。它们由算子处理,但并不参与计算,而是 会触发与检查点相关的行为。当读取输入流的数据源(在本例中与 keyBy 算子内联) 遇到检查点屏障时,它将其在输入流中的位置保存到持久化存储中。如果输入流来 自消息传输系统(Kafka),这个位置就是偏移量。Flink 的存储机制是插件化的,持久 化存储可以是分布式文件系统,如 HDFS。下图展示了这个过程(遇到 checkpoint barrier 时, 保存其在输入流中的位置)

当 Flink 数据源(在本例中与 keyBy 算子内联)遇到检查点分界线(barrier)时, 它会将其在输入流中的位置保存到持久化存储中。这让 Flink 可以根据该位置重启。
检查点像普通数据记录一样在算子之间流动。当 map 算子处理完前 3 条数据并 收到检查点分界线时,它们会将状态以异步的方式写入持久化存储,如下图所示(保存 map 算子状态, 也就是当前各个 key 的计数值)

位于检查点之前的所有记录([“b”,2]、[“b”,3]和[“c”,1])被 map 算子处理之后的情 况。此时,持久化存储已经备份了检查点分界线在输入流中的位置(备份操作发生在 barrier 被输入算子处理的时候)。map 算子接着开始处理检查点分界线,并触发将状 态异步备份到稳定存储中这个动作。
当 map 算子的状态备份和检查点分界线的位置备份被确认之后,该检查点操作 就可以被标记为完成,如下图所示。我们在无须停止或者阻断计算的条件下,在一 个逻辑时间点(对应检查点屏障在输入流中的位置)为计算状态拍了快照。通过确保 备份的状态和位置指向同一个逻辑时间点,后文将解释如何基于备份恢复计算,从 而保证 exactly-once。值得注意的是,当没有出现故障时,Flink 检查点的开销极小, 检查点操作的速度由持久化存储的可用带宽决定。回顾数珠子的例子: 除了因为数 错而需要用到皮筋之外,皮筋会被很快地拨过。

检查点操作完成,状态和位置均已备份到稳定存储中。输入流中的所有数据记 录都已处理完成。值得注意的是,备份的状态值与实际的状态值是不同的。备份反 映的是检查点的状态。
如果检查点操作失败,Flink 可以丢弃该检查点并继续正常执行,因为之后的某 一个检查点可能会成功。虽然恢复时间可能更长,但是对于状态的保证依旧很有力。 只有在一系列连续的检查点操作失败之后,Flink 才会抛出错误,因为这通常预示着 发生了严重且持久的错误。
现在来看看下图所示的情况:检查点操作已经完成,但故障紧随其后(故障紧跟检查点, 导致最底部的实例丢失)

在这种情况下(故障时的状态恢复),Flink 会重新拓扑(可能会获取新的执行资源),将输入流倒回到 上一个检查点,然后恢复状态值并从该处开始继续计算。在本例中,[“a”,2]、[“a”,2] 和[“c”,2]这几条记录将被重播。
下图展示了这一重新处理过程。从上一个检查点开始重新计算,可以保证在剩 下的记录被处理之后,得到的 map 算子的状态值与没有发生故障时的状态值一致。

Flink 将输入流倒回到上一个检查点屏障的位置,同时恢复 map 算子的状态值。 然后,Flink 从此处开始重新处理。这样做保证了在记录被处理之后,map 算子的状 态值与没有发生故障时的一致。
Flink 检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于 Chandy-Lamport 分布式快照算法。
检查点是 Flink 最有价值的创新之一,因为它使 Flink 可以保证 exactly-once, 并且不需要牺牲性能

4. Flink+Kafka 如何实现端到端的 exactly-once 语义

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于 Flink + Kafka 的数据管道系统(Kafka 进、Kafka 出)而言,各组件怎样保证 exactly-once 语义呢?

  • 内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
  • source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
  • sink —— kafka producer 作为 sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

内部的 checkpoint 机制我们已经有了了解,那 source 和 sink 具体又是怎样运行的呢?接下来我们逐步做一个分析。
我们知道 Flink 由 JobManager 协调各个 TaskManager 进行 checkpoint 存储, checkpoint 保存在 StateBackend 中,默认 StateBackend 是内存级的,也可以改为文件级的进行持久化保存。

当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流; barrier 会在算子间传递下去。

每个算子会对当前的状态做个快照,保存到状态后端。对于 source 任务而言, 就会把当前的 offset 作为状态保存起来。下次从 checkpoint 恢复时,source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。

每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。
当 sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据 就改为“已确认”,数据就真正可以被消费了。

所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进 行“预提交”,直到执行完 sink 操作,会发起“确认提交”,如果执行失败,预提 交会放弃掉。
具体的两阶段提交步骤总结如下:

  • 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
  • jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
  • sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  • jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
  • sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
  • 外部 kafka 关闭事务,提交的数据可以正常消费了

所以我们也可以看到,如果宕机需要通过 StateBackend 进行恢复,只能恢复所有确认提交的操作。