Kafka's Log Deletion Policy


A while back a customer asked about Kafka's retention-based deletion, so I took the opportunity to dig into how it actually works.

If a cleanup policy is configured for Kafka log data, Kafka periodically removes log data according to that policy.

There are two cleanup policies, compact and delete. Both reclaim disk space; this post covers how the delete policy is triggered and the parameters that control it.

First, the relevant parameters from the configuration file:

log.retention.hours / log.retention.minutes / log.retention.ms

Log retention time. The default is 168 hours (7 days). All three parameters do the same thing at different time granularities; if several are set at once, log.retention.ms has the highest precedence.
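
As a rough sketch of that precedence (this is not the actual KafkaConfig code; the function and parameter names below are made up purely for illustration):

// Hypothetical sketch only: how the three settings collapse into one effective value in ms.
def effectiveRetentionMs(retentionMs: Option[Long],
                         retentionMinutes: Option[Long],
                         retentionHours: Option[Long]): Long =
  retentionMs                                            // log.retention.ms wins if set
    .orElse(retentionMinutes.map(_ * 60 * 1000L))        // otherwise fall back to log.retention.minutes
    .orElse(retentionHours.map(_ * 60 * 60 * 1000L))     // otherwise fall back to log.retention.hours
    .getOrElse(7L * 24 * 60 * 60 * 1000)                 // default: 168 hours (7 days)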

log.retention.check.interval.ms

How often log segments are checked to see whether they can be deleted. The default is 5 minutes, i.e. every 5 minutes the logs are scanned for segments that match the policy.

log.segment.bytes

The maximum size of a log segment, 1 GB by default (note: each log segment is a single file). Kafka keeps appending data to the active segment file; once a segment reaches log.segment.bytes, writes roll over to a new segment file (a simplified sketch of that decision follows). Also note that Kafka deletes data at log-segment granularity.
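
A simplified sketch of the roll decision (only an illustration of the idea, not the actual roll logic in the Kafka source):

// Illustration only: the active segment is rolled to a new file once appending
// the next batch would push it past log.segment.bytes.
def shouldRoll(activeSegmentSize: Long, incomingBatchSize: Long, segmentBytes: Long): Boolean =
  activeSegmentSize + incomingBatchSize > segmentBytes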

log.retention.bytes

This parameter is independent of log.retention.hours. Segments are only deleted when the total size of all log segments exceeds log.retention.bytes, and the excess is at least one full segment. The default is -1, which disables size-based retention.
For example: with log.segment.bytes set to 100 MB and log.retention.bytes set to 400 MB, suppose there are five segments: four of 100 MB and one of 50 MB, 450 MB in total. The total exceeds log.retention.bytes by 50 MB, but that excess is smaller than one segment, so Kafka deletes nothing.
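
To make the numbers concrete, here is a small standalone sketch (the values are just the example above, and the loop mirrors the diff logic from deleteRetentionSizeBreachedSegments shown later):

// Illustration of the size-based check using the example numbers above.
object RetentionSizeExample extends App {
  val retentionBytes = 400L * 1024 * 1024
  val segmentSizes   = Seq(100L, 100L, 100L, 100L, 50L).map(_ * 1024 * 1024)

  var diff = segmentSizes.sum - retentionBytes        // 50 MB of excess over the retention limit
  val deletable = segmentSizes.takeWhile { size =>
    val delete = diff - size >= 0                     // a segment is only dropped if the excess still covers it
    if (delete) diff -= size
    delete
  }
  println(s"segments to delete: ${deletable.size}")   // prints 0: the 50 MB excess is smaller than one segment
}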

Next, let's walk through the code to see how these parameters take effect.

Cleanup runs as a dedicated background task. There are many background task threads; here we only look at the retention cleanup task.

The cleanupLogs task is scheduled periodically with an interval of retentionCheckMs, which is exactly log.retention.check.interval.ms:

def startup(): Unit = {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))

    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    // .....
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}

Once Kafka has started, you can see the following line in server.log, confirming that the polling period here is 5000 ms:

[2024-10-10 20:47:24,621] INFO Starting log cleanup with a period of 5000 ms. (kafka.log.LogManager)

Now let's look at the logic of the cleanupLogs() function:

def cleanupLogs(): Unit = {
  info(s"Beginning log cleanup...")

  var total = 0
  val startMs = time.milliseconds

  // clean current logs.
  // Prevent multiple cleaner threads from competing over the same partitions
  val deletableLogs = {
    if (cleaner != null) {
      // prevent cleaner from working on same partitions when changing cleanup policy
      cleaner.pauseCleaningForNonCompactedPartitions()
    } else {
      currentLogs.filter {
        case (_, log) => !log.config.compact
      }
    }
  }
  // deletableLogs is a collection of (partition, log) pairs; iterate over them one by one
  try {
    deletableLogs.foreach {
      case (topicPartition, log) =>
        // Check which log segments qualify for deletion -- the key logic, covered separately below
        total += log.deleteOldSegments()

        val futureLog = futureLogs.get(topicPartition)
        if (futureLog != null) {
          // clean future logs
          debug(s"Garbage collecting future log '${futureLog.name}'")
          total += futureLog.deleteOldSegments()
        }
    }
  } finally {
    if (cleaner != null) {
      cleaner.resumeCleaning(deletableLogs.map(_._1))
    }
  }
  info(s"Log cleanup completed. $total files deleted in " +
    (time.milliseconds - startMs) / 1000 + " seconds")
}

log.deleteOldSegments()

There are three deletion criteria: time-based, size-based, and log-start-offset-based. Each is covered below.

/**
 * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
 * or because the log size is > retentionSize.
 *
 * Whether or not deletion is enabled, delete any log segments that are before the log start offset
 */
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}

Time-based: deleteRetentionMsBreachedSegments

private def deleteRetentionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  // The predicate is passed in as a closure: a segment is expired when
  // (current time - the segment's largest timestamp) exceeds retentionMs
  deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
    reason = s"retention time ${config.retentionMs}ms breach")
}

Size-based: deleteRetentionSizeBreachedSegments

private def deleteRetentionSizeBreachedSegments(): Int = {
  if (config.retentionSize < 0 || size < config.retentionSize) return 0
  // diff is the amount by which this log exceeds the configured retention size
  var diff = size - config.retentionSize

  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
    // only delete the segment if the remaining excess still covers at least one full segment
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
}

Log start offset: deleteLogStartOffsetBreachedSegments

Deletion by offset is triggered externally: the log start offset is moved forward, and any segment that lies entirely below the new log start offset gets deleted. The log start offset can be changed with the kafka-delete-records.sh tool.
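
As a toy illustration with made-up offsets (the test below is the same one used in the source that follows): suppose a partition has segments starting at offsets 0, 100, 200 and 300, and kafka-delete-records.sh moves the log start offset to 250.

// Hypothetical offsets, for illustration only.
val segmentBaseOffsets = Seq(0L, 100L, 200L, 300L)   // base offset of each segment
val logStartOffset     = 250L                        // new start offset after kafka-delete-records.sh

// A segment is deletable when the *next* segment starts at or below logStartOffset,
// i.e. the current segment lies entirely below the new start offset.
val deletableBases = segmentBaseOffsets.zip(segmentBaseOffsets.drop(1)).collect {
  case (base, nextBase) if nextBase <= logStartOffset => base
}
// deletableBases == Seq(0, 100); the segment starting at 200 still contains offset 250, so it stays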

private def deleteLogStartOffsetBreachedSegments(): Int = {
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
    // baseOffset: the lowest offset contained in a segment
    // logStartOffset: offsets below logStartOffset are no longer visible to clients
    // A segment is deletable if the *next* segment's baseOffset is at or below logStartOffset,
    // i.e. the current segment lies entirely below the log start offset
    nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
  }

  deleteOldSegments(shouldDelete, StartOffsetBreach(this))
}

All three paths first build a predicate function (the size-based one is a true closure, since it captures diff; the time-based one is a plain lambda), and all of them end up calling the same function, deleteOldSegments.

predicate is the check function built above.

highWatermark is the high watermark: offsets below it are committed (replicated to the in-sync replicas). The extra guard below ensures a segment is only deleted if it lies entirely below the high watermark, i.e. it contains no uncommitted data.

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                              reason: SegmentDeletionReason): Int = {
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
    // make sure the segment lies entirely below the high watermark, i.e. all of its data is committed
    highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
      predicate(segment, nextSegmentOpt)
  }
  // take the lock
  lock synchronized {
    // deletableSegments is the key step: collect the segments matching the predicate
    val deletable = localLog.deletableSegments(shouldDelete)
    if (deletable.nonEmpty)
      deleteSegments(deletable, reason)
    else
      0
  }
}


The candidate segments that satisfy the predicate are collected by deletableSegments:

private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
  // segments holds the LogSegment objects describing each on-disk segment
  if (segments.isEmpty) {
    Seq.empty
  } else {
    // deletable is an ArrayBuffer that accumulates the segments eligible for deletion
    val deletable = ArrayBuffer.empty[LogSegment]
    // iterate over all segments in the collection
    val segmentsIterator = segments.values.iterator
    // holds the next segment, if any
    var segmentOpt = nextOption(segmentsIterator)
    // walk the segments in order, keeping a look-ahead (feels a bit like a LeetCode problem)
    while (segmentOpt.isDefined) {
      // the current segment
      val segment = segmentOpt.get
      // the segment after it, if any
      val nextSegmentOpt = nextOption(segmentsIterator)
      val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
      // apply the predicate passed in above, and never delete an empty active (last) segment
      if (predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
        // collect it
        deletable += segment
        segmentOpt = nextSegmentOpt
      } else {
        // stop at the first segment that does not match
        segmentOpt = Option.empty
      }
    }
    // the final result: the set of segments to be deleted
    deletable
  }
}

Once the segments to delete are known, deleteSegments is called:

private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
  maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
    val numToDelete = deletable.size

    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      var segmentsToDelete = deletable
      if (localLog.segments.numberOfSegments == numToDelete) {
        val newSegment = roll()
        if (deletable.last.baseOffset == newSegment.baseOffset) {
          warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
          segmentsToDelete = deletable.dropRight(1)
        }
      }
      // throws if the memory-mapped buffers have already been closed (log dir offline)
      localLog.checkIfMemoryMappedBufferClosed()

      // remove the log and index files -- removeAndDeleteSegments is the key step, covered below
      localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
      deleteProducerSnapshots(deletable, asyncDelete = true)
      maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, SegmentDeletion)
    }
    numToDelete
  }
}

During deletion you usually first notice that the segment files get an extra .deleted suffix; that is the asynchronous deletion at work, and the logic lives in removeAndDeleteSegments:

private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
                                         asyncDelete: Boolean,
                                         reason: SegmentDeletionReason): Unit = {
  if (segmentsToDelete.nonEmpty) {
    // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
    // removing the deleted segment, we should force materialization of the iterator here, so that results of the
    // iteration remain valid and deterministic. We should also pass only the materialized view of the
    // iterator to the logic that actually deletes the segments.
    val toDelete = segmentsToDelete.toList
    reason.logReason(toDelete)
    toDelete.foreach { segment =>
      // segments is a map; first remove the in-memory metadata
      segments.remove(segment.baseOffset)
    }
    // then actually delete the files -- the key function
    LocalLog.deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
  }
}

deleteSegmentFiles

private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
                                    asyncDelete: Boolean,
                                    dir: File,
                                    topicPartition: TopicPartition,
                                    config: LogConfig,
                                    scheduler: Scheduler,
                                    logDirFailureChannel: LogDirFailureChannel,
                                    logPrefix: String): Unit = {
  segmentsToDelete.foreach { segment =>
    if (!segment.hasSuffix(DeletedFileSuffix))
      // rename the segment files with the .deleted suffix
      segment.changeFileSuffixes("", DeletedFileSuffix)
  }

  def deleteSegments(): Unit = {
    info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
    val parentDir = dir.getParent
    maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
      segmentsToDelete.foreach { segment =>
        segment.deleteIfExists()
      }
    }
  }

  // deletion is asynchronous by default
  if (asyncDelete)
    // schedule the actual file deletion after fileDeleteDelayMs,
    // controlled by log.segment.delete.delay.ms (default 1 minute)
    // (the detailed deleteSegments flow is left for a future post)
    scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
  else
    deleteSegments()
}