博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot 实现 RabbitMQ 延迟消费
阅读量:294 次
发布时间:2019-03-01

本文共 7682 字,大约阅读时间需要 25 分钟。

何为延迟队列?

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

延迟队列能做什么?延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

  • 延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
  • 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

 

实现思路

在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  • 消息因为设置了TTL而过期。
  • 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅。

流程图

聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

延迟重试

延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

 

1、设置队列过期时间实现延迟消费

TTL指过期时间,rabbitmq可以通过设置队列的过期时间或者消息的过期时间实现延时消费。

准备工作:安装rabbitmq,添加相关maven依赖 

org.springframework.boot
spring-boot-starter-amqp

配置文件: 

debug: trueserver:  port: 8080spring:  #配置rabbitMq 服务器  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest

代码实现

代码中有四个配置,第一个配置的exchange是用来接收已过期的队列信息并进行重新分配队列进行消费,第二个配置的repeatTradeQueue为exchange重新分配的队列名,第三个是将repeatTradeQueue队列与exchange交换机绑定,并指定对应的routing key,第四个配置的就是我们要设置过期时间的队列deadLetterQueue,配置中有三个参数,x-message-ttl为过期时间,该队列所有消息的过期时间都为我们配置的这个值,单位为毫秒,这里我设置过期时间为3秒,x-dead-letter-exchange是指过期消息重新转发到指定交换机,也就是exchange,x-dead-letter-routing-key是该交换机上绑定的routing-key,将通过配置的routing-key分配对应的队列,也就是前面配置的repeatTradeQueue。

import java.util.HashMap;import java.util.Map; import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean; @SpringBootApplicationpublic class Application {        //交换机用于重新分配队列    @Bean    DirectExchange exchange() {        return new DirectExchange("exchange");    }        //用于延时消费的队列    @Bean    public Queue repeatTradeQueue() {        Queue queue = new Queue("repeatTradeQueue",true,false,false);        return queue;     }        //绑定交换机并指定routing key    @Bean    public Binding  repeatTradeBinding() {        return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");    }        //配置死信队列    @Bean    public Queue deadLetterQueue() {        Map
args = new HashMap<>(); args.put("x-message-ttl", 3000); args.put("x-dead-letter-exchange", "exchange"); args.put("x-dead-letter-routing-key", "repeatTradeQueue"); return new Queue("deadLetterQueue", true, false, false, args); } public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); }}

配置生产者,这里生产者需要指定前面配置了过期时间的队列deadLetterQueue

import java.time.LocalDateTime; import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Componentpublic class DeadLetterSender {     @Autowired    private AmqpTemplate rabbitTemplate;      public void send(String msg) {        System.out.println("DeadLetterSender 发送时间:"+LocalDateTime.now().toString()+" msg内容:"+msg);        rabbitTemplate.convertAndSend("deadLetterQueue", msg);    } }

配置消费者,消费者监听指定用于延时消费的队列repeatTradeQueue

import java.time.LocalDateTime; @Component@RabbitListener(queues = "repeatTradeQueue")public class RepeatTradeReceiver {        @RabbitHandler    public void process(String msg) {        System.out.println("repeatTradeQueue 接收时间:"+LocalDateTime.now().toString()+" 接收内容:"+msg);    } }

写一个简单的接口调用测试延时消费是否成功

import org.springframework.beans.factory.annotation.Autowired; @RestController@RequestMapping("/rabbit")public class RabbitTest {     @Autowired    private DeadLetterSender deadLetterSender;      @GetMapping("/deadTest")    public void deadTest() {        deadLetterSender.send("队列设置过期时间测试");    }    }

启动项目开始测试

 

2、设置消息过期时间实现延时消费 

还是先贴上配置的代码,基本配置都一样,唯一的区别是deadLetterQueue的过期时间这里不做配置,需要注意的是,因为我这里用的是同一个队列名,所以即使将队列过期时间配置删除,mq中该队列过期时间仍然还是存在的,所以需要删除该队列,启动项目时才能重新配置该队列属性,可能可以通过配置的方式重新覆盖属性配置,小白没研究出来(ಥ_ಥ),当然也可以保留队列过期时间的配置,当两个过期时间都存在时,消息取更小的过期时间。

import java.util.HashMap; @SpringBootApplicationpublic class Application {        //用于死信队列转发的交换机    @Bean    DirectExchange exchange() {        return new DirectExchange("exchange");    }        //用于延时消费的队列    @Bean    public Queue repeatTradeQueue() {        Queue queue = new Queue("repeatTradeQueue",true,false,false);        return queue;     }        //绑定交换机并指定routing key    @Bean    public Binding  repeatTradeBinding() {        return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue");    }        //配置死信队列    @Bean    public Queue deadLetterQueue() {        Map
args = new HashMap<>(); args.put("x-dead-letter-exchange", "exchange"); args.put("x-dead-letter-routing-key", "repeatTradeQueue"); return new Queue("deadLetterQueue", true, false, false, args); } public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); }}

配置生产者,message的expiration就是过期时间的设置,单位也是毫秒

import java.time.LocalDateTime; import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Componentpublic class DeadLetterSender {     @Autowired    private AmqpTemplate rabbitTemplate;     public void send(String msg, long times) {        System.out.println("DeadLetterSender 发送时间:" + LocalDateTime.now().toString() + " msg内容:" + msg);        MessagePostProcessor processor = new MessagePostProcessor() {            @Override            public Message postProcessMessage(Message message) throws AmqpException {                message.getMessageProperties().setExpiration(times + "");                return message;            }        };        rabbitTemplate.convertAndSend("deadLetterQueue", (Object)msg, processor);    }}

消费者不变,用之前的类即可

稍微修改一下接口,设置时间为5秒

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController; import me.miaobo.mq.sender.DeadLetterSender; @RestController@RequestMapping("/rabbit")public class RabbitTest {     @Autowired    private DeadLetterSender deadLetterSender;        @GetMapping("/deadTest")    public void deadTest() {        deadLetterSender.send("消息设置过期时间测试",5000);    }}

补充一下队列的删除,在控制台选择queues菜单,找到我们配置的队列,点击名称进详情,操作介绍有点傻,不清楚mq的可以看看之前的链接贴。

进入详情界面可以看到之前的配置,过期时间3秒,自己通过项目重启发现过期时间并不会删除,只好在管理界面手动删除。

下拉详情页面,找到删除按钮,删除该队列

启动服务,测试接口

时间相隔5秒,和接口设置的时间保持一致。

转载地址:http://oert.baihongyu.com/

你可能感兴趣的文章