在使用RocksDB作为state存储之后,发现性能被严重影响,吞吐能力下降了几倍不止,所以根据网上的几篇调优文档做了自己的优化,感谢下面这些博客做出的贡献:
https://www.jianshu.com/p/2e61c2c83c57
https://blog.csdn.net/wangshuo2019/article/details/107250801/
https://blog.csdn.net/huang358468/article/details/115221066
https://github.com/apachecn/flink-doc-zh/blob/master/docs/1.7/119.md
https://www.jianshu.com/p/df98fa755a2d
https://blog.csdn.net/weixin_44904816/article/details/105672235
https://blog.csdn.net/yscoder/article/details/117064790
下面是我自己的flink-conf.yaml文件关于RocksDB的优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| state.backend: rocksdb # 激活RocksDB压缩过滤清除 state.backend.rocksdb.ttl.compaction.filter.enabled: true # block大小,默认4KB state.backend.rocksdb.block.blocksize: 32kb # block cache大小,默认8MB,内存余量充足建议128m或256m,提升读的性能 state.backend.rocksdb.block.cache-size: 128m state.backend.rocksdb.compaction.level.use-dynamic-size: true # 后台负责 flush 和 compaction 的最大并发线程数,默认为1 state.backend.rocksdb.thread.num: 4 # 指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见 state.checkpoints.dir: file:///ssd/flink2kafka/flink-1.12.4/data # 用于指定定时器服务的工厂类实现类,默认为“HEAP” state.backend.rocksdb.timer-service.factory: rocksdb # 用于指定同时可以操作RocksDBStateBackend的线程数量,默认是1 state.backend.rocksdb.checkpoint.transfer.thread.num: 2 # 配置任务本地恢复 state.backend.local-recovery: true # 指定RocksDB存储状态数据的本地文件路径,在每个TaskManager提供该路径节点中的状态存储 state.backend.rocksdb.localdir: /ssd/flink2kafka/flink-1.12.4/rocksdb # 设为 false 禁用 RocksDB 内存托管 state.backend.rocksdb.memory.managed: true # 限制每个slot的RocksDB内存的使用上限,避免了OOM的风险 state.backend.rocksdb.memory.fixed-per-slot: 20mb # 默认值 0.5,即 50% 的给定内存会分配给写缓冲区使用 state.backend.rocksdb.memory.write-buffer-ratio: 0.9 # 默认值 0.1,即 10% 的 block cache 内存会优先分配给索引及过滤 state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1 #state.backend.rocksdb.metrics.estimate-num-keys: true state.backend.rocksdb.metrics.num-running-compactions: true # 监控当前的实际延迟写入率 state.backend.rocksdb.metrics.background-errors: true # 启用了Flink RocksDB指标 state.backend.rocksdb.metrics.block-cache-capacity: true state.backend.rocksdb.metrics.block-cache-pinned-usage: true state.backend.rocksdb.metrics.block-cache-usage: true state.backend.rocksdb.metrics.estimate-table-readers-mem: true # 监控待处理的压缩,如果压缩失败返回1,否则返回0 state.backend.rocksdb.metrics.compaction-pending: true # L1层单个 sstable 文件的大小阈值,默认值为64MB state.backend.rocksdb.compaction.level.target-file-size-base: 32m # L1层的数据总大小阈值,默认值为256MB,建议设为 target_file_size_base 的倍数,且不能太小 state.backend.rocksdb.compaction.level.max-size-level-base: 160m
|
调优之前一定要先了解RocksDB的原理,才能根据自己的实际情况去调整参数
这是官方对于RocksDBStateBackend配置的解释:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#advanced-rocksdb-state-backends-options