0X01 背景
大数据过滤、导入,用celery下发任务,任务内容为kafka生产一些数据。
0X02 问题
使用confluent_kafka或python-kafka模块向kafka生产数据,本地调试时代码可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了,具体表现为调用Producer函数后无任何反应。
https://www.cnblogs.com/dplearning/p/7520211.html
https://blog.csdn.net/weixin_34050427/article/details/85940461
0X03 解决方案
- 更换为pykafka库(缺点:pykafka不支持client.id)
- 更换为python-kafka库(缺点:每个tasker都需要实例化一个Producer,否则依然会发生confluent_kafka那种无响应的情况)
如:
#coding: utf-8
from kafka import KafkaProducer
import json
import time
@celery.task(max_retries=1, default_retry_delay=3, ignore_result=True)
def WriteToKafka(key):
"""
将传入数据写入Kafka
"""
prod = KafkaProducer(
bootstrap_servers=BROKER, client_id="xxx", retries=2)
datas = redis_client.smembers(key)
for data in datas:
# 数据包格式化
json_data = {}
future = prod.send('xxx',
json.dumps(json_data).encode("utf-8")).add_errback(on_send_error)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
# 清理使用过的Redis Key
redis_client.delete(key)
prod.flush()
0X04 后记
由于pykafka不支持设置client.id,所以我这里只能使用python-kafka来解决该问题。
虽然将Producer实例化放入task函数中会导致多次建立kafka链接,但是可以通过数据打包的方式让一个tasker执行更多的任务,通过减少tasker的调用量来减少Producer实例化的次数,从而提高效率。
有一个奇怪的问题,当我尝试使用confluent_kafka库,并模仿python-kafka的解决方法——在task函数中实例化Producer时,celery上面执行依然无响应,怀疑是confluent_kafka库的bug
0X05 参考内容
https://github.com/celery/celery/issues/4021
https://github.com/dpkp/kafka-python/issues/1098