news 2026/4/18 1:53:46

Kafka批量消费实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka批量消费实现

批量消费指的是一次性拉取一批消息,然后批量处理
依赖spring-kafka

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version></dependency>

配置消费者工厂

@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,String>consumerFactory(){Map<String,Object>props=newHashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"batch-consumer-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);// 批量消费配置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);// 每次最多拉取100条props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,10240);// 至少10KB才返回returnnewDefaultKafkaConsumerFactory<>(props);}@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>batchFactory(ConsumerFactory<String,String>consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setBatchListener(true);// 启用批量监听// 手动提交模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);returnfactory;}}

实现批量消费监听器

@ServicepublicclassOrderBatchConsumer{@KafkaListener(topics="order-topic",containerFactory="batchFactory")publicvoidconsumeBatch(List<ConsumerRecord<String,String>>records,Acknowledgmentack){log.info("收到批量消息: {} 条",records.size());try{// 业务处理for(ConsumerRecord<String,String>record:records){processOrder(record.value());}// 全部成功才提交offsetack.acknowledge();log.info("批次处理成功,已提交offset");}catch(Exceptione){log.error("批量处理失败",e);// 不提交offset,等待重新投递}}}

批量消费可靠性保障
不能在finally中提交offset,因为不管消费是否成功都会提交offset

@KafkaListener(topics="topic",containerFactory="batchFactory")publicvoidconsume(List<ConsumerRecord<String,String>>records,Acknowledgmentack){try{processBatch(records);}finally{ack.acknowledge();// 无论成败都提交,会丢消息!}}

不能设置自动提交,自动提交模式下,批量消费有可能会丢消息。要改为手动提交

props.put("enable.auto.commit","true");// 自动提交// 处理过程中失败,但offset已自动提交,消息丢失

需要用到线程池的时候,需要等到全部消息处理成功才能提交。使用CompletionService实现

@ServicepublicclassReliableBatchConsumer{privatefinalExecutorServiceexecutor=Executors.newFixedThreadPool(10);@KafkaListener(topics="payment-topic",containerFactory="batchFactory")publicvoidconsumeBatch(List<ConsumerRecord<String,String>>records,Acknowledgmentack){CompletionService<Boolean>completionService=newExecutorCompletionService<>(executor);List<Future<Boolean>>futures=newArrayList<>();// 1. 提交所有任务到线程池并发处理for(ConsumerRecord<String,String>record:records){Callable<Boolean>task=()->{try{processPayment(record.value());returntrue;}catch(Exceptione){log.error("支付处理失败: {}",record.value(),e);returnfalse;}};futures.add(completionService.submit(task));}// 2. 等待所有任务完成并检查结果booleanallSuccess=true;try{for(inti=0;i<records.size();i++){Future<Boolean>future=completionService.take();if(!future.get()){allSuccess=false;break;// 发现失败立即终止}}}catch(Exceptione){allSuccess=false;log.error("任务执行异常",e);}// 3. 全部成功才提交offsetif(allSuccess){ack.acknowledge();log.info("批次全部成功,已提交offset");}else{log.warn("批次中有失败消息,不提交offset,等待重新投递");// 重新投递会导致重复消费,需要业务保证幂等性}}}

当有失败消息的时候,不提交offset,等待消息重投或者重新拉取,但是会有消息重复的情况,业务上要做好幂等。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/17 21:42:08

2026市场主流APP制作公司有哪些?其核心功能与选择建议梳理

摘要如果你在寻找“最适合自己的APP制作公司”&#xff0c;核心结论是&#xff1a;没有绝对的最优解&#xff0c;只有基于你项目类型、预算、工期和技术栈的最适配方案。 对于追求高定制化、全流程把控且预算充足的中大型项目&#xff0c;拥有CMMI3/ISO27001等国际认证、技术团…

作者头像 李华
网站建设 2026/4/16 10:20:02

GLM-TTS能否用于紧急警报系统?高穿透力语音生成研究

GLM-TTS能否用于紧急警报系统&#xff1f;高穿透力语音生成研究 在地铁站突然响起的广播中&#xff0c;一句“请立即撤离”是否真的能让人听清、听懂、并迅速行动&#xff1f;在火灾、地震或突发公共事件中&#xff0c;时间以秒计算&#xff0c;而信息传递的有效性直接关系到生…

作者头像 李华
网站建设 2026/4/17 20:02:29

【多智能体】深度多智能体强化学习simulink实现

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;擅长数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。 &#x1f34e; 往期回顾关注个人主页&#xff1a;Matlab科研工作室 &#x1f447; 关注我领取海量matlab电子书和数学建模资料 &#x1…

作者头像 李华
网站建设 2026/4/16 20:40:57

为什么越来越多企业选择GLM-TTS做智能客服语音引擎?

为什么越来越多企业选择GLM-TTS做智能客服语音引擎&#xff1f; 在智能客服系统日益普及的今天&#xff0c;用户对“机器声音”的容忍度正变得越来越低。当一位客户拨打银行热线&#xff0c;听到的不再是冰冷僵硬的合成音&#xff0c;而是一个语气温和、发音准确、甚至带着熟悉…

作者头像 李华
网站建设 2026/4/16 7:22:04

AI公平性合作项目:定义、挑战与公私合作优势

关于某中心-国家科学基金会AI公平性合作的三个问题 一年前&#xff0c;某中心和美国国家科学基金会&#xff08;NSF&#xff09;宣布了一项为期三年、耗资2000万美元的合作计划&#xff0c;旨在资助人工智能公平性方面的学术研究。一个月前&#xff0c;NSF宣布了该计划首批十个…

作者头像 李华
网站建设 2026/4/18 3:40:40

语音合成与安装包捆绑:发布独立运行的离线语音合成工具

语音合成与安装包捆绑&#xff1a;发布独立运行的离线语音合成工具 在内容创作、教育配音和企业语音系统日益依赖自动化生成的今天&#xff0c;一个稳定、私密且无需网络的语音合成工具正变得不可或缺。尽管云端TTS服务提供了便捷接口&#xff0c;但其高昂成本、延迟问题以及数…

作者头像 李华