Kafka的生产消费
Kafka 作为一款高性能、高吞吐量的分布式消息队列,在实时数据管道、日志收集、事件驱动架构等场景中被广泛应用。其核心优势在于高吞吐、低延迟、可持久化、高容错等特性。
# 一、Kafka 核心概念与 Zookeeper 的作用
# 1.1 核心概念速览
- Broker:Kafka 集群中的服务器节点,负责存储消息和处理客户端请求。
- Topic:消息的分类容器,生产者向 Topic 发送消息,消费者从 Topic 读取消息(类似数据库的"表")。
- Partition:Topic 的分区,每个 Topic 可分为多个分区,实现消息的并行存储与消费(提升吞吐量)。
- Replica:分区的副本,用于故障容错(副本数越多,可靠性越高,但存储成本也越高)。
- Producer:消息生产者,负责向 Topic 发送消息。
- Consumer:消息消费者,负责从 Topic 读取消息。
- Consumer Group:消费者组,多个消费者组成一个组,共同消费 Topic 数据(同组内消费者分工消费不同分区,避免重复消费)。
- Offset:消息在分区中的唯一序号,用于标记消费者的消费位置(类似"书签")。
# 1.2 为什么需要 Zookeeper?
在 Kafka 2.8 版本之前,Zookeeper 是必选依赖,主要承担以下核心职责:
- 存储元数据:包括 Topic 配置(分区数、副本数等)、Broker 节点信息、分区与 Broker 的映射关系等。
- 协调集群:负责 Broker 节点的注册与发现、Leader 分区的选举(确保分区故障时自动切换)。
- 同步状态:Topic 的创建、删除、修改等操作需通过 Zookeeper 同步到整个集群。
(注:Kafka 2.8+ 版本引入了 KRaft 模式,可脱离 Zookeeper 独立运行,但目前主流生产环境仍多使用 Zookeeper 模式。)
# 二、环境搭建:Docker Compose 部署 Zookeeper 与 Kafka
使用 Docker Compose 可快速搭建本地 Kafka 环境,无需手动配置依赖。
# 2.1 配置文件(docker-compose.yml)
version: '3.8'
services:
# ZooKeeper 服务(Kafka 依赖)
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0 # 稳定版本,与 Kafka 版本兼容
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181 # 客户端连接端口(默认2181)
ZOOKEEPER_TICK_TIME: 2000 # 心跳间隔时间(毫秒,集群同步基础)
ports:
- "2181:2181" # 宿主机端口映射,外部可访问 Zookeeper
volumes:
- ./zookeeper-data:/data # 持久化 Zookeeper 数据(避免容器重启丢失)
networks:
- kafka-network # 加入自定义网络,与 Kafka 通信
# Kafka 服务
kafka:
image: confluentinc/cp-kafka:7.5.0 # 与 Zookeeper 版本保持一致
container_name: kafka
user: root # 避免容器内文件权限问题
depends_on:
- zookeeper # 依赖 Zookeeper,确保其先启动
environment:
KAFKA_BROKER_ID: 1 # Broker 唯一标识(集群模式需为不同值)
# 连接 Zookeeper(Docker 内部通过服务名访问,无需 IP)
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# 监听配置:区分内部(容器间)和外部(宿主机)访问
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
# 暴露给外部的访问地址(宿主机用 localhost,服务器用公网 IP)
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
# 监听协议映射(需与 LISTENERS 一一对应)
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" # 开发环境自动创建 Topic(生产环境建议关闭)
KAFKA_LOG_RETENTION_HOURS: 168 # 消息留存时间(默认7天,可按需调整)
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # 偏移量 Topic 的副本数(单机1即可)
ports:
- "9092:9092" # Docker 内部通信端口(容器间调用用)
- "29092:29092" # 宿主机外部访问端口(本地代码/工具连接用)
volumes:
- ./kafka-data:/var/lib/kafka/data # 持久化 Kafka 消息数据
networks:
- kafka-network
# 自定义网络(隔离容器,避免端口冲突)
networks:
kafka-network:
driver: bridge
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 2.2 启动环境
在配置文件目录执行以下命令启动服务:
docker-compose up -d # -d 表示后台运行
// 验证服务是否启动成功:
docker ps # 查看 zookeeper 和 kafka 容器是否处于运行状态
1
2
3
4
2
3
4
# 三、Topic 管理:创建与查看
Topic 是消息的载体,生产消费前需先创建 Topic(或依赖自动创建)。
# 3.1 创建 Topic
使用 Kafka 内置工具 kafka-topics 创建 Topic:
kafka-topics \
--create \
--bootstrap-server localhost:29092 \ # 连接 Broker(本地用 29092 端口)
--topic test \ # Topic 名称
--if-not-exists \ # 若已存在则不报错(推荐添加)
--partitions 1 \ # 分区数(测试用1,生产环境根据并发需求调整,如4、8)
--replication-factor 1 # 副本数(测试用1,生产环境建议≥2以保证高可用)
1
2
3
4
5
6
7
2
3
4
5
6
7
参数说明:
partitions:分区数越多,并行处理能力越强,但会增加集群管理成本。replication-factor:副本数越多,数据可靠性越高,但会占用更多存储空间。
# 3.2 查看 Topic 列表
kafka-topics --list --bootstrap-server localhost:29092
1
# 3.3 查看 Topic 详情
kafka-topics --describe --bootstrap-server localhost:29092 --topic test
1
输出内容包括分区数、副本分布、Leader 分区等信息,可用于验证 Topic 配置。
# 四、命令行快速体验:生产与消费消息
Kafka 提供了命令行工具,可快速测试 Topic 的生产消费功能。
# 4.1 发送消息(生产者)
kafka-console-producer --broker-list localhost:29092 --topic test
1
执行后进入交互模式,输入任意内容并回车,即可向 test Topic 发送消息。
# 4.2 接收消息(消费者)
kafka-console-consumer --bootstrap-server localhost:29092 --topic test --from-beginning
1
--from-beginning:表示从 Topic 最开始的消息开始消费(默认只消费启动后的新消息)。
打开两个终端,分别执行生产者和消费者命令,即可实时看到消息传递效果。
# 五、Java 代码实现:生产与消费消息
实际开发中,需通过代码实现生产者和消费者。以下是基于 Kafka 客户端的实现示例。
# 5.1 依赖引入(Maven)
在 pom.xml 中添加 Kafka 客户端依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version> <!-- 与 Kafka 服务版本兼容 -->
</dependency>
1
2
3
4
5
2
3
4
5
# 5.2 生产者实现
# 5.2.1 同步发送消息
@Test
void testSyncSendMsg() throws ExecutionException, InterruptedException {
// 1. 配置生产者参数
Properties properties = new Properties();
// 连接 Broker 地址(外部访问用 29092 端口)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
// key 序列化器(将字符串转为字节数组,Kafka 传输的是字节)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value 序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. 创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. 发送消息(同步发送:等待 Broker 响应)
for (int i = 0; i < 10; i++) {
// 构建消息:Topic 名称、key(用于分区路由)、value(消息内容)
ProducerRecord<String, String> record = new ProducerRecord<>("test",
String.valueOf(i), // key
"Hello World " + i); // value
// 发送消息并等待结果(同步阻塞)
RecordMetadata metadata = producer.send(record).get();
// 打印发送结果
System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
// 4. 关闭生产者(释放资源)
producer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
关键参数说明:
BOOTSTRAP_SERVERS_CONFIG:Broker 地址列表(多个用逗号分隔,如 "host1:9092,host2:9092")。- 序列化器:Kafka 消息在网络中以字节传输,需指定序列化器将对象转为字节(此处用字符串序列化器)。
# 5.2.2 异步发送消息
同步发送会阻塞等待响应,效率较低;异步发送通过回调函数处理结果,性能更优:
@Test
void testAsyncSendMsg() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test",
String.valueOf(i),
"Async Hello " + i);
// 异步发送:通过 Callback 回调处理结果
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
// 发送成功
System.out.printf("异步发送成功:topic=%s, offset=%d%n",
metadata.topic(), metadata.offset());
} else {
// 发送失败
System.err.println("发送失败:" + e.getMessage());
}
}
});
}
// 关闭前需等待异步回调完成(实际开发中可配合线程池或CountDownLatch)
producer.flush(); // 强制刷新缓冲区
producer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 5.3 消费者实现
@Test
void testPullMsg() {
// 1. 配置消费者参数
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
// 消费者组 ID(必须指定,同组消费者分工消费分区)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// 自动提交 offset(开发环境简化配置,生产环境建议手动提交)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交 offset 的间隔(毫秒)
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// key 反序列化器(将字节数组转为字符串)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value 反序列化器
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 首次消费位置(earliest:从最早消息开始;latest:从最新消息开始)
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2. 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 3. 订阅 Topic(可订阅多个,用列表传入)
consumer.subscribe(Arrays.asList("test"));
// 4. 循环拉取消息(Kafka 是"拉取式"消费)
while (true) {
// 拉取消息(超时时间:5秒内无消息则返回空)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
// 遍历处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
关键参数说明:
GROUP_ID_CONFIG:消费者组 ID,同组内消费者共同消费 Topic,每个分区只能被组内一个消费者消费(避免重复消费)。AUTO_OFFSET_RESET_CONFIG:当消费者组首次消费或 offset 无效时,指定从最早(earliest)还是最新(latest)消息开始消费。poll()方法:消费者主动拉取消息的核心方法,参数为超时时间(避免无限阻塞)。
# 六、注意事项与最佳实践
端口使用:
- 容器间通信用 9092 端口(如其他容器连接 Kafka)。
- 宿主机或外部服务连接用 29092 端口。
Topic 设计:
- 分区数:根据并发需求设置(建议与 Broker 核心数匹配),一旦创建无法减少(可增加)。
- 副本数:生产环境建议 ≥2,确保单 Broker 故障时数据不丢失。
消息可靠性:
- 生产者:通过
acks参数控制可靠性(acks=all表示所有副本确认后才算发送成功,可靠性最高)。 - 消费者:生产环境建议关闭自动提交 offset,通过
consumer.commitSync()手动提交(避免消费失败后 offset 已提交)。
- 生产者:通过
性能优化:
- 生产者:启用批量发送(
batch.size)、压缩消息(compression.type=gzip)。 - 消费者:合理设置
fetch.min.bytes(拉取最小字节数)和fetch.max.wait.ms(最大等待时间),平衡延迟与吞吐量。
- 生产者:启用批量发送(
编辑 (opens new window)
上次更新: 2026/01/21, 19:29:14