Kafka生产消费

好的,我来帮你系统整理一下 Kafka 常用的生产/消费命令,并附上对应的 Python confluent_kafka API 使用方法,方便对照。



1️⃣ 创建主题(Topic)

Kafka CLI

# 创建一个 topic
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server 127.0.0.1:9092 --partitions 3 --replication-factor 1

# 查看所有 topic
bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092

# 查看 topic 信息
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server 127.0.0.1:9092

Python (confluent_kafka.admin)

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})

topic = NewTopic("my_topic", num_partitions=3, replication_factor=1)
admin.create_topics([topic])


2️⃣ 发送消息(Producer)

Kafka CLI

# 向 topic 发送消息
bin/kafka-console-producer.sh --topic my_topic --bootstrap-server 127.0.0.1:9092
# 然后在终端输入消息,每行一条
hello kafka

Python (confluent_kafka.Producer)

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': '127.0.0.1:9092'})

# 异步发送消息
p.produce('my_topic', key='key1', value='hello kafka')

# 阻塞等待所有消息发送完成
p.flush()
Tips: 可以给 produce() 加上 callback 确认消息是否发送成功。 支持批量发送消息,提高性能。


3️⃣ 消费消息(Consumer)

Kafka CLI

# 从 topic 消费消息
bin/kafka-console-consumer.sh --topic my_topic --bootstrap-server 127.0.0.1:9092 --group my_group --from-beginning

Python (confluent_kafka.Consumer)

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
})
c.subscribe(['my_topic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    print(f"Received: {msg.value().decode('utf-8')}")

c.close()
Tips: auto.offset.reset = earliest 从头消费,latest 从最新开始。 可以使用多个消费者实例在同一组实现负载均衡。


4️⃣ 删除主题(Topic)

Kafka CLI

bin/kafka-topics.sh --delete --topic my_topic --bootstrap-server 127.0.0.1:9092

Python (confluent_kafka.admin)

admin.delete_topics(['my_topic'])
注意:删除主题需要 Kafka 配置 delete.topic.enable=true。


5️⃣ 查看消费者组状态

Kafka CLI

# 查看组列表
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

# 查看组详情(每个分区的 offset、lag 等)
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my_group --describe

Python

# confluent_kafka Python 客户端没有直接 CLI 那样的命令
# 需要使用 AdminClient 获取 group 信息:
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})
groups = admin.list_groups()
print(groups)


6️⃣ 总结对照表

功能Kafka CLIPython (confluent_kafka)
创建 topickafka-topics.sh --create ...AdminClient.create_topics()
查看 topickafka-topics.sh --list / --describeAdminClient.list_topics()
发送消息kafka-console-producer.shProducer.produce() + flush()
消费消息kafka-console-consumer.shConsumer.poll() + subscribe()
删除 topickafka-topics.sh --deleteAdminClient.delete_topics()
消费组状态kafka-consumer-groups.sh --describeAdminClient.list_groups()


如果你需要,我可以帮你写一个 完整 Python 示例

  • 多主题
  • 多分区
  • 多消费者组
  • 带消息发送回调
  • 打印消费延迟(lag)

这样就可以完全模拟 Kafka CLI 功能。

你希望我写这个完整示例吗?

文章标签:

评论(0)