KafKa手动提交问题描述
2023-12-19 22:23:08
KafKa手动提交offset问题描述
在分布式架构,我在XXJOB来进行调用接口产生数据,将数据提交KafKa进行存储;然后将KafKa中数据拿出来消费,在处理KafKa出来的数据时,调用另外一个服务时,正好另外一个服务挂了,导致手动提交offset失败,造成数据的重复消费;
@KafkaListener(topics = "xxx",groupId = "consumer01")
public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment ack) {
/**
代码块(其中有调用别的服务接口)
**/
// 手动提交offset
ack.acknowledge();
}
问题解决
解决方案就是将可能有问题的代码使用try…catch进行捕获异常,然后在finally进行手动提交offset
@KafkaListener(topics = "xxx",groupId = "consumer01")
public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment ack) {
try{
/**
代码块(其中有调用别的服务接口)
**/
}catch(Exception e){
}finally{
// 手动提交offset
ack.acknowledge();
}
}
解决方案仅供参考,不喜勿喷。
文章来源:https://blog.csdn.net/qq_37881565/article/details/135087932
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!