1. State Backends 的作用

有状态的流计算是Flink的一大特点,状态本质上是数据,数据是需要维护的,例如数据库就是维护数据的一种解决方案。State Backends 的作用就是用来维护State的。一个 State Backend 主要负责两件事:Local State Management(本地状态管理)Remote State Checkpointing(远程状态备份)

2. Local State Management(本地状态管理)

State Management 的主要任务是确保状态的更新和访问。类似于数据库系统对数据的管理,State Backends 的状态管理就是提供对 State 的访问或更新操作,从这一点上看,State Backends 与数据库很相似。Flink 提供的 State Backends 主要有两种形式的状态管理:

  • 直接将 State 以对象的形式存储到JVM的堆上面
  • 将 State 对象序列化后存储到 RocksDB 中(RocksDB会写到本地的磁盘上)

以上两种方式,第一种存储到JVM堆中,因为是在内存中读写,延迟会很低,但State的大小受限于内存的大小;第二种方式存储到State Backends上(本地磁盘上),读写较内存会慢一些,但不受内存大小的限制,同时因为state存储在磁盘上,可以减少应用程序对内存的占用。根据使用经验,对延迟不是特别敏感的应用,选择第二种方式较好,尤其是State比较大的情况下。

3. Remote State Checkpointing(远程状态备份)

Flink程序是分布式运行的,而State都是存储到各个节点上的,一旦TaskManager节点出现问题,就会导致State的丢失。State Backend 提供了 State Checkpointing 的功能,将 TaskManager 本地的 State 的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。不同的 State Backends 备份的方式不同,会有效率高低的区别。

4. 可用的状态后端

  • MemoryStateBackend
    • 对于状态管理,MemoryStateBackend直接将State对象存储到TaskManager的JVM堆上,如MapState会被存储为一个HashMap对象
    • 对于远程备份,MemoryStateBackend会将State备份到JobManager的堆内存上,这种方式是非常不安全的,且受限于JobManager的内存大小
  • FsStateBackend
    • 对于状态管理,FsStateBackend与MemoryStateBackend一样,将State存储到TaskManager的JVM堆上
    • 对于远程备份,FsStateBackend会将State写入到远程的文件系统,如HDFS中
  • RocksDBStateBackend
    • 对于状态管理,RocksDBStateBackend将state存储到TaskManager节点上的RocksDB数据库实例上
    • 对于远程备份,RocksDBstateBackend会将State备份到远程的存储系统中

4.1 MemoryStateBackend

MemoryStateBackend 是将状态维护在 Java 堆上的一个内部状态后端。键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序 checkpoint 时,此后端会在将状态发给 JobManager 之前快照下状态,JobManager 也将状态存储在 Java 堆上。默认情况下,MemoryStateBackend 配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。当然,使用 new MemoryStateBackend(MAX_MEM_STATE_SIZE, false) 也可以禁用该特点。
缺点:

  • 默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。状态大小受到 akka 帧大小的限制(maxStateSize <= akka.framesize 默认 10 M),所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。也可以通过 akka.framesize 调整 akka 帧大小
  • 状态的总大小不能超过 JobManager 的内存

使用场景:

  • 本地测试、几乎无状态的作业,比如 ETL、JobManager 不容易挂,或挂掉影响不大的情况
  • 不推荐在生产场景使用

4.2 FsStateBackend

FsStateBackend需要配置的主要是文件系统,如 URL(类型,地址,路径)。
当选择使用 FsStateBackend时,正在进行的数据会被存在TaskManager的内存中。在checkpoint时,此后端会将状态快照写入配置的文件系统和目录的文件中,同时会在JobManager的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。

默认情况下,FsStateBackend 配置成提供异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入false的布尔标志来禁用掉,例如:new FsStateBackend(path, false)
使用场景:

  • 处理大状态,长窗口,或大键值状态的有状态处理任务, 例如分钟级窗口聚合或 join
  • 适合用于高可用方案(需要开启HA的作业)
  • 可以在生产环境中使用

4.3 RocksDBStateBackend

RocksDBStateBackend 的配置也需要一个文件系统(类型,地址,路径)。
RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。

RocksDB是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意RocksDB不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过RocksDB支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单Key最大2G,总大小不超过配置的文件系统容量即可。
缺点:

  • RocksDB支持的单key和单value的大小最大为每个 2^31 字节。这是因为 RocksDB 的 JNI API 是基于byte[]的
  • 对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败

使用场景:

  • 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务
  • 非常适合用于高可用方案
  • 最好是对状态读写性能要求不高的作业

RocksDB的安装方法:

CentOS7 安装RocksDB
Ubuntu下安装RocksDB

综上所述,MemoryStateBackendFsStateBackend 都是在内存中进行状态管理,所以可以获取较低的读写延迟,但会受限于TaskManager的内存大小;而RocksDBStateBackend直接将State存储到RocksDB数据库中,所以不受JobManager的内存限制,但会有读写延迟,同时 RocksDBStateBackend 支持增量备份,这是其他两个都不支持的特性。一般来说,如果不是对延迟有极高的要求,RocksDBStateBackend是更好的选择。