Kafka version: 2.6

After a topic is created, what does Kafka actually do under the hood? What metadata is generated, and where is it stored? How are the topic's partitions distributed across the brokers?

Let's create a topic with a replication factor of 1 and a single partition from the command line, first see what happens at the storage layer, and then walk through the flow.

In the Kafka log directory, each partition maps to one folder, which holds the log segment along with its index files (offset index, time index, and a leader-epoch-checkpoint file):
```
(base) user@userdeMacBook-Air-10 bin % ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.
(base) user@userdeMacBook-Air-10 bin % ls -l /tmp/kafka-logs/test-0
total 8
-rw-r--r--  1 user  wheel  10485760 Oct 14 10:33 00000000000000000000.index
-rw-r--r--  1 user  wheel         0 Oct 14 10:33 00000000000000000000.log
-rw-r--r--  1 user  wheel  10485756 Oct 14 10:33 00000000000000000000.timeindex
-rw-r--r--  1 user  wheel         8 Oct 14 10:33 leader-epoch-checkpoint
(base) user@userdeMacBook-Air-10 bin %
```
What did ZooKeeper record? The topic's configuration:
```
localhost:2181 $ get /config/topics/test
{"version":1,"config":{}}
cZxid = 0x88d
ctime = Mon Oct 14 10:33:38 CST 2024
mZxid = 0x88d
mtime = Mon Oct 14 10:33:38 CST 2024
pZxid = 0x88d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0
```
Kafka AdminClient -> broker
Kafka's command-line tools follow a client-server architecture as well: an AdminClient first validates the request locally, and once that preliminary check passes, sends it to a broker for processing. This post focuses on the broker side; the AdminClient is covered only briefly.
AdminClient

The AdminClient source is at ~/kafka-prj/core/src/main/scala/kafka/admin/TopicCommand.scala.
All of the functionality we invoke through the kafka-topics.sh command lives in this file, centered on the TopicCommand class.
In version 2.6 you can still go through ZooKeeper: the command lets you specify either --zookeeper or --bootstrap-server (exactly one is required), and the two code paths differ. Here we mainly look at AdminClientTopicService:
```scala
def main(args: Array[String]): Unit = {
  val opts = new TopicCommandOptions(args)
  opts.checkArgs()

  val topicService = if (opts.zkConnect.isDefined)
    ZookeeperTopicService(opts.zkConnect)
  else
    AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)

  var exitCode = 0
  try {
    if (opts.hasCreateOption)
      topicService.createTopic(opts)
  }
  ....
}
```
```scala
override def createTopic(topic: CommandTopicPartition): Unit = {
  if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
    throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
  if (topic.partitions.exists(partitions => partitions < 1))
    throw new IllegalArgumentException(s"The partitions must be greater than 0")

  try {
    val newTopic = if (topic.hasReplicaAssignment)
      new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
    else {
      new NewTopic(
        topic.name,
        topic.partitions.asJava,
        topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
    }

    val configsMap = topic.configsToAdd.stringPropertyNames()
      .asScala
      .map(name => name -> topic.configsToAdd.getProperty(name))
      .toMap.asJava

    newTopic.configs(configsMap)
    val createResult = adminClient.createTopics(Collections.singleton(newTopic))
    createResult.all().get()
    println(s"Created topic ----- ${topic.name}.")
  } catch {
    ....
  }
}
```
The send path itself is omitted here, as it is a big topic of its own; the CreateTopicsRequest eventually ends up on the controller node.
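To make the client side concrete, here is a minimal sketch of creating the same topic through the public admin API instead of the shell script. It is not the TopicCommand code above, just the same underlying createTopics call; the object name is mine, and the bootstrap address and topic settings are the values from the earlier example:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicSketch extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)
  try {
    // Same as: kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic test
    val newTopic = new NewTopic("test", 1, 1.toShort)
    // AdminClient routes the CreateTopicsRequest to the controller broker for us
    admin.createTopics(Collections.singleton(newTopic)).all().get()
  } finally {
    admin.close()
  }
}
```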
Broker

On the broker, the request is ultimately handled by handleCreateTopicsRequest, which performs further validation: whether this broker is the active controller, authentication and authorization (may the caller create topics?), whether the topic already exists, and so on.
```scala
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
  val createTopicsRequest = request.body[CreateTopicsRequest]
  val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
  if (!controller.isActive) {
  } else {
    createTopicsRequest.data.topics.forEach { topic =>
      results.add(new CreatableTopicResult().setName(topic.name))
    }
    ....

    def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
      errors.foreach { case (topicName, error) =>
        val result = results.find(topicName)
        result.setErrorCode(error.error.code)
          .setErrorMessage(error.message)
        if (error != ApiError.NONE) {
          result.setConfigs(List.empty.asJava)
            .setNumPartitions(-1)
            .setReplicationFactor(-1)
            .setTopicConfigErrorCode(0.toShort)
        }
      }
      sendResponseCallback(results)
    }

    adminManager.createTopics(createTopicsRequest.data.timeoutMs,
      createTopicsRequest.data.validateOnly,
      toCreate,
      authorizedForDescribeConfigs,
      handleCreateTopicsResults)
  }
}
```
createTopics

```scala
def createTopics(timeout: Int,
                 validateOnly: Boolean,
                 toCreate: Map[String, CreatableTopic],
                 includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
                 responseCallback: Map[String, ApiError] => Unit): Unit = {
  val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
  val metadata = toCreate.values.map(topic =>
    try {
      if (metadataCache.contains(topic.name))
        throw new TopicExistsException(s"Topic '${topic.name}' already exists.")

      val configs = new Properties()
      topic.configs.forEach { entry =>
        configs.setProperty(entry.name, entry.value)
      }
      LogConfig.validate(configs)

      if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
          && !topic.assignments().isEmpty) {
        throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
          "Both cannot be used at the same time.")
      }

      val assignments = if (topic.assignments().isEmpty) {
        AdminUtils.assignReplicasToBrokers(
          brokers, resolvedNumPartitions, resolvedReplicationFactor)
      } else {
        val assignments = new mutable.HashMap[Int, Seq[Int]]
        topic.assignments.forEach { assignment =>
          assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)
        }
        assignments
      }

      createTopicPolicy match {
        case Some(policy) =>
          adminZkClient.validateTopicCreate(topic.name, assignments, configs)
          if (!validateOnly)
            adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
        case None =>
          if (validateOnly)
            adminZkClient.validateTopicCreate(topic.name, assignments, configs)
          else
            adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
      }

      includeConfigsAndMetatadata.get(topic.name).foreach { result =>
        val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs)
        val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
        val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
          val entry = createEntry(k, v)
          val source = ConfigSource.values.indices.map(_.toByte)
            .find(i => ConfigSource.forId(i.toByte) == entry.source)
            .getOrElse(0.toByte)
          new CreatableTopicConfigs()
            .setName(k)
            .setValue(entry.value)
            .setIsSensitive(entry.isSensitive)
            .setReadOnly(entry.isReadOnly)
            .setConfigSource(source)
        }.toList.asJava
        result.setConfigs(topicConfigs)
        result.setNumPartitions(assignments.size)
        result.setReplicationFactor(assignments(0).size.toShort)
      }
      CreatePartitionsMetadata(topic.name, assignments.keySet, ApiError.NONE)
    } catch {
      case e: Throwable =>
        error(s"Error processing create topic request $topic", e)
        CreatePartitionsMetadata(topic.name, Set.empty, ApiError.fromThrowable(e))
    }).toBuffer
}
```
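When the request carries no manual assignment, AdminUtils.assignReplicasToBrokers is what answers the question from the start of this post: how a topic's partitions are spread across brokers. Below is a simplified, deterministic re-implementation of its rack-unaware branch, for illustration only; the real code randomizes the starting broker index and the initial replica shift so that different topics do not all start on the same broker:

```scala
import scala.collection.mutable

// Simplified sketch of the rack-unaware branch of AdminUtils.assignReplicasToBrokers
// (startIndex and nextReplicaShift are fixed at 0 here; Kafka randomizes both).
def assignRackUnaware(nPartitions: Int, replicationFactor: Int, brokers: Seq[Int]): Map[Int, Seq[Int]] = {
  require(replicationFactor <= brokers.length, "replication factor cannot exceed broker count")
  val n = brokers.length
  val ret = mutable.Map[Int, Seq[Int]]()
  var nextReplicaShift = 0
  for (p <- 0 until nPartitions) {
    // each time the partition ids wrap around all brokers, push followers one step further
    if (p > 0 && p % n == 0) nextReplicaShift += 1
    val firstReplicaIndex = p % n // leaders rotate round-robin across the brokers
    val replicas = mutable.ArrayBuffer(brokers(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1) {
      val shift = 1 + (nextReplicaShift + j) % (n - 1) // followers always land on other brokers
      replicas += brokers((firstReplicaIndex + shift) % n)
    }
    ret(p) = replicas.toList
  }
  ret.toMap
}

// 4 partitions, replication factor 2, brokers 0/1/2:
assignRackUnaware(4, 2, Seq(0, 1, 2)).toSeq.sortBy(_._1).foreach(println)
// (0,List(0, 1))  (1,List(1, 2))  (2,List(2, 0))  (3,List(0, 2))
```

For our single-partition, single-replica test topic this degenerates to partition 0 -> the one live broker, which is exactly what gets written to ZooKeeper next.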
createTopicWithAssignment
This is where the topic's configuration is written into ZooKeeper (the content we saw under /config/topics/), together with the partition assignment under /brokers/topics/[topic-name]:
```scala
def createTopicWithAssignment(topic: String,
                              config: Properties,
                              partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
  validateTopicCreate(topic, partitionReplicaAssignment, config)

  // writes the topic config: the /config/topics/[topic-name] znode
  zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)

  // writes the partition assignment: the /brokers/topics/[topic-name] znode
  writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
    isUpdate = false)
}
```
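Both znodes can be read back with the plain ZooKeeper client to verify the writes. A small sketch, assuming ZooKeeper on localhost:2181 as in the zkCli session earlier (the exact JSON layout of the assignment znode differs slightly across Kafka versions):

```scala
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object InspectTopicZnodes extends App {
  val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = () // no-op watcher
  })
  // topic config: the {"version":1,"config":{}} we saw in zkCli earlier
  println(new String(zk.getData("/config/topics/test", false, null)))
  // partition -> replica assignment, i.e. which brokers host partition 0
  println(new String(zk.getData("/brokers/topics/test", false, null)))
  zk.close()
}
```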
At this point the ZooKeeper data has been updated. The controller node watches ZooKeeper for changes (via the TopicChangeHandler); when the children of /brokers/topics change, the corresponding logic runs, here the processTopicChange function. A toy sketch of the watch mechanism follows below.
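As an illustration of that mechanism only (Kafka's real handler goes through KafkaZkClient and the controller's event queue, not code like this), the sketch below keeps a child watch on /brokers/topics and recomputes the same new/deleted diff that processTopicChange computes:

```scala
import scala.jdk.CollectionConverters._
import org.apache.zookeeper.Watcher.Event.EventType
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object TopicWatchSketch extends App {
  @volatile private var knownTopics = Set.empty[String]

  private val watcher: Watcher = new Watcher {
    override def process(event: WatchedEvent): Unit =
      if (event.getType == EventType.NodeChildrenChanged) refresh()
  }

  private val zk = new ZooKeeper("localhost:2181", 30000, watcher)

  private def refresh(): Unit = {
    // getChildren re-registers the watch: ZooKeeper watches are one-shot
    val topics = zk.getChildren("/brokers/topics", watcher).asScala.toSet
    val newTopics = topics -- knownTopics // same diff as processTopicChange
    val deletedTopics = knownTopics -- topics
    knownTopics = topics
    if (newTopics.nonEmpty || deletedTopics.nonEmpty)
      println(s"New topics: $newTopics, deleted topics: $deletedTopics")
  }

  refresh()
  Thread.sleep(Long.MaxValue) // keep the session alive to receive watch events
}
```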
processTopicChange
```scala
private def processTopicChange(): Unit = {
  if (!isActive) return
  val topics = zkClient.getAllTopicsInCluster(true)
  val newTopics = topics -- controllerContext.allTopics
  val deletedTopics = controllerContext.allTopics.diff(topics)
  controllerContext.setAllTopics(topics)

  registerPartitionModificationsHandlers(newTopics.toSeq)
  val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
  deletedTopics.foreach(controllerContext.removeTopic)
  addedPartitionReplicaAssignment.foreach { case (topicAndPartition, newReplicaAssignment) =>
    controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  }
  info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
    s"[$addedPartitionReplicaAssignment]")

  if (addedPartitionReplicaAssignment.nonEmpty)
    onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
```
onNewPartitionCreation enters the state-machine flow, driving both the partition state machine and the replica state machine (NewPartition/NewReplica first, then OnlinePartition/OnlineReplica, as the code shows). I'll leave the state machines themselves for a future post.
```scala
private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
  info(s"New partition creation callback for ${newPartitions.mkString(",")}")
  partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
  partitionStateMachine.handleStateChanges(
    newPartitions.toSeq,
    OnlinePartition,
    Some(OfflinePartitionLeaderElectionStrategy(false))
  )
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}
```
… The intermediate steps are also left for a future post.
Finally, the controller sends a LeaderAndIsrRequest to the leader broker, which creates the replica locally; that is the partition directory we saw at the very beginning.
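To close the loop back to the ls output at the top: the net effect on the receiving broker is roughly "make sure a <topic>-<partition> directory with its segment and index files exists under the log dir". In the real code this happens through LogManager when the broker applies the LeaderAndIsr state; the helper below is a hypothetical stand-in for that side effect only:

```scala
import java.io.File

// Hypothetical stand-in for the directory-creation side effect of applying a
// LeaderAndIsrRequest; Kafka's LogManager additionally allocates the segment,
// .index and .timeindex files we listed earlier.
def getOrCreatePartitionDir(logDirRoot: String, topic: String, partition: Int): File = {
  val dir = new File(logDirRoot, s"$topic-$partition") // e.g. /tmp/kafka-logs/test-0
  if (!dir.exists()) dir.mkdirs()
  dir
}

println(getOrCreatePartitionDir("/tmp/kafka-logs", "test", 0))
```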