RabbitMQ
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。
1. 安装
相关端口
- 5672 - RabbitMQ的通讯端口
- 25672 - RabbitMQ的节点间的CLI通讯端口
- 15672 - RabbitMQ的HTTP_API端口,管理员用户才可以访问,用于管理RabbitMQ,需要启动Management插件。
- 1883, 8883 - MQTT插件启动时的端口。
- 61613, 61614 - STOMP客户端插件启动时候的端口。
- 15674, 15675 - 基于websocket的STOMP端口和MOTT端口。
1.1. Linux
安装 Erlang。
RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境。
rpm -Uvh erlang*.rpm yum install -y erlang erl -v
安装socat插件以来
yum install socat -y
官网下载RabbitMQ安装包,解压安装。
rpm -Uvh rabbitmq-server*.rpm yum install rabbitmq-server -y
启动服务
systemctl start rabbitmq-server systemctl status rabbitmq-server
开机自启
systemctl enable rabbitmq-server
配置
RabbitMQ默认情况下有一个配置文件,定义了RabbitMQ的相关配置信息,默认情况下能够满足日常的开发需求,如果有修改的需要可以自建配置文件覆盖。
配置文件
/etc/rabbitmq/rabbitmq.conf
.
1.2. Docker
# 运行
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
# 查看日志
docker logs -f myrabbit
2. 基本使用
2.1. 开启Web管理界面
装插件重启。
rabbitmq-plugins enable rabbitmq_management systemctl restart rabbitmq-server
http://ip:15672
默认账号密码为guest
,但该账号仅限本机使用,有administrator权限。
2.2. 用户管理
# 新增用户
rabbitmqctl add_user <username> <password>
# 分配权限
rabbitmqctl set_user_tags <username> <tags>
# 为用户添加资源权限(.*表示所有权限,以下命令相当于给予administrator拥有的权限)
rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"
# 修改密码
rabbitmqctl change_password <username> <new_password>
# 删除用户
rabbitmqctl delete_users <username>
# 查看用户清淡
rabbitmqctl list_users
角色分类
- None
- 不能访问management plugin
Management
- 列出自己可以通过AMQP登入的虚拟机。
- 查看自己的虚拟机节点Virtual Hosts的Queues, Exchanges和Bindings信息。
- 查看和关闭自己的Channels和Connections.
- 查看有关自己的虚拟机节点virtual Hosts的统计信息。包括其他用户在这个节点virtual Hosts的活动信息。
Policymaker
- 包含Management权限。
- 查看、创建和删除自己的Virtual Hosts所属的Policies和Parameters信息。
- Monitoring
- 包含Management所有权限。
- 罗列出所有的Virtual Hosts,包括不能登陆的Virtual Hosts。
- 查看其他用户的Connections和Channels信息。
- 查看节点级别的数据,如Clustering和Memory使用情况。
- 查看所有的Virtual Hosts的全局统计信息。
- Administrator
- 最高权限。
- 可以创建和删除Virtual Hosts。
- 可以查看、创建和删除Users。
- 查看创建Permissions。
- 关闭所有用户的Connections。
2.3. 详细配置
rabbitmq:
addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
# port:
##集群配置 addresses之间用逗号隔开
# addresses: ip:port,ip:port
password: admin
username: 123456
virtual-host: / # 连接到rabbitMQ的vhost
requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s
publisher-confirms: #是否启用 发布确认
publisher-reurns: # 是否启用发布返回
connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时
cache:
channel.size: # 缓存中保持的channel数量
channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
connection.size: # 缓存的连接数,只有是CONNECTION模式时生效
connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION
listener:
simple.auto-startup: # 是否启动时自动启动容器
simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
simple.concurrency: # 最小的消费者数量
simple.max-concurrency: # 最大的消费者数量
simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒
simple.retry.enabled: # 监听重试是否可用
simple.retry.max-attempts: # 最大重试次数
simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
simple.retry.multiplier: # 应用于上一重试间隔的乘数
simple.retry.max-interval: # 最大重试时间间隔
simple.retry.stateless: # 重试是有状态or无状态
template:
mandatory: # 启用强制信息;默认false
receive-timeout: # receive() 操作的超时时间
reply-timeout: # sendAndReceive() 操作的超时时间
retry.enabled: # 发送重试是否可用
retry.max-attempts: # 最大重试次数
retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
retry.multiplier: # 应用于上一重试间隔的乘数
retry.max-interval: #最大重试时间间隔
3. 核心组成部分
核心概念:
- Server:(Broker节点)用于接收客户端的连接,实现AMQP的实体服务。
- Connection:连接,应用程序与Broker的网络连接TCP/IP(三次握手四次挥手)。
- Channel:网络信道。几乎所有的操作都在Channel中进行。Channel是消息进行读写的信道,客户端可以建立多个Channel,每个Channel代表一个会话任务。
- Message:消息,是服务与应用程序之间传送的数据。由Properties和Body组成,Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性。Body是消息体的内容。
- Virtual Host:虚拟地址,用于进行逻辑隔离。是最上层的消息路由,一个虚拟机可以有若干个Exchange和Queue,但不可以有相同名字的Exchange。
- Exchange:交换机,用于接收消息并根据Routing Key将消息发送到绑定的队列,不具备消息存储的能力。
- Bindings:Exchange和Queue之间的虚拟连接。
- Routing Key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息。
- Queue:队列,也称为Message Queue消息队列,用于保存消息并将它们转发给消费者。
4. 使用场景
主要有三个:解耦、削峰、异步。
可以解决同步异步的问题。部分任务适合异步执行,但是异步需要维护线程池,并且代码耦合在业务中。维护成本和耦合度都很高,同时还有一些风险比如消息的可靠性、系统的高可用性等。使用消息中间件可以解决上述问题。
- 分布式事务的可靠消费和可靠生产。
- 索引、缓存、静态话处理的数据同步。
- 流量监控。
- 日志监控(ELK)。
- 下单、订单分发、抢票。
5. Simple简单模式
5.1. 说明
图上省略了默认交换机,插入消息到队列都是由交换机完成的。
大致过程:
连接工厂ConnectionFactory -> 连接Connection -> 通道Channel -> 交换机 queueDeclare -> 发布publish/消费consume -> 关闭通道Channel -> 关闭Connection
5.2. 代码
导依赖。
<!-- rabbitmq依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency>
生产者。
public class Producer { public static void main(String[] args) { // 所有的中间件技术都是基于tcp/ip协议基础之上,构建的新型协议规范 // rabbitmq遵循amqp协议 // ip port // 1. 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2. 创建连接Connection connection = connectionFactory.newConnection("生产者"); // 3. 通过连接获取通道Channel channel = connection.createChannel(); // 4. 通过创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息 String queueName = "queue1"; /** * @params1 队列名称 * @params2 是否要持久化,mq服务重启后会不会还在。如不持久化,也会存盘,但会随着mq服务重启而丢失 * @params3 排他性,是否是一个独占队列 * @params4 是否自动删除,随着最后一个消费者消费完消息后是否把队列删除 * @params5 携带附属参数 */ channel.queueDeclare(queueName, false, false, false, null); // 5.准备消息内容 String message = "hello RabbitMQ!"; // 6. 发送消息 /** * @params1 交换机 * @params2 队列,路由key * @params3 消息的状态控制 * @params4 消息主题 */ channel.basicPublish("", queueName, null, message.getBytes()); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { // 7. 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } // 8. 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
消费者
public class Consumer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel(); String queueName = "queue1"; channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), Charset.defaultCharset())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败"); } }); System.out.println("开始接收消息"); System.in.read(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } System.out.println("发送完毕"); } } }
6. Fanout模式
6.1. 说明
发布与订阅
而后在此交换机发送的所有消息,都会发送到所绑定的所有队列中。
6.2. 代码
生产者
public class Producer { public static void main(String[] args) { // 所有的中间件技术都是基于tcp/ip协议基础之上,构建的新型协议规范 // rabbitmq遵循amqp协议 // ip port // 1. 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2. 创建连接Connection connection = connectionFactory.newConnection("生产者"); // 3. 通过连接获取通道Channel channel = connection.createChannel(); // 4.准备消息内容 String message = "hello RabbitMQ!"; // 5. 准备交换机 String exchangeName = "fanout-exchange"; String routeKey = ""; // 6. 发送消息 /** * @params1 交换机 * @params2 队列,路由key * @params3 消息的状态控制 * @params4 消息主题 */ channel.basicPublish(exchangeName, routeKey, null, message.getBytes()); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { // 7. 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } // 8. 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } System.out.println("发送完毕"); } } }
消费者
public class Consumer { private static Runnable runnable = new Runnable() { @Override public void run() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel(); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName + "收到消息:" + new String(message.getBody(), Charset.defaultCharset())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败"); } }); System.out.println(queueName + "开始接收消息"); System.in.read(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }; public static void main(String[] args) { new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); new Thread(runnable, "queue3").start(); } }
7. Direct模式
7.1. 说明
路由模式
相比于Fanout模式增加了Routing Key区分要发送的队列。
不同队列可以拥有相同的Routing Key。
之后,客户端根据约定的Routing Key在对应的Queue中取数据即可。
7.2. 代码
生产者
public class Producer { public static void main(String[] args) { // 所有的中间件技术都是基于tcp/ip协议基础之上,构建的新型协议规范 // rabbitmq遵循amqp协议 // ip port // 1. 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2. 创建连接Connection connection = connectionFactory.newConnection("生产者"); // 3. 通过连接获取通道Channel channel = connection.createChannel(); // 4.准备消息内容 String message = "hello RabbitMQ!"; // 5. 准备交换机 String exchangeName = "direct-exchange"; String routeKey = "sms"; // 6. 发送消息 /** * @params1 交换机 * @params2 队列,路由key * @params3 消息的状态控制 * @params4 消息主题 */ channel.basicPublish(exchangeName, routeKey, null, message.getBytes()); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { // 7. 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } // 8. 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } System.out.println("发送完毕"); } } }
消费者
public class Consumer { private static Runnable runnable = new Runnable() { @Override public void run() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel(); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName + "收到消息:" + new String(message.getBody(), Charset.defaultCharset())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败"); } }); System.out.println(queueName + "开始接收消息"); System.in.read(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }; public static void main(String[] args) { new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); new Thread(runnable, "queue3").start(); } }
8. Topic模式
8.1. 说明
* -> 只匹配一级
# -> 可以匹配0级到多级
8.2. 代码
生产者
public class Producer { public static void main(String[] args) { // 所有的中间件技术都是基于tcp/ip协议基础之上,构建的新型协议规范 // rabbitmq遵循amqp协议 // ip port // 1. 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2. 创建连接Connection connection = connectionFactory.newConnection("生产者"); // 3. 通过连接获取通道Channel channel = connection.createChannel(); // 4.准备消息内容 String message = "hello RabbitMQ!"; // 5. 准备交换机 String exchangeName = "topic-exchange"; String routeKey = "com.order.test.xxx"; // 6. 发送消息 /** * @params1 交换机 * @params2 队列,路由key * @params3 消息的状态控制 * @params4 消息主题 */ channel.basicPublish(exchangeName, routeKey, null, message.getBytes()); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { // 7. 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } // 8. 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } System.out.println("发送完毕"); } } }
消费者
public class Consumer { private static Runnable runnable = new Runnable() { @Override public void run() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 获取队列的名称 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel(); channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName + "收到消息:" + new String(message.getBody(), Charset.defaultCharset())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("接收消息失败"); } }); System.out.println(queueName + "开始接收消息"); System.in.read(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }; public static void main(String[] args) { new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); new Thread(runnable, "queue3").start(); } }
9. Headers模式
9.1. 说明
可以在队列上设置参数赋值,插入时会插入到满足参数的队列中。
10. Work模式
Work模式由两种:
- 轮询模式(Round-Robin):多个消费者接入的时候,每个消费者消费的数量相同。
- 公平分发:根据消费者的消费能力分发,处理得快的分配的多,按劳分配。
10.1. 轮询模式
平均分配。
默认就是轮询模式。
代码
生产者
public class Producer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("生产者"); channel = connection.createChannel(); for (int i = 0; i < 20; i++) { String msg = "msg " + i; channel.basicPublish("", "queue1", null, msg.getBytes()); } System.out.println("消息发送成功"); } catch (Exception e) { e.printStackTrace(); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
消费者1
public class Consumer1 { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者1"); channel = connection.createChannel(); channel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("work1收到消息:" + new String(message.getBody())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } } ); System.out.println("work1 开始接收消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
消费者2
public class Consumer2 { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者2"); channel = connection.createChannel(); channel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("work2收到消息:" + new String(message.getBody())); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } } ); System.out.println("work2 开始接收消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
10.2. 公平分发模式
在轮询模式的基础上设置手动应答+QOS
channel.basicQos(1)
指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。队列中没有被消费的消息不会被删除,还是存在于队列中。
QoS(Quality of Service,服务质量)指一个网络能够利用各种基础技术,为指定的网络通信提供更好的服务能力,是网络的一种安全机制, 是用来解决网络延迟和阻塞等问题的一种技术。
代码
生产者
public class Producer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("生产者"); channel = connection.createChannel(); for (int i = 0; i < 20; i++) { String msg = "msg " + i; channel.basicPublish("", "queue1", null, msg.getBytes()); } System.out.println("消息发送成功"); } catch (Exception e) { e.printStackTrace(); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
消费者1
public class Consumer1 { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者1"); channel = connection.createChannel(); channel.basicQos(1); Channel finalChannel = channel; channel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("work1收到消息:" + new String(message.getBody())); try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); } finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } } ); System.out.println("work1 开始接收消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
消费者2
public class Consumer2 { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.104"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者2"); channel = connection.createChannel(); channel.basicQos(1); Channel finalChannel = channel; channel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("work2收到消息:" + new String(message.getBody())); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } } ); System.out.println("work2 开始接收消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != channel && channel.isOpen()) { try { channel.close(); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } } if (null != connection && connection.isOpen()) { try { connection.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } }
11. 集成Spring Boot
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
server:
port: 8080
spring:
# 配置rabbitmq
rabbitmq:
password: admin
username: admin
host: 192.168.1.104
port: 5672
virtual-host: /
11.1. Fanout模式
生产者
通过Spring Boot配置类配置交换机、声明队列,完成绑定。
```java @Configuration public class RabbitMqConfiguration {
// 1. 声明注册fanout模式的交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_order_exchange", true, false); } // 2. 声明队列 sms.fanout.queue email.fanout.queue, wechat.fanout.queue @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue emailQueue() { return new Queue("email.fanout.queue", true); } @Bean public Queue wechatQueue() { return new Queue("wechat.fanout.queue", true); } // 3. 完成绑定关系(队列和交换机完成绑定关系) @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding wechatBinding() { return BindingBuilder.bind(wechatQueue()).to(fanoutExchange()); }
}
```
调用
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; /** * 用户下单 * @param userId * @param produceId * @param num */ public void makeOrder(String userId, String produceId, Integer num) { // 1. 根据商品id查询库存是否充足 // 2. 保存订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功: " + orderId); // 3. 通过MQ来完成消息的分发 /** * @param1 交换机 * @param2 路由key/queue队列名称 * @param3 消息内容 */ String exchangeName = "fanout_order_exchange"; String routingKey = ""; rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId); } }
消费者
消费者1
@Component // 监听器 @RabbitListener(queues = {"email.fanout.queue"}) public class EmailConsumer { // 处理器,标记消息的落脚点 @RabbitHandler public void receviceMessage(String message) { System.out.println("email fanout receive => " + message); } }
消费者2
@Component @RabbitListener(queues = {"sms.fanout.queue"}) public class SmsConsumer { @RabbitHandler public void receiveMessage(String message) { System.out.println("sms fanout receive => " + message); } }
消费者3
@Component @RabbitListener(queues = {"wechat.fanout.queue"}) public class WechatConsumer { @RabbitHandler public void receviceMessage(String message) { System.out.println("wechat fanout receive => " + message); } }
11.2. Direct模式
生产者配置
@Configuration
public class DirectRabbitMqConfiguration {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-order-exchange", true, false);
}
@Bean
public Queue smsQueue() {
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue wechatQueue() {
return new Queue("wechat.direct.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.direct.queue", true);
}
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding wechatBinding() {
return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
}
11.3. Topic模式
生产者
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; String exchangeName = "topic-order-exchange"; public void makeOrder(String userId, String productId, Integer num) { String orderId = UUID.randomUUID().toString(); System.out.println("订单生产成功 => " + orderId); // *.email.test // #.sms.# // com.# rabbitTemplate.convertAndSend(exchangeName, "com.email.test", orderId); } }
消费者
@Component // bindings 用来绑定队列和交换机之间的关系 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "email.topic.queue", autoDelete = "false"), key = {"*.email.test"}, exchange = @Exchange(value = "topic-order-exchange", type = ExchangeTypes.TOPIC) ))
12. 高级
12.1. TTL
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取。过了时间后消息将被自动删除。
RabbitMQ可以对消息和队列设置TTL。
目前有两种方法可以设置TTL。
通过队列属性设置,队列中所有的消息都有相同的过期时间。
Map<String,Object> map = new HashMap<>(); map.put("x-message-ttl",5000); channel.queueDeclare("ttl.queue", true, false, false, map);
对消息进行单独设置,每条消息的TTL可以不同。
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 传送方式 .priority(1) .contentEncoding("UTF-8") // 编码方式 .expiration("3000") // 过期时间 .headers(headers).build(); //自定义属性
上述二者的差异:带TTL的消息队列在消息过期后,可以设置死信队列将该消息投递到死信队列中。而对消息单独设置TTL则不行。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message会进行删除,消费者将无法再收到该消息。
参数x-message-ttl
的值 必须是非负 32 位整数 (0 <= n <= 232-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前 消息 将最多只存活 6 秒钟。
12.2. 死信队列
可靠消费的机制。
DLX,全称Dead-Letter-Exchange。当消息在一个队列中变为死信(dead message)时,能重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称为死信队列。
死信发生的原因:
- 消息被拒绝
- TTL消息过期
- Lim队列达到最长长度后,继续接受的消息成为死信
x-max-length
DLX是一个正常的交换机,和一般的交换机没有区别,能在任何队列上被指定,实际上是设置某一队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
使用:在队列上设置队列参数x-dead-letter-exchange
指定对应的交换机即可。
Map<String,Object> map = new HashMap<>();
map.put("x-message-ttl",5000); // 需要有TTL的队列过期后才会投递
map.put("x-dead-letter-exchange", "dead_direct_exchange"); // 配置死信的交换机
map.put("x-dead-letter-routing-key", "dead") // 配置死信的路由key
channel.queueDeclare("ttl.queue", true, false, false, map);
12.3. 消息确认机制
消息确认机制,具体如下。
spring:
rabbitmq:
# ...
publisher-confirm-type: correlated
- NONE值是禁用发布确认模式,是默认值。
- CORRELATED值是发布消息成功到交换器后会触发回调方法。
- SIMPLE值经测试有两种效果。其一效果和CORRELATED值一样会触发回调方法;其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
确认是否成功后的回调方法:
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息确认成功!!!!");
}else{
System.out.println("消息确认失败!!!!");
}
}
}
调用:
rabbitTemplate.setConfirmCallback(new MessageConfirmCallback());
rabbitTemplate.convertAndSend("topic_order_ex","com.email.sms.xxx",orderId);
12.4. 持久化机制
非持久消息,当内存不够时,消息和数据转移也会到磁盘,但重启后丢失。
持久化和非持久化的消息都可以写入磁盘中。
持久化即将消息写入到磁盘。
分类
队列持久化
- 对应
channel.queueDeclare()
方法的durable
属性。
- 对应
消息持久化
消息持久化是通过消息的属性deliveryMode来设置是否持久化,在发送消息时通过basicPublish的参数传入。
// 参数1:交换机的名字 // 参数2:队列或者路由key // 参数3:是否进行消息持久化 // 参数4:发送消息的内容 channel.basicPublish(exchangeName, routingKey1, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
交换机持久化
- 需在定义时设置持久化标识。对应
channel.exchangeDeclare()
方法的durable
属性。
- 需在定义时设置持久化标识。对应
12.5. 内存磁盘监控
通过命令修改阈值在Broker重启后会失效,通过配置则不会。
内存
内存预警
当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
配置方式
vm_memory_high_watermark.relative=0.4
内存极限相对大小(默认0.4,取值一般在0.4~0.7之间,建议不超过0.7)vm_memory_high_watermark.absolute=2GB
内存极限绝对大小
命令方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute <value>
feation/value
为内存阈值,默认情况为0.4/2GB。代表当RabbitMQ的内存超过40%时,就会产生警告并且阻塞所有生产者的连接,在页面上会爆红或者爆黄。
内存换页
在某个Broker节点及内存阻塞生产者之前,会尝试将队列中的消息换页到磁盘以释放内存空间。持久化和非持久化的消息都会写入到磁盘中,其中持久化消息本身在磁盘中有一个副本,所以在转移的过程中持久化的消息会优先从内存中清除掉。
默认情况下,内存达到阈值(前面设置的内存极限大小)的50%时就会进行换页。
vm_memory_high_watermark_paging_ratio=0.7
(设置小于1的值,等于1为达到阈值后才开始换页)
磁盘
配置方式
disk_free_limit.relative=3.0
磁盘极限相对大小disk_free_limit.absolute=50MB
磁盘极限绝对大小(默认50MB)
命令方式
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
当磁盘可用空间低于50MB时,也会产生警告并阻塞所有生产者的连接。
12.6. 集群
RabbitMQ基于Erlang,Erlang天生具备分布式特性(通过同步Erlang集群各节点的Magic cookie实现)。不需要通过ZooKeeper来实现HA方案和保存集群的元数据。以此保证可靠性,水平扩展和达到增加消息吞吐能力的目的。
主节点:
# 停止应用 -n ==> --node
rabbitmqctl -n rabbit-1 stop_app
# 清除节点上的历史数
rabbitmqctl -n rabbit-1 reset
# 启动应用
rabbitmqctl -n rabbit-1 start_app
从节点:
# 复制主节点的cookie并复制到对应的目录下(节点之间通过cookie确定相互是否可通信)
# cookie存放在/var/lib/rabbitmq/.erlang.cookie
# 停止应用
rabbitmqctl -n rabbit-2 stop_app
# 清除节点上的历史数据
rabbitmqctl -n rabbit-2 reset
# 将rabbit2节点加入到rabbit1(主节点)集群当中【Server-node服务器的ip地址或主机名】
rabbitmqctl -n rabbit-2 join_cluster rabbit-1@<IpAddr Or Hostname>
# 启动应用
rabbitmqctl -n rabbit-2 start_app
# 验证是否成功
rabbitmqctl cluster_status -n rabbit-1
# 对主从添加web监控插件 -n rabbit-1
rabbitmq-plugins enable rabbitmq_management
# 对主从节点都要设置用户密码和权限等讯息 -n rabbit-1
# 新增用户
rabbitmqctl add_user admin admin
# 分配权限
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限(.*表示所有权限,以下命令相当于给予administrator拥有的权限)
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
12.7. 日志监控
可以对消息的投递和消费过程进行监控,而tracing日志监控插件帮我们很好的实现了该功能 消息中心的消息追踪需要使用Trace实现,Trace是RabbitMQ用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。可通过插件形式提供可视化界面。Trace启动后会自动创建系统Exchange:amq.rabbitmq.trace ,每个队列会自动绑定该Exchange,绑定后发送到队列的消息都会记录到Trace日志。
以下是trace的相关命令和使用(要使用需要先rabbitmq启用插件,再打开开关才能使用):
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 查看插件列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq启用trace插件 |
rabbitmqctl trace_on | 打开trace的开关 |
rabbitmqctl trace_on -p itcast | 打开trace的开关(itcast为需要日志追踪的vhost) |
rabbitmqctl trace_off | 关闭trace的开关 |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq关闭Trace插件 |
rabbitmqctl set_user_tags heima administrator | 只有administrator的角色才能查看日志界面 |
安装插件并开启 trace_on 之后,会多个 exchange:amq.rabbitmq.trace ,类型为:topic,同时在admin面板下有Tracing入口。
12.8. 分布式事务
开启事务
@Transactional(rollbackFor = Exception.class)
可靠生产
为了保证数据一定送到MQ中。
在同一事务中,增加一个冗余表的记录订单数据每条数据和是否发送成功的状态。
利用RabbitMQ提供的publisher/comfirm机制开启确认机制后,如果消息正常发送到MQ就会取到回执信息,然后把状态修改为已发送的状态。
可靠消费
利用RabbitMQ的ACK机制,由消费者自身控制消息的重发、清除和丢弃。
考虑幂等性问题,因为定时重发会造成消息的重复发送,可以使用唯一主键,或者redis的分布式锁。
利用MQ提供的重试机制,或者try/catch+basicNack(tag, false, false) + 死信队列。
死信队列报错就人工处理。
13. 其他
RabbitMQ为什么是基于信道去处理而不是连接?
使用的是TCP的长连接,可以有多个信道。连接复用。一个线程可以对应一个信道,此举可以减少TCP连接的开销。
不存在没有交换机的队列。在没有指定交换机的情况下会使用默认的交换机。
供给/消费不存在的队列不会自动创建,都会报异常。
可以在程序运行时自动创建交换机、队列并且绑定二者关系。
// 5. 准备交换机、队列和绑定关系 String exchangeName = "direct-message-exchange"; String routeKey = "order"; String exchangeType = "direct"; // 声明交换机 channel.exchangeDeclare(exchangeName, exchangeType, true); // 声明队列 channel.queueDeclare("queue5", true, false, false, null); channel.queueDeclare("queue6", true, false, false, null); channel.queueDeclare("queue7", true, false, false, null); // 绑定队列 channel.queueBind("queue5", exchangeName, "order"); channel.queueBind("queue6", exchangeName, "order"); channel.queueBind("queue7", exchangeName, "course"); // 6. 发送消息 /** * @params1 交换机 * @params2 队列,路由key * @params3 消息的状态控制 * @params4 消息主题 */ channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
一般会手动设置应答,解决死锁问题。
autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
durable:是否持久化,默认是false。持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效。
绑定关系时,最好在消费者绑定,因为消费者是最先启动的服务。如果启动时队列不存在,则可能造成异常。
queue队列在消费者创建还是生产者创建?
一般在rabbitmq的操作面板创建比较稳妥,或者谁先启动谁创建,如在消费者创建(如消费者服务先启动,反之亦然)。