Tavio's blog Tavio's blog
首页
  • JVM底层原理
  • 邪恶多线程
  • MyBatis底层原理
  • Spring底层原理
  • MySQL的优化之路
  • ClickHouse的高性能
  • Redis的快速查询
  • RabbitMQ的生产
  • Kafka的高吞吐量
  • ES的入门到入坑
  • MySQL自增ID主键空洞
  • 前端实现长整型排序
  • MySQL无感换表
  • Redis延时双删
  • 高并发秒杀优惠卷
  • AOP无侵入式告警
  • 长短链接跳转
  • 订单超时取消
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Tavio Zhang

努力学习的小码喽
首页
  • JVM底层原理
  • 邪恶多线程
  • MyBatis底层原理
  • Spring底层原理
  • MySQL的优化之路
  • ClickHouse的高性能
  • Redis的快速查询
  • RabbitMQ的生产
  • Kafka的高吞吐量
  • ES的入门到入坑
  • MySQL自增ID主键空洞
  • 前端实现长整型排序
  • MySQL无感换表
  • Redis延时双删
  • 高并发秒杀优惠卷
  • AOP无侵入式告警
  • 长短链接跳转
  • 订单超时取消
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Kafka的深度解析
  • Kafka消息存储
  • Kafka的生产消费
    • 一、Kafka 核心概念与 Zookeeper 的作用
      • 1.1 核心概念速览
      • 1.2 为什么需要 Zookeeper?
    • 二、环境搭建:Docker Compose 部署 Zookeeper 与 Kafka
      • 2.1 配置文件(docker-compose.yml)
      • 2.2 启动环境
    • 三、Topic 管理:创建与查看
      • 3.1 创建 Topic
      • 3.2 查看 Topic 列表
      • 3.3 查看 Topic 详情
    • 四、命令行快速体验:生产与消费消息
      • 4.1 发送消息(生产者)
      • 4.2 接收消息(消费者)
    • 五、Java 代码实现:生产与消费消息
      • 5.1 依赖引入(Maven)
      • 5.2 生产者实现
      • 5.2.1 同步发送消息
      • 5.2.2 异步发送消息
      • 5.3 消费者实现
    • 六、注意事项与最佳实践
  • 《Kafka》笔记
Tavio
2025-03-08
目录

Kafka的生产消费

Kafka 作为一款高性能、高吞吐量的分布式消息队列,在实时数据管道、日志收集、事件驱动架构等场景中被广泛应用。其核心优势在于高吞吐、低延迟、可持久化、高容错等特性。

# 一、Kafka 核心概念与 Zookeeper 的作用

# 1.1 核心概念速览

  • Broker:Kafka 集群中的服务器节点,负责存储消息和处理客户端请求。
  • Topic:消息的分类容器,生产者向 Topic 发送消息,消费者从 Topic 读取消息(类似数据库的"表")。
  • Partition:Topic 的分区,每个 Topic 可分为多个分区,实现消息的并行存储与消费(提升吞吐量)。
  • Replica:分区的副本,用于故障容错(副本数越多,可靠性越高,但存储成本也越高)。
  • Producer:消息生产者,负责向 Topic 发送消息。
  • Consumer:消息消费者,负责从 Topic 读取消息。
  • Consumer Group:消费者组,多个消费者组成一个组,共同消费 Topic 数据(同组内消费者分工消费不同分区,避免重复消费)。
  • Offset:消息在分区中的唯一序号,用于标记消费者的消费位置(类似"书签")。

# 1.2 为什么需要 Zookeeper?

在 Kafka 2.8 版本之前,Zookeeper 是必选依赖,主要承担以下核心职责:

  • 存储元数据:包括 Topic 配置(分区数、副本数等)、Broker 节点信息、分区与 Broker 的映射关系等。
  • 协调集群:负责 Broker 节点的注册与发现、Leader 分区的选举(确保分区故障时自动切换)。
  • 同步状态:Topic 的创建、删除、修改等操作需通过 Zookeeper 同步到整个集群。

(注:Kafka 2.8+ 版本引入了 KRaft 模式,可脱离 Zookeeper 独立运行,但目前主流生产环境仍多使用 Zookeeper 模式。)

# 二、环境搭建:Docker Compose 部署 Zookeeper 与 Kafka

使用 Docker Compose 可快速搭建本地 Kafka 环境,无需手动配置依赖。

# 2.1 配置文件(docker-compose.yml)

version: '3.8'

services:
  # ZooKeeper 服务(Kafka 依赖)
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0  # 稳定版本,与 Kafka 版本兼容
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181  # 客户端连接端口(默认2181)
      ZOOKEEPER_TICK_TIME: 2000    # 心跳间隔时间(毫秒,集群同步基础)
    ports:
      - "2181:2181"  # 宿主机端口映射,外部可访问 Zookeeper
    volumes:
      - ./zookeeper-data:/data  # 持久化 Zookeeper 数据(避免容器重启丢失)
    networks:
      - kafka-network  # 加入自定义网络,与 Kafka 通信

  # Kafka 服务
  kafka:
    image: confluentinc/cp-kafka:7.5.0  # 与 Zookeeper 版本保持一致
    container_name: kafka
    user: root  # 避免容器内文件权限问题
    depends_on:
      - zookeeper  # 依赖 Zookeeper,确保其先启动
    environment:
      KAFKA_BROKER_ID: 1  # Broker 唯一标识(集群模式需为不同值)
      # 连接 Zookeeper(Docker 内部通过服务名访问,无需 IP)
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # 监听配置:区分内部(容器间)和外部(宿主机)访问
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      # 暴露给外部的访问地址(宿主机用 localhost,服务器用公网 IP)
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      # 监听协议映射(需与 LISTENERS 一一对应)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"  # 开发环境自动创建 Topic(生产环境建议关闭)
      KAFKA_LOG_RETENTION_HOURS: 168  # 消息留存时间(默认7天,可按需调整)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # 偏移量 Topic 的副本数(单机1即可)
    ports:
      - "9092:9092"   # Docker 内部通信端口(容器间调用用)
      - "29092:29092" # 宿主机外部访问端口(本地代码/工具连接用)
    volumes:
      - ./kafka-data:/var/lib/kafka/data  # 持久化 Kafka 消息数据
    networks:
      - kafka-network

# 自定义网络(隔离容器,避免端口冲突)
networks:
  kafka-network:
    driver: bridge
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 2.2 启动环境

在配置文件目录执行以下命令启动服务:

docker-compose up -d  # -d 表示后台运行

// 验证服务是否启动成功:
docker ps  # 查看 zookeeper 和 kafka 容器是否处于运行状态
1
2
3
4

# 三、Topic 管理:创建与查看

Topic 是消息的载体,生产消费前需先创建 Topic(或依赖自动创建)。

# 3.1 创建 Topic

使用 Kafka 内置工具 kafka-topics 创建 Topic:

kafka-topics \
  --create \
  --bootstrap-server localhost:29092 \  # 连接 Broker(本地用 29092 端口)
  --topic test \  # Topic 名称
  --if-not-exists \  # 若已存在则不报错(推荐添加)
  --partitions 1 \  # 分区数(测试用1,生产环境根据并发需求调整,如4、8)
  --replication-factor 1  # 副本数(测试用1,生产环境建议≥2以保证高可用)
1
2
3
4
5
6
7

参数说明:

  • partitions:分区数越多,并行处理能力越强,但会增加集群管理成本。
  • replication-factor:副本数越多,数据可靠性越高,但会占用更多存储空间。

# 3.2 查看 Topic 列表

kafka-topics --list --bootstrap-server localhost:29092
1

# 3.3 查看 Topic 详情

kafka-topics --describe --bootstrap-server localhost:29092 --topic test
1

输出内容包括分区数、副本分布、Leader 分区等信息,可用于验证 Topic 配置。

# 四、命令行快速体验:生产与消费消息

Kafka 提供了命令行工具,可快速测试 Topic 的生产消费功能。

# 4.1 发送消息(生产者)

kafka-console-producer --broker-list localhost:29092 --topic test
1

执行后进入交互模式,输入任意内容并回车,即可向 test Topic 发送消息。

# 4.2 接收消息(消费者)

kafka-console-consumer --bootstrap-server localhost:29092 --topic test --from-beginning
1
  • --from-beginning:表示从 Topic 最开始的消息开始消费(默认只消费启动后的新消息)。

打开两个终端,分别执行生产者和消费者命令,即可实时看到消息传递效果。

# 五、Java 代码实现:生产与消费消息

实际开发中,需通过代码实现生产者和消费者。以下是基于 Kafka 客户端的实现示例。

# 5.1 依赖引入(Maven)

在 pom.xml 中添加 Kafka 客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>  <!-- 与 Kafka 服务版本兼容 -->
</dependency>
1
2
3
4
5

# 5.2 生产者实现

# 5.2.1 同步发送消息

  @Test
  void testSyncSendMsg() throws ExecutionException, InterruptedException {
      // 1. 配置生产者参数
      Properties properties = new Properties();
      // 连接 Broker 地址(外部访问用 29092 端口)
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
      //  key 序列化器(将字符串转为字节数组,Kafka 传输的是字节)
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      //  value 序列化器
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

      // 2. 创建生产者实例
      KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

      // 3. 发送消息(同步发送:等待 Broker 响应)
      for (int i = 0; i < 10; i++) {
          // 构建消息:Topic 名称、key(用于分区路由)、value(消息内容)
          ProducerRecord<String, String> record = new ProducerRecord<>("test", 
                                                                        String.valueOf(i),  // key
                                                                        "Hello World " + i); // value
          // 发送消息并等待结果(同步阻塞)
          RecordMetadata metadata = producer.send(record).get();
          // 打印发送结果
          System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
      }

      // 4. 关闭生产者(释放资源)
      producer.close();
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

关键参数说明:

  • BOOTSTRAP_SERVERS_CONFIG:Broker 地址列表(多个用逗号分隔,如 "host1:9092,host2:9092")。
  • 序列化器:Kafka 消息在网络中以字节传输,需指定序列化器将对象转为字节(此处用字符串序列化器)。

# 5.2.2 异步发送消息

同步发送会阻塞等待响应,效率较低;异步发送通过回调函数处理结果,性能更优:

@Test
void testAsyncSendMsg() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("test", 
                                                                     String.valueOf(i), 
                                                                     "Async Hello " + i);
        // 异步发送:通过 Callback 回调处理结果
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e == null) {
                    // 发送成功
                    System.out.printf("异步发送成功:topic=%s, offset=%d%n",
                                     metadata.topic(), metadata.offset());
                } else {
                    // 发送失败
                    System.err.println("发送失败:" + e.getMessage());
                }
            }
        });
    }

    // 关闭前需等待异步回调完成(实际开发中可配合线程池或CountDownLatch)
    producer.flush();  // 强制刷新缓冲区
    producer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 5.3 消费者实现

  @Test
  void testPullMsg() {
      // 1. 配置消费者参数
      Properties properties = new Properties();
      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
      // 消费者组 ID(必须指定,同组消费者分工消费分区)
      properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
      // 自动提交 offset(开发环境简化配置,生产环境建议手动提交)
      properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      // 自动提交 offset 的间隔(毫秒)
      properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
      // key 反序列化器(将字节数组转为字符串)
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      // value 反序列化器
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      // 首次消费位置(earliest:从最早消息开始;latest:从最新消息开始)
      properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

      // 2. 创建消费者实例
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

      // 3. 订阅 Topic(可订阅多个,用列表传入)
      consumer.subscribe(Arrays.asList("test"));

      // 4. 循环拉取消息(Kafka 是"拉取式"消费)
      while (true) {
          // 拉取消息(超时时间:5秒内无消息则返回空)
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
          
          // 遍历处理消息
          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                                record.topic(), record.partition(), record.offset(),
                                record.key(), record.value());
          }
      }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

关键参数说明:

  • GROUP_ID_CONFIG:消费者组 ID,同组内消费者共同消费 Topic,每个分区只能被组内一个消费者消费(避免重复消费)。
  • AUTO_OFFSET_RESET_CONFIG:当消费者组首次消费或 offset 无效时,指定从最早(earliest)还是最新(latest)消息开始消费。
  • poll() 方法:消费者主动拉取消息的核心方法,参数为超时时间(避免无限阻塞)。

# 六、注意事项与最佳实践

  1. 端口使用:

    • 容器间通信用 9092 端口(如其他容器连接 Kafka)。
    • 宿主机或外部服务连接用 29092 端口。
  2. Topic 设计:

    • 分区数:根据并发需求设置(建议与 Broker 核心数匹配),一旦创建无法减少(可增加)。
    • 副本数:生产环境建议 ≥2,确保单 Broker 故障时数据不丢失。
  3. 消息可靠性:

    • 生产者:通过 acks 参数控制可靠性(acks=all 表示所有副本确认后才算发送成功,可靠性最高)。
    • 消费者:生产环境建议关闭自动提交 offset,通过 consumer.commitSync() 手动提交(避免消费失败后 offset 已提交)。
  4. 性能优化:

    • 生产者:启用批量发送(batch.size)、压缩消息(compression.type=gzip)。
    • 消费者:合理设置 fetch.min.bytes(拉取最小字节数)和 fetch.max.wait.ms(最大等待时间),平衡延迟与吞吐量。
编辑 (opens new window)
#Kafka的生产消费
上次更新: 2026/01/21, 19:29:14
Kafka消息存储

← Kafka消息存储

最近更新
01
订单超时取消
01-21
02
双 Token 登录
01-21
03
长短链接跳转
01-21
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式