本文共 7682 字,大约阅读时间需要 25 分钟。
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
延迟队列能做什么?延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。
在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅。
刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅。
聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。
针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:
延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。
如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。
TTL指过期时间,rabbitmq可以通过设置队列的过期时间或者消息的过期时间实现延时消费。
准备工作:安装rabbitmq,添加相关maven依赖
org.springframework.boot spring-boot-starter-amqp 配置文件:
debug: trueserver: port: 8080spring: #配置rabbitMq 服务器 rabbitmq: host: localhost port: 5672 username: guest password: guest
代码实现
代码中有四个配置,第一个配置的exchange是用来接收已过期的队列信息并进行重新分配队列进行消费,第二个配置的repeatTradeQueue为exchange重新分配的队列名,第三个是将repeatTradeQueue队列与exchange交换机绑定,并指定对应的routing key,第四个配置的就是我们要设置过期时间的队列deadLetterQueue,配置中有三个参数,x-message-ttl为过期时间,该队列所有消息的过期时间都为我们配置的这个值,单位为毫秒,这里我设置过期时间为3秒,x-dead-letter-exchange是指过期消息重新转发到指定交换机,也就是exchange,x-dead-letter-routing-key是该交换机上绑定的routing-key,将通过配置的routing-key分配对应的队列,也就是前面配置的repeatTradeQueue。
import java.util.HashMap;import java.util.Map; import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean; @SpringBootApplicationpublic class Application { //交换机用于重新分配队列 @Bean DirectExchange exchange() { return new DirectExchange("exchange"); } //用于延时消费的队列 @Bean public Queue repeatTradeQueue() { Queue queue = new Queue("repeatTradeQueue",true,false,false); return queue; } //绑定交换机并指定routing key @Bean public Binding repeatTradeBinding() { return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue"); } //配置死信队列 @Bean public Queue deadLetterQueue() { Mapargs = new HashMap<>(); args.put("x-message-ttl", 3000); args.put("x-dead-letter-exchange", "exchange"); args.put("x-dead-letter-routing-key", "repeatTradeQueue"); return new Queue("deadLetterQueue", true, false, false, args); } public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); }}
配置生产者,这里生产者需要指定前面配置了过期时间的队列deadLetterQueue
import java.time.LocalDateTime; import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Componentpublic class DeadLetterSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String msg) { System.out.println("DeadLetterSender 发送时间:"+LocalDateTime.now().toString()+" msg内容:"+msg); rabbitTemplate.convertAndSend("deadLetterQueue", msg); } }
配置消费者,消费者监听指定用于延时消费的队列repeatTradeQueue
import java.time.LocalDateTime; @Component@RabbitListener(queues = "repeatTradeQueue")public class RepeatTradeReceiver { @RabbitHandler public void process(String msg) { System.out.println("repeatTradeQueue 接收时间:"+LocalDateTime.now().toString()+" 接收内容:"+msg); } }
写一个简单的接口调用测试延时消费是否成功
import org.springframework.beans.factory.annotation.Autowired; @RestController@RequestMapping("/rabbit")public class RabbitTest { @Autowired private DeadLetterSender deadLetterSender; @GetMapping("/deadTest") public void deadTest() { deadLetterSender.send("队列设置过期时间测试"); } }
启动项目开始测试
还是先贴上配置的代码,基本配置都一样,唯一的区别是deadLetterQueue的过期时间这里不做配置,需要注意的是,因为我这里用的是同一个队列名,所以即使将队列过期时间配置删除,mq中该队列过期时间仍然还是存在的,所以需要删除该队列,启动项目时才能重新配置该队列属性,可能可以通过配置的方式重新覆盖属性配置,小白没研究出来(ಥ_ಥ),当然也可以保留队列过期时间的配置,当两个过期时间都存在时,消息取更小的过期时间。
import java.util.HashMap; @SpringBootApplicationpublic class Application { //用于死信队列转发的交换机 @Bean DirectExchange exchange() { return new DirectExchange("exchange"); } //用于延时消费的队列 @Bean public Queue repeatTradeQueue() { Queue queue = new Queue("repeatTradeQueue",true,false,false); return queue; } //绑定交换机并指定routing key @Bean public Binding repeatTradeBinding() { return BindingBuilder.bind(repeatTradeQueue()).to(exchange()).with("repeatTradeQueue"); } //配置死信队列 @Bean public Queue deadLetterQueue() { Mapargs = new HashMap<>(); args.put("x-dead-letter-exchange", "exchange"); args.put("x-dead-letter-routing-key", "repeatTradeQueue"); return new Queue("deadLetterQueue", true, false, false, args); } public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); }}
配置生产者,message的expiration就是过期时间的设置,单位也是毫秒
import java.time.LocalDateTime; import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Componentpublic class DeadLetterSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String msg, long times) { System.out.println("DeadLetterSender 发送时间:" + LocalDateTime.now().toString() + " msg内容:" + msg); MessagePostProcessor processor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(times + ""); return message; } }; rabbitTemplate.convertAndSend("deadLetterQueue", (Object)msg, processor); }}
消费者不变,用之前的类即可
稍微修改一下接口,设置时间为5秒
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController; import me.miaobo.mq.sender.DeadLetterSender; @RestController@RequestMapping("/rabbit")public class RabbitTest { @Autowired private DeadLetterSender deadLetterSender; @GetMapping("/deadTest") public void deadTest() { deadLetterSender.send("消息设置过期时间测试",5000); }}
补充一下队列的删除,在控制台选择queues菜单,找到我们配置的队列,点击名称进详情,操作介绍有点傻,不清楚mq的可以看看之前的链接贴。
进入详情界面可以看到之前的配置,过期时间3秒,自己通过项目重启发现过期时间并不会删除,只好在管理界面手动删除。
下拉详情页面,找到删除按钮,删除该队列
启动服务,测试接口
时间相隔5秒,和接口设置的时间保持一致。
转载地址:http://oert.baihongyu.com/