协议
消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,是采用底层的TCP/IP,UDP 协议还是在这基础上自己构建等,而这些约定成俗的规范就称之为:协议。
面试题:为什么消息中间件不直接使用 http 协议呢?
因为 http 请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、晌应码等附加的功能,但是对于个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就够,要追求的是高性能。尽量简洁,快速。 大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。
常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议
AMQP:高级消息队列协议。RabbitMQ
Kafka:Kafka
TOMP:简单文本协议。ActiveMQ
OpenMessage:RocketMQ
MQTT:消息队列遥测传输协议。基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于 TCP/IP 协议上。
JMS:Java 平台的一个标准,定义了一套 API,让开发者能够在不同的消息中间件产品之间进行互操作。ActiveMQ、IBM MQ 等消息中间件支持 JMS 规范。
工作原理

消息模式

注意:上图中简单模式和工作模式虽然没有画出交换机,但是都会有一个默认的交换机,类型为direct
交换机
直连交换机 Direct

主题交换机 Topic

广播交换机 Fanout

消息可靠性
消息应答
默认情况下,RabbitMQ 一旦向消费者发送了一条消息后,便立即将该消息标记为删除。由于消费者处理一个消息可能需要一段时间,假如在处理消息中途消费者挂掉了,我们会丢失其正在处理的消息以及后续发送给该消费者的消息。
为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答意思就是:消费者在接收消息并且处理完该消息之后,才告知 RabbitMQ 可以把该消息删除了。
// 肯定确认(deliveryTag 表示消息的标志,multiple表示是否为批量应答)
void basicAck(long deliveryTag, boolean multiple)
// 否定确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
void basicReject(long deliveryTag, boolean requeue)RabbitMQ 中消息应答方式有两种:自动应答(默认)、手动应答
自动应答:消息发送后立即被认为已经传送成功,高吞吐,可能会消息丢失,仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
手动应答:未收到 ACK 确认会采用 消息自动重新入队,可以避免自动应答中消息丢失的情况
持久化
默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会清空队列和消息,除非告知它不要这样做。
确保消息不会丢失需要做两件事:
队列持久化:把
durable参数设置为 true消息持久化:添加
MessageProperties.PERSISTENT_TEXT_PLAIN属性。
不公平分发
预取值
发布确认—简单
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
默认,没有开启。
// 开启发布确认
channel.confirmSelect();
// 进行确认
boolean flag = channel.waitForConfirms()三种策略:单个确认发布、批量确认发布、异步确认发布
对比
单独发布消息:同步等待确认,简单,但吞吐量非常有限。
批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
发布确认—高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

回调接口——消息确认
生产者到交换机:让无法发送到交换机的消息能够让生产者感知并做出处理
spring.rabbitmq.publisher-confirm-type=correlateNONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换机后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法;其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
RabbitTemplate.ConfirmCallback
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
rabbitTemplate.setConfirmCallback(this);回调接口——消息回退
交换机到队列:让无法被路由的消息能够让生产者感知并做出处理
spring.rabbitmq.publisher-returns=trueRabbitTemplate.ReturnsCallback
@Override
public void returnedMessage(ReturnedMessage returned) {
}
rabbitTemplate.setReturnsCallback(this);备份交换机
解决:消息路由失败(交换机到队列)
会将消息发送给备份交换机,其广播到备份队列、报警队列

注意:备份交换机和消息回退接口同时存在,备份交换机优先级高
消息幂等性
针对:消息重复消费问题
比如消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者的重复消费。
一般使用 全局ID,就是每次完成一次操作应该生成一个唯一标识,比如时间戳、UUID、消息队列中消息的 id 号等等。这样每次消费消息时都先通过该唯一标识先判断该消息是否已消费过,如果消费过则不再消费,则避免了消息重复消费问题。
业界主流的幂等性解决方案:
指纹码机制:指纹码是按照一定规则,比如时间戳、其他服务给的唯一信息码而拼接出来的唯一标识,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来。然后就利用查询语句进行判断这个指纹码是否存在数据库中,优势就是实现简单,只需要进行拼接即可,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库会出现写入性能瓶颈,当然也可以采用分库分表提升性能。(不推荐)
数据库约束:利用数据库的唯一键约束(如主键、唯一索引)来防止重复插入。当一个请求尝试插入重复数据时,数据库会抛出异常,服务端捕获异常后,可以返回已处理的提示信息。适用于涉及数据写入的操作,如创建资源、更新状态等。
乐观锁:在更新数据时,使用版本号(version field)或时间戳(timestamp)等辅助字段来实现乐观锁。当更新数据时,先检查当前版本号或时间戳是否与请求携带的一致,一致则执行更新,否则拒绝并返回冲突信息。适用于并发环境下避免竞态条件导致的数据不一致。
分布式锁:如 ZooKeeper、Redis、etcd 等。利用 redis setnx命令、Redisson。(推荐)
进阶
TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:
消息设置TTL
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData -> {
correlationData.getMessageProperties().setExpiration(String.valueOf(ttl));
return correlationData;
});队列设置TTL
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 5000);
channel.queueDeclare(Constant01.NORMAL_QUEUE, false, false, false, arguments);如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
死信队列
死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说:用户在商城下单成功(或秒杀)并点击去支付后在指定时间未支付时自动失效
死信的原因:
消息 TTL 过期
队列达到最大长度(队列满了无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

延迟队列
通过死信队列实现
设置 ttl(包括队列的 ttl、消息的 ttl),消息在存活时间内未被消费(已到达队列),转到死信队列消费,从而实现延迟。

问题:
如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。本质是遵循 先来先到 的原则,先发送的消息延迟时间到了后才继续处理后面的消息。
通过 RabbitmQ 插件
https://www.rabbitmq.com/community-plugins
下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
# 1.将插件移动到 RabbitMQ 的插件 plugins 目录
cp rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins/
# 2.使插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 3.重启 rabbitMQ
systemctl restart rabbitmq-server
# docker 容器下
cd /mydata/rabbitmq
docker cp ./rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins/
docker exec -it rabbitmq bash
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq
实现图:
消息在延迟交换机处延迟,之后到达队列被消费。
