RabbitMQ学习—day6—springboot整合

news/2025/2/24 10:54:22

目录

1. springboot配置

2. 开始写RabbitMq代码

 3. 队列优化

4. 插件实现延迟队列

5. 总结


前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

1. springboot配置

  1. 创建一个 Maven 工程或者 Spring Boot工程

  2. 添加依赖坐标,这里的 Spring Boot 是3.4.3 版本

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.51</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Knife4j API文档生产工具 -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
            <version>4.4.0</version>
        </dependency>
        <!-- swagger注解支持:Knife4j依赖本依赖 -->
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-annotations</artifactId>
            <version>1.5.22</version>
        </dependency>

    </dependencies>

        3.创建 application.yml 文件

server:
  port: 8888
spring:
  rabbitmq:
    host: 你的服务器ip
    port: 5672
    username: admin
    password: 123456

这里是 8808 端口,可根据需求决定端口

        4. 配置swgger

@Configuration
public class Knife4jConfig {

    @Bean
    public OpenAPI springShopOpenAPI() {
        return new OpenAPI()
                // 接口文档标题
                .info(
                        new Info().title("接口文档")
                                // 接口文档简介
                                .description("RabbitMq测试文档")
                                // 接口文档版本
                                .version("v1.0")
                                // 开发者联系方式
                                .contact(new Contact().name("luckily").email("3298244978@qq.com"))
                );
    }

}

        5. 新建主启动类

@Log4j2
@SpringBootApplication
public class RabbitMqDemoApplication {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(RabbitMqDemoApplication.class);
        Environment env = app.run(args).getEnvironment();
        app.setBannerMode(Banner.Mode.CONSOLE);
        logApplicationStartup(env);

    }
    private static void logApplicationStartup(Environment env) {
        String protocol = "http";
        if (env.getProperty("server.ssl.key-store") != null) {
            protocol = "https";
        }
        String serverPort = env.getProperty("server.port");
        String contextPath = env.getProperty("server.servlet.context-path");
        if (StringUtils.isBlank(contextPath)) {
            contextPath = "/doc.html";
        } else {
            contextPath = contextPath + "/doc.html";
        }
        String hostAddress = "localhost";
        try {
            hostAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            log.warn("The host name could not be determined, using `localhost` as fallback");
        }
        log.info("""
                        ----------------------------------------------------------
                        \t应用程序“{}”正在运行中......
                        \t接口文档访问 URL:
                        \t本地: \t\t{}://localhost:{}{}
                        \t外部: \t{}://{}:{}{}
                        \t配置文件: \t{}
                        ----------------------------------------------------------""",
                env.getProperty("spring.application.name"),
                protocol,
                serverPort,
                contextPath,
                protocol,
                hostAddress,
                serverPort,
                contextPath,
                env.getActiveProfiles());
    }


}

2. 开始写RabbitMq代码

代码架构

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wugMpbMe-1630999921193)(D:\学习资料\图片\image-20210902150742461.png)]

代码实现

配置类代码

/*
* TTL队列 配置文件类代码
*
* */
@Configuration
public class TtlQueueConfig {


    //普通交换机的名称
    public static final String  X_EXCHANGE = "X";
    //死信交换机的名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列的名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列的名称
    public static final String DEAD_LATTER_QUEUE = "QD";

    //声明xExchange
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明yExchange
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明队列
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信Routing-key
        arguments.put("x-dead-letter-routing-key","YD");
        //设置TTL 单位是ms
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    //声明普通队列 TTL为40s
    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信Routing-key
        arguments.put("x-dead-letter-routing-key","YD");
        //设置TTL 单位是ms
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //死信队列
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //绑定
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //绑定
    @Bean
    public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者代码

 /*
 * 发送延迟消息
 * */
 @Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/{message}")
    @Operation(summary = "Rabbitmq发送消息")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);

    }
}

消费者

/*
 * 队列TTL 消费者
 * */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

效果:

 3. 队列优化

问题:

第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求

代码架构图

增加一个队列QC实现动态延时

实现

配置文件类

 /*
 * TTL队列 配置文件类代码
 *
 * */
 @Configuration
 public class TtlQueueConfig {
 
 
     //普通交换机的名称
     public static final String  X_EXCHANGE = "X";
     //死信交换机的名称
     public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
     //普通队列的名称
     public static final String QUEUE_A = "QA";
     public static final String QUEUE_B = "QB";
     public static final String QUEUE_C = "QC";
     //死信队列的名称
     public static final String DEAD_LATTER_QUEUE = "QD";
 
     //声明QC队列
     @Bean("queueC")
     public Queue queueC(){
         Map<String, Object> arguments = new HashMap<>();
         //设置死信交换机
         arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
         //设置死信RoutingKey
         arguments.put("x-dead-letter-routing-key","YD");
         return QueueBuilder.durable().withArguments(arguments).build();
     }
 
     @Bean
     public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){
         return BindingBuilder.bind(queueC).to(xExchange).with("XC");
     }
 
     //声明xExchange
     @Bean("xExchange")
     public DirectExchange xExchange(){
         return new DirectExchange(X_EXCHANGE);
     }
 
     //声明yExchange
     @Bean("yExchange")
     public DirectExchange yExchange(){
         return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
     }
 
     //声明队列
     @Bean("queueA")
     public Queue queueA(){
         Map<String, Object> arguments = new HashMap<>(3);
         //设置死信交换机
         arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
         //设置死信Routing-key
         arguments.put("x-dead-letter-routing-key","YD");
         //设置TTL 单位是ms
         arguments.put("x-message-ttl",10000);
         return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
     }
 
     //声明普通队列 TTL为40s
     @Bean("queueB")
     public Queue queueB(){
         Map<String, Object> arguments = new HashMap<>(3);
         //设置死信交换机
         arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
         //设置死信Routing-key
         arguments.put("x-dead-letter-routing-key","YD");
         //设置TTL 单位是ms
         arguments.put("x-message-ttl",40000);
         return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
     }
 
     //死信队列
     @Bean("queueD")
     public Queue queueD(){
         return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();
     }
 
     //绑定
     @Bean
     public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                   @Qualifier("xExchange") DirectExchange xExchange){
         return BindingBuilder.bind(queueA).to(xExchange).with("XA");
     }
 
     //绑定
     @Bean
     public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                   @Qualifier("xExchange") DirectExchange xExchange){
         return BindingBuilder.bind(queueB).to(xExchange).with("XB");
     }
 
     //绑定
     @Bean
     public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
                                   @Qualifier("yExchange") DirectExchange yExchange){
         return BindingBuilder.bind(queueD).to(yExchange).with("YD");
     }
 }

生产者

/*
 * 发送延迟消息
 * */
@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/{message}")
    @Operation(summary = "Rabbitmq发送消息")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);

    }
    //开始发消息
    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    @Operation(summary = "Rabbitmq发送消息与延迟时间")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",
                new Date().toString(),ttlTime,message);
        rabbitTemplate.convertAndSend("X","XC",message,msg->{
            //发送消息的时候 延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
}

消费者

消费者代码不改变

效果:本来消息2是延迟2秒,消息1延迟20秒,消息2似乎要比消息1更早接收,但因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这是队列特性

怎么弥补这个缺陷,需要用到Rabbitmq的插件实现延迟队列

4. 插件实现延迟队列

我们之前延迟消息是在队列进行延迟,安装插件之后是在交换机进行延迟

  • 下载延迟插件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
  • 将延迟插件放到RabbitMQ的插件目录下:由于我通过docker容器安装的rabbitmq,所以我将安装包先通过xftp发送给主机,在通过docker命令给容器,进入容器安装

复制给容器

进入容器

启动插件

重启容器

docker restart 7af

网页端出现以下就表示成功安装插件

实战

配置文件类

 @Configuration
 public class DelayedQueueConfig {

 //队列
 public static final String DELAYED_QUEUE_NAME = "delayed.queue";
 //交换机
 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
 //routingKey
 public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

 //声明队列
 @Bean
 public Queue delayedQueue(){
   return new Queue(DELAYED_QUEUE_NAME);
 };



 //声明交换机
 @Bean
 public CustomExchange delayedExchange(){

     Map<String, Object> arguments = new HashMap<>();
     arguments.put("x-delayed-type","direct");

     return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
             true,false,arguments);
 }
 //绑定
 @Bean
 public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                   @Qualifier("delayedExchange") CustomExchange delayedExchange){
     return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
 }
}

生产者

 /*
 * 发送延迟消息
 * */
 @Slf4j
 @RestController
 @RequestMapping("/ttl")
 public class SendMsgController {
 
     @Autowired
     private RabbitTemplate rabbitTemplate;
 
     //开始发消息 基于插件的 消息 及 延迟的时间
     @GetMapping("/sendDelayMsg/{message}/{delayTime}")
     public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
         log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",
                 new Date().toString(),delayTime,message);
         rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME
                 ,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {
             // 发送消息的时候 延迟时长 单位ms
             msg.getMessageProperties().setDelay(delayTime);
             return msg;
                 });
     }
 }

消费者

 // 消费者代码 基于插件的延迟消息
 @Slf4j
 @Component
 public class DelayQueueConsumer {

 //监听消息
 @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
 public void recieveDelayQueue(Message message){
     String msg = new String(message.getBody());
     log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
 }
}

效果

成功!

5. 总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用
RabbitMQ.的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis.的zsset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景


http://www.niftyadmin.cn/n/5864238.html

相关文章

信息学奥赛一本通1005题解

数学好的朋友们应该都知道&#xff0c;这是一道数学题&#xff0c;绝对不是什么算法题&#xff0c;我们看原题&#xff1a; 假设地球上的新生资源按恒定速度增长。照此测算&#xff0c;地球上现有资源加上新生资源可供&#x1d465;亿人生活&#x1d44e;年&#xff0c;或供&a…

更改visual studio 2022 默认NuGet包路径

本文章仅提供更改用户级别的NuGet包默认路径的更改&#xff0c;电脑级别的更改需要更改%ProgramData%\NuGet\Config\machine.config&#xff0c; 而且需要管理员权限&#xff0c;但是更改内容类似用户级别的NuGet更改。 1. 关闭VS 2. 打开NuGet配置文件路径 可通过以下两种方…

Linux 命令大全完整版(05)

2. Linux 系统设置命令 export 功能说明&#xff1a;设置或显示环境变量。语  法&#xff1a;export [-fnp][变量名称][变量设置值]补充说明&#xff1a;在 shell 中执行程序时&#xff0c;shell 会提供一组环境变量。export 可新增、修改或删除环境变量&#xff0c;供后续…

从Excel到Hadoop:数据规模的进化之路

从Excel到Hadoop&#xff1a;数据规模的进化之路 在数字时代&#xff0c;数据就像空气&#xff0c;充斥在我们生活的每个角落。今天我们谈"大数据"&#xff0c;但回头看看&#xff0c;数据的演变经历了从"小数据"到"大数据"的量变到质变的过程。…

解锁策略模式:Java 实战与应用全景解析

系列文章目录 后续补充~~~ 文章目录 一、策略模式&#xff1a;概念与原理1.1 定义与概念1.2 结构组成1.3 与其他模式的区别 二、策略模式的优势与适用场景2.1 优势剖析2.2 适用场景 三、Java 代码示例解析3.1 场景设定3.2 代码实现3.3 代码解析 四、策略模式在实际项目中的应用…

蓝桥每日打卡

#蓝桥#JAVA#奇怪的捐赠 题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 地产大亨 Q 先生临终的遗愿是&#xff1a;拿出100万元给 X 社区的居民抽奖&#xff0c;以稍慰藉心中愧疚。 麻烦的是&#xff0c;他有个很…

小迪安全23-php后台模块

cookie技术 cookie就是身份验证表示&#xff0c;通过cookie好区分每个用户的个人数据和权限&#xff0c;第一次登陆之后正常的网站都会赋予一个cookie 写写一个后台界面&#xff0c;直接让ai去写就可以 然后自己需要的提交方式&#xff0c;和表单值自己修改即可 生成cookie的…

sklearn机器学习 Python代码通用模板

以下是一个使用 scikit-learn&#xff08;sklearn&#xff09;进行机器学习的通用 Python 代码模板。这个模板涵盖了数据加载、预处理、模型训练、评估和预测的基本流程&#xff0c;适用于常见的机器学习任务。 python # 导入必要的库 import numpy as np import pandas as …