Kafka生产消费
已于 2025年09月05日 16:30 修改
访问次数:0
好的,我来帮你系统整理一下 Kafka 常用的生产/消费命令,并附上对应的 Python confluent_kafka API 使用方法,方便对照。
1️⃣ 创建主题(Topic)
Kafka CLI
# 创建一个 topic
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server 127.0.0.1:9092 --partitions 3 --replication-factor 1
# 查看所有 topic
bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
# 查看 topic 信息
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server 127.0.0.1:9092
Python (confluent_kafka.admin)
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})
topic = NewTopic("my_topic", num_partitions=3, replication_factor=1)
admin.create_topics([topic])
2️⃣ 发送消息(Producer)
Kafka CLI
# 向 topic 发送消息
bin/kafka-console-producer.sh --topic my_topic --bootstrap-server 127.0.0.1:9092
# 然后在终端输入消息,每行一条
hello kafka
Python (confluent_kafka.Producer)
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': '127.0.0.1:9092'})
# 异步发送消息
p.produce('my_topic', key='key1', value='hello kafka')
# 阻塞等待所有消息发送完成
p.flush()
Tips: 可以给 produce() 加上 callback 确认消息是否发送成功。 支持批量发送消息,提高性能。
3️⃣ 消费消息(Consumer)
Kafka CLI
# 从 topic 消费消息
bin/kafka-console-consumer.sh --topic my_topic --bootstrap-server 127.0.0.1:9092 --group my_group --from-beginning
Python (confluent_kafka.Consumer)
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': '127.0.0.1:9092',
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
})
c.subscribe(['my_topic'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
print(f"Received: {msg.value().decode('utf-8')}")
c.close()
Tips: auto.offset.reset = earliest 从头消费,latest 从最新开始。 可以使用多个消费者实例在同一组实现负载均衡。
4️⃣ 删除主题(Topic)
Kafka CLI
bin/kafka-topics.sh --delete --topic my_topic --bootstrap-server 127.0.0.1:9092
Python (confluent_kafka.admin)
admin.delete_topics(['my_topic'])
注意:删除主题需要 Kafka 配置 delete.topic.enable=true。
5️⃣ 查看消费者组状态
Kafka CLI
# 查看组列表
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
# 查看组详情(每个分区的 offset、lag 等)
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my_group --describe
Python
# confluent_kafka Python 客户端没有直接 CLI 那样的命令
# 需要使用 AdminClient 获取 group 信息:
from confluent_kafka.admin import AdminClient
admin = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})
groups = admin.list_groups()
print(groups)
6️⃣ 总结对照表
| 功能 | Kafka CLI | Python (confluent_kafka) |
|---|---|---|
| 创建 topic | kafka-topics.sh --create ... | AdminClient.create_topics() |
| 查看 topic | kafka-topics.sh --list / --describe | AdminClient.list_topics() |
| 发送消息 | kafka-console-producer.sh | Producer.produce() + flush() |
| 消费消息 | kafka-console-consumer.sh | Consumer.poll() + subscribe() |
| 删除 topic | kafka-topics.sh --delete | AdminClient.delete_topics() |
| 消费组状态 | kafka-consumer-groups.sh --describe | AdminClient.list_groups() |
如果你需要,我可以帮你写一个 完整 Python 示例:
- 多主题
- 多分区
- 多消费者组
- 带消息发送回调
- 打印消费延迟(lag)
这样就可以完全模拟 Kafka CLI 功能。
你希望我写这个完整示例吗?
评论(0)