在使用RocksDB作为state存储之后,发现性能被严重影响,吞吐能力下降了几倍不止,所以根据网上的几篇调优文档做了自己的优化,感谢下面这些博客做出的贡献:
https://www.jianshu.com/p/2e61c2c83c57
https://blog.csdn.net/wangshuo2019/article/details/107250801/
https://blog.csdn.net/huang358468/article/details/115221066
https://github.com/apachecn/flink-doc-zh/blob/master/docs/1.7/119.md
https://www.jianshu.com/p/df98fa755a2d
https://blog.csdn.net/weixin_44904816/article/details/105672235
https://blog.csdn.net/yscoder/article/details/117064790

下面是我自己的flink-conf.yaml文件关于RocksDB的优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
state.backend: rocksdb
# 激活RocksDB压缩过滤清除
state.backend.rocksdb.ttl.compaction.filter.enabled: true
# block大小,默认4KB
state.backend.rocksdb.block.blocksize: 32kb
# block cache大小,默认8MB,内存余量充足建议128m或256m,提升读的性能
state.backend.rocksdb.block.cache-size: 128m
state.backend.rocksdb.compaction.level.use-dynamic-size: true
# 后台负责 flush 和 compaction 的最大并发线程数,默认为1
state.backend.rocksdb.thread.num: 4
# 指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见
state.checkpoints.dir: file:///ssd/flink2kafka/flink-1.12.4/data
# 用于指定定时器服务的工厂类实现类,默认为“HEAP”
state.backend.rocksdb.timer-service.factory: rocksdb
# 用于指定同时可以操作RocksDBStateBackend的线程数量,默认是1
state.backend.rocksdb.checkpoint.transfer.thread.num: 2
# 配置任务本地恢复
state.backend.local-recovery: true
# 指定RocksDB存储状态数据的本地文件路径,在每个TaskManager提供该路径节点中的状态存储
state.backend.rocksdb.localdir: /ssd/flink2kafka/flink-1.12.4/rocksdb
# 设为 false 禁用 RocksDB 内存托管
state.backend.rocksdb.memory.managed: true
# 限制每个slot的RocksDB内存的使用上限,避免了OOM的风险
state.backend.rocksdb.memory.fixed-per-slot: 20mb
# 默认值 0.5,即 50% 的给定内存会分配给写缓冲区使用
state.backend.rocksdb.memory.write-buffer-ratio: 0.9
# 默认值 0.1,即 10% 的 block cache 内存会优先分配给索引及过滤
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
#state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
# 监控当前的实际延迟写入率
state.backend.rocksdb.metrics.background-errors: true
# 启用了Flink RocksDB指标
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
# 监控待处理的压缩,如果压缩失败返回1,否则返回0
state.backend.rocksdb.metrics.compaction-pending: true
# L1层单个 sstable 文件的大小阈值,默认值为64MB
state.backend.rocksdb.compaction.level.target-file-size-base: 32m
# L1层的数据总大小阈值,默认值为256MB,建议设为 target_file_size_base 的倍数,且不能太小
state.backend.rocksdb.compaction.level.max-size-level-base: 160m

调优之前一定要先了解RocksDB的原理,才能根据自己的实际情况去调整参数
这是官方对于RocksDBStateBackend配置的解释:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#advanced-rocksdb-state-backends-options