SpringBoot集成RabbitMQ

2022年7月2日
大约 11 分钟

SpringBoot集成RabbitMQ

1. RabbitMQ的介绍

介绍RabbitMQ之前先说一下AMQP协议:

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
AMQP 其实和Http一样 都是一种协议, 只不过 Http是针对网络传输的, 而AMQP是基于消息队列的。

AMQP 协议中的基本概念:

  1. Broker: 接收和分发消息的应用,我们在介绍消息中间件的时候所说的消息系统就是Message Broker。
  2. Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
  3. Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
  4. Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
  5. Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  6. Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
  7. Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

Exchange的类型:

  1. direct: 这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生产者生产消息的时候 指定一个routingKey当绑定的队列的routingKey与生产者发送的一样 那么交换机会吧这个消息发送给对应的队列。
  2. fanout: 这种类型的交换机路由规则很简单,只要与他绑定了的队列,他就会把消息发送给对应队列(与routingKey没关系)。
  3. topic: 这种类型的交换机路由规则也是和routingKey有关,只不过topic他可以根据*,#*代表过滤一个单词,#代表过滤后面所有单词,用.隔开)来识别routingKey我打个比方假设我绑定的routingKey有队列A和B,A的routingKey是:*.user,而B的routingKey是#.user,那么我生产一条消息routingKey 为:error.user那么此时2个队列都能接受到, 如果改为topic.error.user那么这时候只有B能接受到了。

RabbitMQ:是一个开源的 基于AMQP协议实现的一个完整的企业级消息中间件,服务端语言由Erlang(面向并发编程)语言编写 对于高并发的处理有着天然的优势,客户端支持非常多的语言。

RabbitMQ 如同redis一样 他也是采用c/s架构由服务端与客户端组成。

注意:在安装RabbitMQ的时候,首先要下载Erlang安装包,而且版本要对应。

2. RabbitMQ与SpringBoot的整合

  1. 首先引入SpringBoot和RabbitMQ的依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

图

  1. 写一个配置类,把我们的RabbitMQ的配置信息配置好
public class RabbitmqConfig {
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost",5672);
        //我这里直接在构造方法传入了
        //        connectionFactory.setHost();
        //        connectionFactory.setPort();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("testhost");
        //是否开启消息确认机制
        //connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        //注意  这个ConnectionFactory 是使用javaconfig方式配置连接的时候才需要传入的  如果是yml配置的连接的话是不需要的
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }
}

配置完连接之后 我们就可以开始发送消息和接收消息了,当然我们也可以在yml文件中配置这些信息:

图

  1. 编写我们发送消息的工具类,并把它交给spring管理,在类里面注入RabbitTemplate来进行消息发送。
@Component
public class RabbitmqMessageSend {
@Autowired
RabbitTemplate rabbitTemplate;

    public void testSend() {
        //参数介绍: 交换机名字,路由建, 消息内容
        rabbitTemplate.convertAndSend("directExchange", "direct.key", "hello");
    }
}

图

  1. 然后我们模拟发送一次消息(通过浏览器调用一个controller,在里面使用发送消息工具)。
@RestController
public class OrderController {
@Autowired
RabbitmqMessageSend rabbitmqMessageSend;

    @RequestMapping("/order.do")
    public Object order(){
        rabbitmqMessageSend.testSend();
        return null;
    }
}

图

启动项目,通过浏览器访问后台调用发送消息的方法。

图

访问后,我们去rabbitmq的控制台查看是否有消息生成

图

到这里,我们发送消息的功能就完成了。

  1. 然后我们在编写一个消费消息的功能:

引入依赖,创建RabbitMQ的配置类,这些都和发送消息功能一致

图

图

然后我们编写消费消息的工具类,主要使用@RabbitListener这个注解来进行消息消费:

@Component
public class ConsumerUtil {
    @RabbitListener(queues = "testQueue")
    public void get(String message) throws Exception{
        System.out.println(message);
    }
}

图

然后我们启动项目,发现打印台已经消费了消息

图

这时候我们再去RabbitMQ的控制台,发现消息队列已经为0了

图

到这里,我们消费消息的功能也完成了。

至此,最简单的RabbitMQ和SpringBoot的整合就完成了。

还有一种自定义Bean的方式也可以作为消费消息来使用,就不需要使用注解了,在RabbitConfig消费类中使用SimpleMessageListenerContainer

//通过Bean的方式来定义消息消费方法
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer (){
    SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer();
    messageListenerContainer.setConnectionFactory(connectionFactory());
    messageListenerContainer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        //消费消息
    }
    });
    //        messageListenerContainer.addQueueNames("");
    //        messageListenerContainer.addQueues();
    //设置手动确认消息消费,AcknowledgeMode有三个枚举方式:NONE,不设置/MANUAL,手动设置/AUTO,自动设置
    messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return messageListenerContainer;
}

图

3. RabbitMQ的高级特性

1)如果在正常情况下,我们的消息发送和消费是没有问题的,但是如果出现意外,我们该怎么处理呢?

首先,我们有可能并不知道我们的消息到底有没有发送到rabbitmq当中,假设我们是一个电商项目的话 用户下了订单 订单发送消息给库存 结果这个消息没发送到rabbitmq当中 但是订单还是下了,这时候 因为没有消息 库存不会去减少库存, 这种问题是非常严重的, 所以 接下来就讲一种解决方案:

在RabbitMQ中提供了一种叫做:发送方确认模式的方案,这种方式不仅对性能的影响非常小 而且也能确定消息是否发送成功。

发送模式开始方式:

在连接工厂方法中开启发送方确认机制;

connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

图

然后在RabbitTemplate中设置一个回调类的方法

template.setConfirmCallback(new MyConfirmCallback());

图

自己定义一个回调类,实现RabbitTemplate.ConfirmCallback接口

图

在这个confirm方法中有一个CorrelationData参数,这个参数可以为我们在发送消息时进行业务标识的传递

图

失败回调,就是消息发送失败的时候会调用我们事先准备好的回调函数,并且把失败的消息 和失败原因等 返回过来。这里主要是针对交换机把消息转换到队列里失败了。

首先和发送确认一样,在连接工厂方法中开启发送方确认机制;

connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

然后在RabbitTemplate里进行相关设置:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    //注意  这个ConnectionFactory 是使用javaconfig方式配置连接的时候才需要传入的  如果是yml配置的连接的话是不需要的
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    // 2.开启发送方确认回调方法
    template.setConfirmCallback(new MyConfirmCallback());
    // 3.开启mandatory模式(开启失败回调)
    template.setMandatory(true);
    // 4.指定失败回调接口的实现类
    template.setReturnCallback(new MyReturnCallback());
    return template;
}

图

自定义失败回调类,实现RabbitTemplate.ReturnCallback接口:

/**
* 发送方失败回调类
  */
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);
    }
}

在returnedMessage方法中会给我们返回一些失败信息。

图

我们通常都是发送方确认模式和失败回调一起使用,这样, 就能确保消息100%投递了。

简单的总结一下就是 confirm机制是确认我们的消息是否投递到了 RabbitMq(Broker)上面,而mandatory是在我们的消息进入队列失败时候不会被遗弃(让我们自己进行处理)

  1. RabbitTemplate如何对消息做处理,当我们传入一个对象,或者Map作为消息的传输介质,如何对消息进行处理?

我们可以调用setMessageConverter方法,传入实现MessageConverter接口的类,来实现里面的toMessage方法或者fromMessage方法即可。

rabbitTemplate.setMessageConverter(new MessageConverter() {
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        messageProperties.setContentType("text/xml");
        messageProperties.setContentEncoding("UTF-8");
        Message message = new Message(JSON.toJSONBytes(o),messageProperties);
        System.out.println("调用了消息解析器");
        return message;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return null;
    }
});
  1. 消费者确认消费了消息。

为什么要确认消费? 默认情况下 消费者在拿到rabbitmq的消息时 已经自动确认这条消息已经消费了,也就是rabbitmq的队列里就会删除这条消息了, 但是 我们实际开发中 难免会遇到这种情况, 比如说 拿到这条消息 发现我处理不了 比如说 参数不对, 又比如说 我当前这个系统出问题了, 暂时不能处理这个消息, 但是 这个消息已经被你消费掉了 rabbitmq的队列里也删除掉了, 你自己这边又处理不了, 那么 ,这个消息就被遗弃了。 这种情况在实际开发中是不合理的, rabbitmq提供了解决这个问题的方案,也就是消费方需要手动确认消费消息。

首先在消费配置类中得声明一个容器并且在容器里面指定消息确认为手动确认:

@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
    SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
    simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory());
    //设置手动确认消息消费
    simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return simpleRabbitListenerContainerFactory;
}

图

然后我们在注解中加入这个监听器,containerFactory = "simpleRabbitListenerContainerFactory"(名字为@Bean的方法名,spring中通过@Bean注入,默认使用方法名作为id)

图

这时候我们启动消费程序,发现,消息状态从Ready转成了Unacked,并没有删除

图

既然现在是手动确认了 那么我们在处理完这条消息之后 得使这条消息确认:

@Component
public class ConsumerUtil {
    @RabbitListener(queues = "testQueue",containerFactory = "simpleRabbitListenerContainerFactory")
    public void get(Message message, Channel channel) throws Exception{
        System.out.println(message);
        if (true){//处理自己的业务逻辑,等逻辑成功后再确认消费消息
            //第一个参数是:消息的一个唯一标识,第二个参数是:是否批量处理
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }else if (false){//如果处理业务逻辑失败了,需要把消息返回给队列
            //批量返回
            //第一个参数是:消息的一个唯一标识;第二个参数是:是否批量处理;第三个参数是:是否把消息退回消息队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            //单个返回
            //  channel.basicReject();
        }
        System.out.println("消息消费成功了");
    }
}

重新消费后,我们去看RabbitMQ控制台,发现消息没有了

图

RabbitMQ默认是使用轮询的方式来进行消息消费的。

引用资料

  • https://www.cnblogs.com/wk-missQ1/p/13522939.html