python-kafka手动提交消息测试
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python-kafka手动提交消息测试,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2352字,纯文字阅读大概需要4分钟。
内容图文
![python-kafka手动提交消息测试](/upload/InfoBanner/zyjiaocheng/1317/3a0cdd548c8d45219b03c3bfc02ab03c.jpg)
发送方代码
import json import time import traceback from kafka import KafkaProducer from kafka.errors import kafka_errors if__name__ == ‘__main__‘: producer = KafkaProducer( bootstrap_servers=[‘192.168.32.10:9092‘], key_serializer=lambda k: json.dumps(k).encode(), value_serializer=lambda v: json.dumps(v).encode()) # 发送三条消息for i in range(0, 3): time.sleep(2) data = { "filePath": "filePath", "dataName": "2222", "index": i } future = producer.send( ‘kafka_test‘, # 同一个key值,会被送至同一个分区 key=11, value=data) # 向分区1发送消息print("send {}".format(data)) try: future.get(timeout=100) # 监控是否发送成功except kafka_errors: # 发送失败抛出kafka_errors traceback.format_exc()
接收方代码
import json import time from threading import Thread from kafka import KafkaConsumer def parse(con, value): try: i = value["index"] print(i, time.ctime(), "start") time.sleep(10 - i * 3) # if i == 0:# pass# else:# con.commit()print(i, time.ctime(), "end") except ZeroDivisionError: print("除 0 ") con.commit() except Exception as e: print(e) finally: print("---") if__name__ == ‘__main__‘: consumer = KafkaConsumer( bootstrap_servers=[‘192.168.32.10:9092‘], group_id=‘test‘, auto_offset_reset=‘latest‘, value_deserializer=lambda m: json.loads(m.decode()), enable_auto_commit=False ) consumer.subscribe(topics=["kafka_test"]) while True: fetch_data_dict = consumer.poll(timeout_ms=100, max_records=1) # print("msg=====", fetch_data_dict)for keys, values in fetch_data_dict.items(): for i in values: th = Thread(target=parse, args=(consumer, i.value)) th.start()
测试场景一:
设置enable_auto_commit=False后,发送方发送3条消息,接收方不手动commit(), 则下次重启消费方会重复消费上次未提交的数据
测试场景二:
设想:三条消息,每条消息消费的时长不一致,假设第一条10s, 第二条7s,第三条4s, kafka的消费是按照顺序的,我们启动三个线程对应消费三条消息。可想而知第三条消费最快,提交后会阻塞吧?因为此时第二条,第一条还没提交。
结论:不会阻塞,查了文档不知道为啥。。大家知道可以留言讨论
测试场景三:
还是启动三个线程对应消费三条消息,parse方法如下,第一条消费失败,第二条第三条正常消费提交,此时消费方重启,还会消费几次?
def parse(con, value): i = value["index"] print(i, time.ctime(), "start") # time.sleep(10 - i * 3)print(10/i) # if i == 0:# pass# else:# con.commit() con.commit() print(i, time.ctime(), "end")
结论: 不会消费了,第一条被提交了
测试场景四:
启动三个线程对应消费三条消息,第一条不提交,第二条第三条正常消费提交,此时消费方重启,还会消费吗?
结论: 不会消费了,第一条被提交了
测试场景五:
不启动线程,同步消费,,第一条消费失败,第二条第三条正常消费提交,此时消费方重启,还会消费吗?
结论:第一,二,三会再次消费
测试场景六:
不启动线程,同步消费,第一条不提交,第二条第三条正常消费提交,此时消费方重启,还会消费吗?
结论:第一,二,三会再次消费
可见启动线程异步消费的话,部分失败的消息会被成功的消息commit掉,同步消费则不会!
原文:https://www.cnblogs.com/saintlu/p/15130417.html
内容总结
以上是互联网集市为您收集整理的python-kafka手动提交消息测试全部内容,希望文章能够帮你解决python-kafka手动提交消息测试所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。