RabbitMQ架构及特性

2021-08-18 19:44:08 741

rabbitmq:3.9.2
spring-boot-starter-amqp:2.3.0.RELEASE

架构

  1. Producer 生产者消息成功发送到交换机, 会触发回调事件ConfirmCallback(需要配置)消息不能被交换机转发到队列中时, 会触发回调事件ReturnCallback(需要配置)
  2. Exchange 交换器Fanout把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中. 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的Direct 把消息路由到BingingKey 和RoutingKey 完全匹配的队列中会把消息路由到BindingKey 和RoutingKey 完全匹配的队列中. 如果一个队列绑定到该交换机上要求路由键"dog",则只有被标记为"dog"的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog, 多个队列可以使用一个键TopicTopic 类型的交换器在 direct 匹配规则上进行了扩展. 不是进行完全匹配. 而是进行模糊匹配. 匹配规则如下: BindingKey和RoutingKey一样都是由"."分隔的字符串; BindingKey中可以存在两种特殊字符"*" 和 "#", 用于模糊匹配, 其中"*"用于匹配一个单词, "#"用于匹配多个单词(可以是0个), 因此"audit.#"能够匹配到"audit.irs.corporate",但是"audit.*" 只会匹配到"audit.irs"HeadersHanders 类型的交换器不是根据路由匹配规则来的,而是根据消息中的 headers 属性进行匹配的。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队列。headers类型的交换器性能很差,不实用
  3. Bindings 绑定(匹配)器 把exchange和queue按照路由规则绑定(匹配)起来BindingKey 队列绑定的key, 初始化时绑定RoutingKey 消息携带的
  4. Queues 队列
  5. Consumer 消费者
其他概念
  • Broker 提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
  • Vhost 虚拟主机,一个broker里可以有多个vhost, 用作不同用户的权限分离
  • Channel 消息通道, 可以理解为建立在生产者/消费者和RabbitMQ服务器之间的TCP连接上的虚拟连接,一个TCP连接上可以建立多个Channel

特性

消息确认
  1. 自动确认RabbitMQ成功将消息发出 (即消息成功写入TCP Socket) 中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递
  2. 手动确认basic.ack 肯定确认basic.nack 用于否定确认 (注意: 这是AMQP 0-9-1的RabbitMQ扩展)basic.reject用于否定确认, 但与basic.nack相比有一个限制: 一次只能拒绝单条消息
  • 使用手动确认需要对客户端进行配置.
  • 第2, 3种, 都可以通过设置参数, 将消息重新放回到队列中
TTL
  1. 时间范围 0 <= n <= 2^32-1 ms, 约 49 天
死信队列
  1. 可以和TTL配合实现延时队列 将消息设置ttl, 发送到死信队列中 (不设置消费者处理), 等待过期被转发到延时队列
  2. 但该延时队列有缺陷, 若发送两条延时消息, 第一条延时10s, 第二条延时5秒, 若第一条先入队列, 则只有当第一条消息过期发送到死信队列后, 第二条消息才能被处理, 即过期是阻塞的. 但可以通过安装rabbitmq_delayed_message_exchange插件解决, https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

注意该插件存在几点问题:

  1. 仅适用于较少量延时消息(大概10w以下)的场景. 延迟插件作者并没有实现比较完善的延迟消息存储和扫描, 因此不建议实现大量消息的存储. 延时不可靠, 存在消息数量较大或使用很久后延迟不准确(会推迟)
  2. 插件其内部基于改良版定时器实现,所以会占用大量的CPU用于运算, 且会比普通消息占用更多内存
  3. 时间范围 0 <= n <= 2^32-1 ms, 约 49 天, 大于该时间的需要配合其他技术方案. 且由于其内部是没有存储消息的进入时间, 只有一个TTL, 其在重启的时候很容易造成延迟时间的重置终端启用插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange 代码中配置/** * * @author https://www.skypyb.com/ */ @Configuration public class RabbitBindConfig { public final static String SKYPYB_DELAY_EXCHANGE = "skypyb-delay-exchange"; public final static String SKYPYB_DELAY_QUEUE = "skypyb-delay-queue"; public final static String SKYPYB_DELAY_KEY = "skypyb.key.delay"; @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); //自定义交换机 return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args); } @Bean public Queue delayQueue() { return new Queue(SKYPYB_DELAY_QUEUE, false, false, true); } @Bean public Binding bindingDelayExchangeAndQueue() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs(); } }
消息重试
  1. 生产者重试# 开启重试机制 spring.rabbitmq.template.retry.enabled=true # 重试起始间隔时间 spring.rabbitmq.template.retry.initial-interval=1000ms # 最大重试次数 spring.rabbitmq.template.retry.max-attempts=10 # 最大重试间隔时间 spring.rabbitmq.template.retry.max-interval=10000ms # 间隔时间乘数 (这里配置间隔时间乘数为 2, 则第一次间隔时间 1 秒, 第二次重试间隔时间 2 秒, 第三次 4 秒, 以此类推) spring.rabbitmq.template.retry.multiplier=2
  2. 消费者重试
  • spring.rabbitmq.listener.simple.acknowledge-mode: auto
  • spring.rabbitmq.listener.simple.acknowledge-mode: manual
  • spring.rabbitmq.listener.simple.acknowledge-mode: none
消息重新入列

消息重新入列是在队列头部

Demo验证

    @RabbitHandler
    public void process(Map<String, Object> testMessage, Channel channel, Message message) throws Exception {
        Thread.sleep(3000);
        String data = testMessage.get("messageData").toString();
        log.info("DirectReceiver消费者收到消息: {}", data);
        if (data.contains("2")) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

消费者端配置手动确认 消费线程数为1

生产者发送6条消息, messageData分别为1, 2, 3, 4, 5, 6

prefetch设为100, 消费情况如下

DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 3
DirectReceiver消费者收到消息: 4
DirectReceiver消费者收到消息: 5
DirectReceiver消费者收到消息: 6
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...

prefetch设为3, 消费情况如下

DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 3
DirectReceiver消费者收到消息: 4
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 5
DirectReceiver消费者收到消息: 6
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...

prefetch设为1, 消费情况如下

DirectReceiver消费者收到消息: 1
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
DirectReceiver消费者收到消息: 2
...
事务性

1. 生产者 保证消息发送的原子性

事务与异步确认机制是冲突的, 只能启用其中一个

配置

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //启用通道事务性
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

使用 发送方法上加注解

@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")

开启事务模式之后,RabbitMQ 生产者发送消息会多出几个步骤:

  • 客户端发出请求,将信道设置为事务模式
  • 服务端给出回复,同意将信道设置为事务模式
  • 客户端发送消息
  • 客户端提交事务
  • 服务端给出响应,确认事务提交
  • 2. 消费者 保证消息消费的原子性

配置

@Bean
public AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
    configure(AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> factory) {
    factory.setChannelTransacted(true);
    return factory;
}

使用 同生产者一致

消费者若启用事务, 则spring.rabbitmq.listener.simple.acknowledge-mode最好为auto, 若为manual, rollback后, 会阻塞当前消费者, 消息一直为unacked状态

高可用
  1. 消息集群 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
  2. 镜像队列 可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
  3. 跟踪机制 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么
  4. 持久化 交换机/队列/消息
其他配置
  1. 默认配置可能导致Channel线程不安全, 可以配置Channel缓存(池化)
//缓存模式 缓存channel
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); 
//最多缓存25个channel
connectionFactory.setChannelCacheSize(25);
//channel超时等待 当ChannelCheckoutTimeout的值大于0的时候,ChannelCacheSize的值就是最大的channel数量了,一旦从缓存中获取不到channel,等待ChannelCheckoutTimeout毫秒后,如果还是获取不到的,就会抛AmqpTimeoutException
connectionFactory.setChannelCheckoutTimeout(1000);


排查记录: Java所有接口卡死

线上接口, 每天凌晨三四点后开始卡死, 重启后恢复调用后等待两分钟, 然后504两分钟正好是nginx反代的超时时间排查过程日志把nginx和应用日志都找出来看了下, 没发现什么问题应用机器状态看了下cpu占用率/内存, 没问题磁盘占用也没问题应用状态通过jps查找到java进程然后jstack 进
2024-11-07

RabbitMQ/RocketMQ消息可靠性保证

RabbitMQRabbitMQ刷盘机制异步写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(此时写入到内核态缓存中, 未必刷到磁盘);有个固定的刷盘时间:25ms,也就是不管Buffer
2022-11-04

RabbitMQ架构及特性

rabbitmq:3.9.2 spring-boot-starter-amqp:2.3.0.RELEASE 架构Producer 生产者消息成功发送到交换机, 会触发回调事件ConfirmCallback(需要配置)消息不能被交换机转发到队列中时, 会触发回调事件ReturnCallback(需要配
2021-08-18

freemarker 时间显示不正常 设置时区

项目在本地开发的时候显示正常,部署上服务器就一直差8个小时,最后发现freemarker官方文档有这样的说明time_zone:时区的名称来显示并格式化时间。 默认情况下,使用JVM的时区。 也可以是 Java 时区 API 接受的值,或者 "JVM default" (从 FreeMarker 2
2020-03-28
IDEA 2019.1 xml 不高亮

IDEA 2019.1 xml 不高亮

前几天更新了idea后,发现xml里的代码都没有了高亮,变得跟记事本一个德性了打开setting ,搜索 File Types,找到xml项, 查看下方的匹配格式,果然没有xml,(idea真是厉害)点击右方的+,输入*.xml,点击ok,解决问题
2020-03-28

npm install 淘宝镜像

npm install --registry=https://registry.npm.taobao.org
2020-03-28
Java中方法的参数传递机制

Java中方法的参数传递机制

来看一段代码 public class Man { private String name; private Integer age; public String getName() { return name; } publi
2020-03-28
基于自定义注解手写权限控制

基于自定义注解手写权限控制

方法一: AOP 方法二: 拦截器项目结构项目依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-w
2020-03-28

Docker 部署 详细全过程 附代码

Docker 部署本站 全过程环境:CentOS7.61. 安装Docker其他版本CentOS可以参考这个https://help.aliyun.com/document_detail/187598.html查看本机内核版本,内核版本需高于 3.10uname -r 确保 yum 包最新yum u
2020-03-28

SpringBoot 启动普通java工程

引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.0.9</version> </dependency>
2020-03-28

Vue.js DOM操作

<template> <input type="button" @click="reply($event)" value="回复"> </template> export default { methods: { replyFun(e) {
2020-03-29
CentOS7编译调试OpenJDK12

CentOS7编译调试OpenJDK12

1. 下载源码https://hg.openjdk.java.net/jdk/jdk12点击左侧的browse,再点击zip,就可以下载zip格式的源码压缩包。unzip xxx.zip 解压文件2. 安装jdkyum install java-11-openjdk-devel -y3. 运行con
2020-04-23
编写自己的Spring Boot Starter

编写自己的Spring Boot Starter

1.新建一个maven项目命名规则统一是xxx-spring-boot-starter完整pom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"
2020-06-29