RabbitMQ相关总结
Broker
异步调用中用Broker进行事件订阅和调用,完成解耦
(图片来源网络,侵删)
没有强依赖,不用担心级联失败
流量削峰
MQ 的下载
1.可以使用命令拉取镜像
docker pull rabbitmq:3-management
2.也可以直接去官网下载tar包,然后上传到虚拟机上面
spring AMQP 消息队列
Basic Queue 简单队列模型
只需要简单的引入amqp依赖,
(图片来源网络,侵删)
org.springframework.boot spring-boot-starter-amqp
然后配置接收和发送端的地址
spring: rabbitmq: host: 192.168.xxx.xxx # 自己主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: xxxxxx # 用户名 password: xxxxxx # 密码
然后调用方法发送或结合搜消息即可
发送:
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } }
接收:
package cn.itcast.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
当然了,如果不是在父工程里面配置的依赖则需要在单个项目里面单独配置
Work Queue 一队列,多消费者
假设编辑五百条消息:
(图片来源网络,侵删)
/** * workQueue * 向队列中不停发送消息,模拟消息堆积。 */ @Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i定义两个接受者用不同效率接收:
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }然后可以看出接受者接收消息并未在预定时间被消费,原因是被队列平均分配了,只要重新定制规则即可:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息交换机
Fanout
创建FanoutExchange交换机和Queue队列,然后交换机和队列绑定:
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }绑定完成后就可以写消费者和生产者代码了
生产者
@Test public void testFanoutExchange() { // 队列名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }消费者
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }交换机的作用是什么?
* 接收publisher发送的消息
* 将消息按照规则路由到与之绑定的队列
* 不能缓存消息,路由失败,消息丢失
* FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
* Queue
* FanoutExchange
* Binding
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。
还没有评论,来说两句吧...