kafka数据清理

Kafka 的消息存储在磁盘中,为了控制磁盘占用空间,Kafka 需要不断地对过去的一些消息进行清理工作。Kafka 的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在 Kafka 中,提供两种日志清理方式:

  • 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
  • 日志压缩(Log Compaction):按照消息的 key 进行整合,有相同 key 的但有不同 value 值,只保留最后一个版本。

在Kafka的broker或topic配置中:

配置项 配置值 说明
log.cleaner.enable true(默认) 开启自动清理日志功能
log.cleanup.policy delete(默认) 删除日志
log.cleanup.policy compaction 压缩日志
log.cleanup.policy delete,compact 同时支持删除、压缩

日志删除是以段(segment日志)为单位来进行定期清理的。

日志删除(Log Deletion)

Kafka 日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数log.retention.check.interval.ms来配置,默认值为 300,000,即 5 分钟。当前日志分段的保留策略有 3 种:

  1. 基于时间的保留策略
  2. 基于日志大小的保留策略
  3. 基于日志起始偏移量的保留策略

1. 基于时间的保留策略

以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:

其中,优先级为log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:log.retention.hours=168,也就是,默认日志的保留时间为 168 小时,相当于保留 7 天。

删除日志分段时:

  1. 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
  2. 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
  3. Kafka 的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为 60000,即 1 分钟。

2. 基于日志大小的保留策略

日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过 broker 端参数log.retention.bytes来配置,默认值为 -1,表示无穷大。如果超过该大小,会自动将超出部分删除。

注意:log.retention.bytes配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。

3. 基于日志起始偏移量保留策略

每个 segment 日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

日志压缩(Log Compaction)

Log Compaction 是默认的日志删除之外的清理过时数据的方式。它会将相同的 key 对应的数据只保留一个版本。

  • Log Compaction 执行后,offset 将不再连续,但依然可以查询 Segment
  • Log Compaction 执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction 会生成一个新的 Segment 文件
  • Log Compaction 是针对 key 的,在使用的时候注意每个消息的 key 不为空
  • 基于 Log Compaction 可以保留 key 的最新更新,可以基于 Log Compaction 来恢复消费者的最新状态