RabbitMQ消息重复消费场景及解决方案


prtyaa
prtyaa 2023-12-30 22:06:00 64125
分类专栏: 资讯

这里介绍一下RabbitMQ重复消费的场景,以及如何解决消息重复消费的问题。

注:本文只做粗略逻辑实现借鉴,实际业务场景需根据实际情况再做细化处理。

目录

  • 消息重复消费
    • MQ的一条消息被消费者消费了多次
    • 重复消费场景重现测试
  • 如何解决消息重复消费的问题
    • 编码
    • 解决消息重复消费测试

消息重复消费

什么是消息重复消费?

首先我们来看一下消息的传输流程。消息生产者-->MQ-->消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段:

1、生产者多发送了消息给MQ;

2、MQ的一条消息被消费者消费了多次。

第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。

我们着重来看一下第二个场景。

MQ的一条消息被消费者消费了多次

在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

重复消费场景重现测试

1、消息发送者发送1万条消息给MQ

@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {
    String message = "server message sendToClient";
    for (int i = 0; i < 10000; i++) {
        amqpTemplate.convertAndSend("queueName3",message+": "+i);

    }
    return message;
}

启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息

@RabbitListener(queues = "queueName3")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(String message) {
    System.out.println("接收者2--接收到queueName3队列的消息为:"+message);
}

启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

 

此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

 

再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

如何解决消息重复消费的问题

为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下:

  • 消费者监听到消息后获取id,先去查询这个id是否存中
  • 如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)
  • 如果存在则丢弃此消息

编码

消息生产者服务:

/**
 * @Description:  发送消息 模拟消息重复消费
 *      消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
 *      消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
 *      为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
 * @param:
 * @return: java.lang.String
 * @Author: chenping
 */
@GetMapping("/rabbitmq/sendMsgNoRepeat")
public String sendMsgNoRepeat() {
    String message = "server message sendMsgNoRepeat";
    for (int i = 0; i <10000 ; i++) {
        Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
        amqpTemplate.convertAndSend("queueName4",msg);
    }
    return message;
}

消息消费者服务:

方案1:将id存入string中(单消费者场景):

这样一个队列,redis数据只有一条,每次消息过来都覆盖之前的消息,但是消费者多的情况不适用,可能会存在问题--一个消息被多个消费者消费

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");

    String messageRedisValue = redisUtil.get("queueName4","");
    if (messageRedisValue.equals(messageId)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.set("queueName4",messageId);//以队列为key,id为value
}
方案2:将id存入list中(多消费者场景)

这个方案可以解决多消费者的问题,但是随着mq的消息增加,redis数据越来越多,需要去清除redis数据。

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");

    List<String> messageRedisValue = redisUtil.lrange("queueName4");
    if (messageRedisValue.contains(messageId)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.lpush("queueName4",messageId);//存入list
}
方案3:将id以key值增量存入string中并设置过期时间:

以消息id为key,消息内容为value存入string中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");

    String messageRedisValue = redisUtil.get(messageId,"");
    if (msg.equals(messageRedisValue)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.set(messageId,msg,10L);//以id为key,消息内容为value,过期时间10分钟
}

解决消息重复消费测试:

首先,启动消息生成服务,发送一万条消息:

 

启动消息消费服务,然后中断服务,消费了1934条消息:

 

查看未被消费的消息条数为8067条,多了一条(10000-1934=8066 ):

 

再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

网站声明:如果转载,请联系本站管理员。否则一切后果自行承担。

本文链接:https://www.xckfsq.com/news/show.html?id=32782
赞同 0
评论 0 条
prtyaaL1
粉丝 1 发表 2554 + 关注 私信
上周热门
银河麒麟添加网络打印机时,出现“client-error-not-possible”错误提示  1487
银河麒麟打印带有图像的文档时出错  1405
银河麒麟添加打印机时,出现“server-error-internal-error”  1194
统信操作系统各版本介绍  1116
统信桌面专业版【如何查询系统安装时间】  1114
统信桌面专业版【全盘安装UOS系统】介绍  1068
麒麟系统也能完整体验微信啦!  1026
统信【启动盘制作工具】使用介绍  672
统信桌面专业版【一个U盘做多个系统启动盘】的方法  616
信刻全自动档案蓝光光盘检测一体机  526
本周热议
我的信创开放社区兼职赚钱历程 40
今天你签到了吗? 27
信创开放社区邀请他人注册的具体步骤如下 15
如何玩转信创开放社区—从小白进阶到专家 15
方德桌面操作系统 14
我有15积分有什么用? 13
用抖音玩法闯信创开放社区——用平台宣传企业产品服务 13
如何让你先人一步获得悬赏问题信息?(创作者必看) 12
2024中国信创产业发展大会暨中国信息科技创新与应用博览会 9
中央国家机关政府采购中心:应当将CPU、操作系统符合安全可靠测评要求纳入采购需求 8

添加我为好友,拉您入交流群!

请使用微信扫一扫!