RabbitMQ 是一种高可用且数据安全的消息通信中间件。
AMQP 协议,即 Advanced Message Queuing Protocol,直译为高级消息队列协议,
AMQP 协议由三大模块组成,分别是交换机、消息队列、消息队列路由。通过这三大模块间的配合,可以实现对消息的发送、消息的监听等功能。
RabbitMQ 就是借鉴了 AMQP 协议的组成模块,在 AMQP 协议的基础上,对这些模块进行整合、拓展,并最终形成了基于 erlang 语言和 AMQP 协议的一个完备的消息中间件,因此,在 RabbitMQ 中,有些基础概念是和 AMQP 协议中的是几乎一样的,而有些概念却是 AMQP 协议中没有的。
术语概念
生产者与消费者
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort("5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ";
channel.basicPublish("", "test01", null, msg.getBytes());
String queueName = "test01";
channel.queueDeclare(queueName, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel);
channel.basicConsume(queueName, true, defaultConsumer);ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort("5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ";
channel.basicPublish("", "test01", null, msg.getBytes());
String queueName = "test01";
channel.queueDeclare(queueName, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel);
channel.basicConsume(queueName, true, defaultConsumer);4.2 消息
定义:
消息,顾名思义,就是我们平常所说的一条信息、一条数据,消息是 RabbitMQ 中的核心元素,我们产生的任何数据,在 RabbitMQ 中都被称为消息。
在整个 RabbitMQ 工作流程中间,有且只有消息被流转,我们使用 RabbitMQ 的目的就是来处理我们应用程序中需要 RabbitMQ 来处理的任何消息,该消息可以是一句话,可以是一张图片,可以是一条视频,也可以是一种文档。
4.3 虚拟主机
定义:
虚拟主机,即 Virtual Host ,是用来存储 RabbitMQ 中所有消息数据的集合,每个 RabbitMQ 服务中默认只有一台虚拟主机,并且提供用户自定义虚拟主机的功能。
我们可以这样理解:RabbitMQ 中的每个虚拟主机都是一台数据库,在这个数据库中会存储 RabbitMQ 的交换机、频道、路由 Key ,以及消息队列,每一个虚拟主机是一个独立的单元,各虚拟主机之间不会相互干扰,各自完成各自的任务。
4.4 交换机
定义:
交换机,即 exchange ,是传递消息的中间工具,我们可以把交换机理解为,传递消息的媒介,即我们发到 RabbitMQ 服务器中的消息,在经过虚拟主机之后,会首先到达 exchange 中,然后由 exchange 根据不同的匹配策略来将消息传递到对应的频道中去。
4.5 路由 Key
定义:
路由 Key ,即 routing key ,交换机与频道之间进行绑定的 key ,通过这个 key ,可以实现消息由交换机转移至频道的过程,从而实现消息的流转。
在同一个虚拟主机中,不能存在相同路由 key 的 key 值。
4.6 频道
定义:
频道,即 channel ,我们可以理解为传递消息的通道,在 RabbitMQ 中,消息最终经过 channel 发送给对应的消费者,消费者接收到消息并将该消息进行消费。
4.7 消息队列
定义:
消息队列,即消息所在的场所。在 RabbitMQ 中,不可能只存在一条消息,那么当存在多条消息时,RabbitMQ 会将这些消息组成一个队列,用来存放同一领域下的消息,这个队列就被称为消息队列。
当生成消息队列时,RabbitMQ 会根据不同的消息匹配策略,将不同领域的消息划分到对应的消息队列中去。消费者可以获取消息队列中的消息并消费,且我们可以通过设置一个阀值来规定我们同一时刻消费者需要消费的消息条数,这在高并发环境中是很重要的。
服务端配置
在 Windows 平台中,该配置文件被称为 rabbitmq-env-conf.bat ,在 Centos 平台中,该配置文件被称为 rabbitmq-env.conf 。
在 Windows 平台中,rabbitmq-env-conf.bat 配置文件的所在目录就位于 RabbitMQ 服务的安装目录下的 sbin 目录中;
在 Centos 平台中,rabbitmq-env.conf 配置文件的所在目录就位于 /usr/lib/rabbitmq/bin 目录下。
// 假定我们的应用程序所在地址为 192.165.22.123
RABBITMQ_NODE_IP_ADDRESS="192.165.22.123"
// 假定我们需要将 RabbitMQ 的服务绑定到 6672 端口上
RABBITMQ_NODE_PORT="6672"
// 假定我们需要将当前 RabbitMQ 服务所在节点的名称修改为 rabbitmq-server-one
RABBITMQ_NODENAME="rabbitmq-server-one"
// 用来规定 RabbitMQ 服务所生成的服务日志的路径
RABBITMQ_CONSOLE_LOG=/usr/temp/rabbtmq-log/one.log
// 用来描述 RabbitMQ 服务节点间进行通信的端口号
// 假定我们需要修改端口号为 RABBITMQ_NODE_PORT + 20001
RABBITMQ_DIST_PORT=RABBITMQ_NODE_PORT + 20001// 假定我们的应用程序所在地址为 192.165.22.123
RABBITMQ_NODE_IP_ADDRESS="192.165.22.123"
// 假定我们需要将 RabbitMQ 的服务绑定到 6672 端口上
RABBITMQ_NODE_PORT="6672"
// 假定我们需要将当前 RabbitMQ 服务所在节点的名称修改为 rabbitmq-server-one
RABBITMQ_NODENAME="rabbitmq-server-one"
// 用来规定 RabbitMQ 服务所生成的服务日志的路径
RABBITMQ_CONSOLE_LOG=/usr/temp/rabbtmq-log/one.log
// 用来描述 RabbitMQ 服务节点间进行通信的端口号
// 假定我们需要修改端口号为 RABBITMQ_NODE_PORT + 20001
RABBITMQ_DIST_PORT=RABBITMQ_NODE_PORT + 20001RabbitMQ 基础配置文件及参数
3.1 文件定义与路径
RabbitMQ 基础配置文件主要用来对 RabbitMQ 服务本身做一些配置,从而来满足我们的业务需求。同样地,该配置文件也是根据不同的操作系统来进行命名的,在 Windows 系统中,该配置文件被称为 rabbitmq-conf.bat , 在 Centos 系统中,该配置文件被称为 rabbitmq.conf 。
在 Centos 系统中,该配置文件默认的路径为 /etc/rabbitmq/ ,在 Windows 系统中,该配置文件默认的路径为 %APPDATA%\RabbitMQ ,其中,APPDATA 指的是 RabbitMQ 中数据文件所在的路径。
3.2 核心配置属性介绍
listeners 属性
listeners 属性是 AMQP 协议用来监听 tcp 端口的监听器,完整属性名称为 listeners.tcp.default 。 该属性的默认值为 5672 ,即 AMQP 协议默认监听 5672 端口。
log.file.level 属性 log.file.level 属性用来定义 RabbitMQ 服务日志的打印级别,一共有 4 种日志打印级别,分别是 error 、warning 、info 、debug,这四种日志级别根据这个顺序互相包含,即 debug 级别的日志会打印出 error 、warning、info、debug 的所有数据。
该属性默认值为 info ,即默认的服务日志打印级别为 info 级别,会打印包括 error 、warning 、info 的所有数据。
channel_max & channel_operation_timeout 属性 channel_max 属性是用来规定具体的一个频道与客户端的最大连接数量,该数量的默认值为 2047 。
channel_operation_timeout 属性是用来规定获取具体的一个频道连接的最大超时时间,该时间默认值为 15000 毫秒。
max_message_size & heartbeat & default_vhost 属性
max_message_size 属性用来规定一条消息的最大占用空间,该属性的单位为比特,默认值为 134217728 ,最大值为 536870912 。
heartbeat 属性用来规定 RabbitMQ 服务中心跳检测的超时时间,如果该属性的值设置为 0 ,则 RabbitMQ 服务的心跳检测机制会自动关闭,该属性的默认值为 60 秒。
default_vhost 属性用来规定 RabbitMQ 服务中默认的虚拟主机名称,默认值为 / 。
default_user & default_pass & default_user_tags & default_permissions 属性
default_user 属性和 default_pass 属性分别用来规定 RabbitMQ 服务中默认的用户名和密码,默认值均为 guest 。
default_user_tags 属性用来对用户的角色进行设置,默认值为 administrator,即默认用户为 RabbitMQ 服务的超级管理员,如下代码所示:
default_user_tags.administrator = true
default_permissions 属性用来规定用户的默认权限,默认值为所有用户都可以对 RabbitMQ 进行配置,以及读写操作。
如果我们想修改用户的权限,那么我们需要这样进行修改:
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*
上述代码中,configure 用来设置是否允许用户对 RabbitMQ 进行配置,read 用来设置用户的读权限,write 用来设置用户的写权限。
cluster_formation.classic_config.nodes 属性
cluster_formation.classic_config.nodes 属性用来设置不同 RabbitMQ 服务节点间连接,该属性会作为一个列表生效,即该列表中的所有 RabbitMQ 服务节点在 RabbitMQ 服务启动时都将被启动,且各节点间的通信通道将被打开。
那么该如何定义我们所需要的节点呢?如下代码所示:
// 假定我们有两个服务节点需要通信
cluster_formation.classic_config.nodes.1 = rabbit@hostname1 cluster_formation.classic_config.nodes.2 = rabbit@hostname2
Tips:
- rabbitmq.conf 文件中的所有属性,一经修改,需要重启 RabbitMQ 服务才可生效。
- 出于不同的安装方式,rabbitmq.conf 配置文件可能有的安装方式不会自动生成,如果需要,我们可以在上述指定目录新建一个 rabbitmq.conf 文件,并重启 RabbitMQ 服务。
消息发送原理

在 AMQP 协议中,RabbitMQ Server 又被称为 Broker
在一个 RabbitMQ Server 中,有且只有一个 Virtual Host ,在一个 Virtual Host 中,存在多个 Exchange 和 Channel ,以及多个 Queue

RabbitMQ 消息发送的步骤:
第一步,生产者将消息生产出来,并将消息发送到 RabbitMQ Server 上,即我们发到 RabbitMQ 中的消息,会首先置于 RabbitMQ Server 中;
第二步,RabbitMQ Server 根据客户端所发来的连接请求,判断将消息传递到哪个 Virtual Host 中,如果我们在连接 RabbitMQ Server 时,没有设置要连接的 Virtual Host 地址,则 RabbitMQ Server 会将我们的消息传递到地址为 “/” 的 Virtual Host 中去;
第三步,在将消息传递到对应的 Virtual Host 中后,Virtual Host 会继续解析我们的连接请求,并在这一步解析出我们需要的 Exchange 的类型,以及 Channel 的名称,Queue 的名称,以及消息和 Exchange 之间是否有 routing_key ,Channel 和 Queue 之间是否有 bidding_key 这些信息;
第四步,Virtual Host 会根据解析出来的这些信息,将消息和 Exchange 进行匹配,相应的,Exchange 也会和对应的 Channel 进行匹配,并最终将 Queue 和 Channel 进行绑定,使消息进入到对应的消息队列中去;
第五步,待消息进入到对应的消息队列中之后,RabbitMQ Server 会返回给我们一个确认应答(确认应答后续会进行介绍),来通知我们,消息已经成功被 RabbitMQ Server 所发送,于是,消费者便会根据一定的策略来从消息队列中获取消费,并最终将该消息消费掉,消息消费之后,也会给我们返回一个确认应答(确认应答后续会进行介绍),告诉我们消息已经成功消费掉了。

消息发送模式
根据 RabbitMQ 所实现的消息投递方式来划分,可以将消息发送模式分为两大类,分别是点对点模式、发布订阅模式;根据 RabbitMQ 所采用的队列方式以及匹配规则的不同,可以将消息发送模式分为五大类,分别是普通队列模式、工作队列模式、发布订阅模式、直接模式、主题模式。
直接模式
直接模式,即直接发送消息模式,指的是将消息直接发送给消费者。
直接模式允许将多个队列绑定到一个交换机上,在生产者发送消息给交换机时,需要携带一个 key ,而这个 key 一般被称为 routing key 或者 binding key,所以直接模式有时也被称为路由模式。
单 key 绑定

从图中我们可以看到,交换机的 type 被声明成了 direct ,这说明我们使用的交换机是直接交换机,即使用的消息发送模式是直接模式;
orange、black、green 分别表示不同的两个 routing key ,orange 这一个 key 绑定了一个队列,black、green、两个不同的 key 也绑定了一个队列,这种现象就是直接模式的第一种业务场景,单 key 绑定。
单 key 绑定的队列,在生产者生产出消息之后,会根据不同 key 指向的不同队列来将消息进行分发,即使是不同的 key 绑定了同一队列。
多重 key 绑定

在图中我们可以看到,Q1、Q2 两个队列,分别绑定到了 routing key 均为 black 的 direct 交换机上,即名称相同的一个 key 绑定到了多个队列上面,这种现象被称为多重 key 绑定。
在多重 key 绑定下,生产者生产的消息均会被发送到相同 key 值所绑定的队列上面
// 生产者 channel.basicPublish(EXCHANGE_NAME, "Routing Key", ...)
// 消费者 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "Routing Key"
发布订阅模式
发布订阅模式,即生产者发布消息,消费者通过订阅的方式来消费消息。
其实,发布订阅模式在我看来,不过是给传统的发送和接收起一个高大上的名字罢了,本质上仍热是消息的生产和消费,只不过这种模式更像与发布和订阅,因此得名发布订阅模式。
群发模式

// 生产者 channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
// 消费者 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "") channel.basicConsume(QUEUE_NAME, false, consumer)
普通队列模式
普通队列模式,即最简单的消息发送模式,不使用任何交换机,由生产者、队列、消费者组合完成消息的发送和接收。普通队列模式,由于其操作简单,所以又被称为简单模式,如下图所示:

工作队列模式
工作队列模式,和普通队列模式有点像,都是不使用任何交换机,由生产者、队列、消费者组合完成消息的发送和接收,只不过工作队列支持存在多个消费者,而普通队列模式只支持一个消费者。

主题模式
主题模式,也被称为通配符模式,官网一般称为主题模式,即交换机与消息队列所绑定的 key 值可以像匹配通配符的方式,来匹配消息队列

主题模式对 routing key 的匹配规则做了改进,上述其他四种模式中有涉及 key 匹配的地方都是完全匹配,即名称必须相等时才能把 key 匹配上,而对于主题模式,则不需要这样。
主题模式将 key 值中的每个单词或者关键词,使用英文状态下的 . 符号进行间隔,如上图所示。上图中为我们列举了主题模式中支持的所有通配符语法,我们一个一个来介绍:
*.orange.* 表示在 orange 的两侧可以匹配一个 key 值,例如 123.orange.456 、abc.orange.456 等,但是,abc.orange.456.123 这个是不可以的。
..rabbit : 用法和上述 orange 相同,例如: 123.abc.rabbit 、abd.acd.rabbit 等。
lazy.# : 表示在 lazy 的右侧,可以匹配多个 key 值可以进行通配符匹配,例如:lazy.abc.123 等。
主题模式的最佳实践?
死信队列
死信交换机 DLX Dead-Letter-Exchange
死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢? 一般消息变成死信消息有如下几种情况:
- 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
- 消息过期
- 队列达到最大长度
当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起
为消息队列配置死信交换机
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
//设置消息过期时间
args.put("x-message-ttl", 0);
//设置死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
}@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
//设置消息过期时间
args.put("x-message-ttl", 0);
//设置死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
}SpringBoot是如何管理RabbitMQ中的Channel?_Abstracted的博客-CSDN博客
SpringBoot 使用了 RabbitMQ 中的 IO多路复用 的理念,并对这种方案在 Web 场景下进行了优化,尽可能的复用空闲中的 channel。 SpringBoot 默认使用 CachingConnectionFactory 对 Connection 和空闲 Channel 进行缓存。