河南省建设监理协会官网,网站关键词推广优化,怎么做网络推广优化,wordpress命令安装目录结构SpringBoot整合RabbitMQ学习笔记 以下三种类型的消息#xff0c;生产者和消费者需各自启动一个服务#xff0c;模拟生产者服务发送消息#xff0c;消费者服务监听消息#xff0c;分布式开发。 一 Fanout类型信息 . RabbitMQ创建交换机和队列 在RabbitMQ控制台#xff0c;新…SpringBoot整合RabbitMQ学习笔记 以下三种类型的消息生产者和消费者需各自启动一个服务模拟生产者服务发送消息消费者服务监听消息分布式开发。 一 Fanout类型信息 . RabbitMQ创建交换机和队列 在RabbitMQ控制台新建交换机hmall.fanout新建两个队列fanout.queue1和fanout.queue2并将连个队列和交换机进行绑定即可。 操作如下图所示 一下操作可以通过代码实现具体参考配置类 1创建队列
2创建交换机 3绑定 2. 代码实现 1引入依赖
dependencygroupIdorg.springframework.book/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2配置MQ配置信息
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.fanoutproducer:queue1: fanout.queue1
3声明队列和交换机配置类
Component
public class FanoutConfg{Value(${spring.rabbitmq.exchange})private String exchangeValue(${spring.rabbitmq.producer.queue1})private String queueName1// 声明fanout交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(exchanage);}// 声明队列Beanpublic Queue fanoutQueue1(){return new Queue(queueName1);}//绑定队列和交换机Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder,build(fanoutQueue1).to(fanoutExchange);}
}4生产者
Component
public class RabbitMqProduce {Autowiredprivate RabbitTemplate rabbitTemplete;value(${spring.rabbitmq.producer.queue})private String queueName;/*** 入参说明* 第一个参数queueName队列名称* 第二个参数路由键fanout类型不需要路由键* 第三个参数msg 消息题内容*/public void send(String msg){rabbitTemplete.covertAndSend(queueName,null,msg);}}4消费者
Component
public class RabbitMqListener {RabbitListener(queues${spring.rabbitmq.producer.queue})public void counsume(String msg){System.out.pringln(消费者收到 fanout.queue队列发的消息,msg);}
}5测试类
SpringBootTest
public class SpringBootTest{AUtowiredprivate RabbitMqProduce producer;Testpublic void testSendFanoutMsg(){producer.send(fanout类型发送消息);}
}二 direct类型发送消息 控制台操作 1交换机和队列的创建参考fanout的操作 2绑定与fanout不同的是 给交换机绑定队列的同时需要指定路由键如下图所示 代码实现 1依赖引入参考fanout类型的消息 2mq消息配置
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.directproducer:queue1: direct.queue1queue2: direct.queue2routingKey1: redroutingKey2: red2
3MQ配置类 以下配置可以在消费者注解上实现
Component
public class RabbitMqConfig {Autowiredprivate RabbitTemplate rabbitTemplete;value(${spring.rabbitmq.producer.exchange})private String exchange;value(${spring.rabbitmq.producer.queue1})private String queueName1;value(${spring.rabbitmq.producer.queue2})private String queueName2;value(${spring.rabbitmq.producer.routingKey1})private String routingKey1;value(${spring.rabbitmq.producer.routingKey2})private String routingKey2;// 创建交换机Bean(directExchange)public Exchange getExchange(){return ExchangeBuilder.topicExchange(exchange) // 交换机类型交换机名称.durable(true) //ture为持久化存到磁盘false存到内存.build();}// 创建队列Bean(directQueue1)public Queue getDirectQueue1(){retuen new Queue(queueName1);}// 交换机绑定队列beanpublic Binging bindDirectQueue1(Qualifier(directExchange) Exchange exchange,Qualifier(directQueue1) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey1).noargs(); }// 创建队列Bean(directQueue2)public Queue getDirectQueue2(){retuen new Queue(queueName2);}// 交换机绑定队列beanpublic Binging bindDirectQueue2(Qualifier(directExchange) Exchange exchange,Qualifier(directQueue2) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs(); }}4生产者发送消息
Component
public class RabbitMqProduce {Autowiredprivate RabbitTemplate rabbitTemplete;value(${spring.rabbitmq.producer.queue1})private String queueName1;value(${spring.rabbitmq.producer.queue2})private String queueName2;value(${spring.rabbitmq.producer.routingKey2})private String routingKey1;value(${spring.rabbitmq.producer.routingKey1})private String routingKey2;/*** 入参说明* 第一个参数queueName队列名称* 第二个参数路由键fanout类型不需要路由键* 第三个参数msg 消息题内容*/public void sendQueue1(String msg){rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);}public void sendQueue2(String msg){rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);}}5消费者监听消息 第一种已经编写了配置类
Component
public class RabbitMqListener {RabbitListener(queues${spring.rabbitmq.producer.queue1})public void counsume(String msg){System.out.pringln(消费者收到 direct.queue1队列发的消息,msg);}RabbitListener(queues${spring.rabbitmq.producer.queue2})public void counsume(String msg){System.out.pringln(消费者收到 direct.queue2队列发的消息,msg);}
}第二种在注解上配置交换机和队列以及路由键
Component
public class RabbitMqListener {RabbitListener(bindings QueueBinding(value Queue(name${spring.rabbitmq.producer.queue1},durabletrue),exchange Exchange(name${spring.rabbitmq.producer.exchange),typeExchangeType.DIRECT),key {${spring.rabbitmq.producer.routingKey1},${spring.rabbitmq.producer.routingKey2}} ))public void counsume(String msg){System.out.pringln(消费者收到 direct.queue1队列发的消息,msg);}RabbitListener(bindings QueueBinding(value Queue(name${spring.rabbitmq.producer.queue2},durabletrue),exchange Exchange(name${spring.rabbitmq.producer.exchange),typeExchangeType.DIRECT),key {${spring.rabbitmq.producer.routingKey1},${spring.rabbitmq.producer.routingKey2}} ))public void counsume(String msg){System.out.pringln(消费者收到 direct.queue2队列发的消息,msg);}
}6测试类
SpringBootTest
public class SpringBootTest{AUtowiredprivate RabbitMqProduce producer;Testpublic void testSendDirectMsg(){producer.send(direct类型发送消息);}
}三 Topic类型消息 控制台操作 参考前面的创建交换机队列以及绑定关系操作代码实现 1依赖引入参考fanout类型的消息 2mq消息配置 路由键使用通配符进行匹配,#代表多个*代表一个
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: hmall.topicproducer:queue1: topic.queue1queue2: topic.queue2routingKey1: china.#routingKey2: #.news
3MQ配置类
Component
public class RabbitMqConfig {Autowiredprivate RabbitTemplate rabbitTemplete;value(${spring.rabbitmq.producer.exchange})private String exchange;value(${spring.rabbitmq.producer.queue1})private String queueName1;value(${spring.rabbitmq.producer.queue2})private String queueName2;value(${spring.rabbitmq.producer.routingKey1})private String routingKey1;value(${spring.rabbitmq.producer.routingKey2})private String routingKey2;// 创建交换机Bean(topicExchange)public Exchange getExchange(){return ExchangeBuilder.topicExchange(exchange) // 交换机类型交换机名称.durable(true) //ture为持久化存到磁盘false存到内存.build();}// 创建队列Bean(topicQueue1)public Queue getDirectQueue1(){retuen new Queue(queueName1);}// 交换机绑定队列beanpublic Binging bindDirectQueue1(Qualifier(topicExchange) Exchange exchange,Qualifier(topicQueue1) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey1).noargs(); }// 创建队列Bean(topicQueue2)public Queue getDirectQueue2(){retuen new Queue(queueName2);}// 交换机绑定队列beanpublic Binging bindDirectQueue2(Qualifier(topicExchange) Exchange exchange,Qualifier(topicQueue2) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs(); }}4生产者发送消息
Component
public class RabbitMqProduce {Autowiredprivate RabbitTemplate rabbitTemplete;value(${spring.rabbitmq.producer.queue1})private String queueName1;value(${spring.rabbitmq.producer.queue2})private String queueName2;value(${spring.rabbitmq.producer.routingKey2})private String routingKey1;value(${spring.rabbitmq.producer.routingKey1})private String routingKey2;/*** 入参说明* 第一个参数queueName队列名称* 第二个参数路由键fanout类型不需要路由键* 第三个参数msg 消息题内容*/public void sendQueue1(String msg){rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);}public void sendQueue2(String msg){rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);}}5消费者监听消息
Component
public class RabbitMqListener {RabbitListener(queues${spring.rabbitmq.producer.queue1})public void counsume(String msg){System.out.pringln(消费者收到 topic.queue1队列发的消息,msg);}RabbitListener(queues${spring.rabbitmq.producer.queue2})public void counsume(String msg){System.out.pringln(消费者收到 topic.queue2队列发的消息,msg);}
}6测试类
SpringBootTest
public class SpringBootTest{AUtowiredprivate RabbitMqProduce producer;Testpublic void testSendDirectMsg(){producer.send(direct类型发送消息);}
}四 消息转换器
MQ会把消息体变成字节码 解决办法使用消息转换器实现如下
在生产者和消费者两个服务引入依赖
dependencygroupIdcom.fasterxml.jackson/groupIdartifactIdjasckson-databind/artifactId
/dependency在生产者和消费者两个服务编写消息转换器配置
Component
public class JacksonMessageConvertor{Beanpublic MessageCoverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
}消息体 对于生产者来说是map类型的则生成者接收的时候也是map类型 例如
Component
public class RabbitMqListener {RabbitListener(queues${spring.rabbitmq.producer.queue1})public void counsume(MapString,Objecct msg){System.out.pringln(消费者收到 topic.queue1队列发的消息,msg);}}五 案例演示
支付服务支付成功后通知交易服务进行后续操作 生产者和消费者两个服务都需要进行1,23步骤
添加依赖
!--mq依赖--
dependencygroupIdorg.springframework.book/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
!--消息转换器依赖--
dependencygroupIdcom.fasterxml.jackson/groupIdartifactIdjasckson-databind/artifactId
/dependency添加MQ配置信息
spring:rabbitmq:host: 192.168.150.101 #主机ipport: 5672 #端口virtual-host: /hmall #虚拟主机username: hmall #用户名password: 123 #密码exchange: pay.topicqueue: mark.order.pay.queueroutKingKey: pay.success
消息转换器配置类
Component
public class JacksonMessageConvertor{Beanpublic MessageCoverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
}生产者 1生产者的配置
Component
public class Rabbitroducer {Autowiredprivate RabbitTemplate rabbitTemplete;value(${spring.rabbitmq.queue})private String queueName;value(${spring.rabbitmq.routingKey})private String routingKey;/*** 入参说明* 第一个参数queueName队列名称* 第二个参数路由键fanout类型不需要路由键* 第三个参数msg 消息题内容*/public void sendMsg(String msg){// 发送消息rabbitTemplete.covertAndSend(queueName,routingKey, msg);}
}2业务代码支付成功发送消息
public class payOrderServiceImpl impletement PayOrderService{Autowridprivate RabbitProducer payProducer;OveriridTransactional(rollback Exception.class)public void payOrder(PayOrderDto payOrder){// 一些列操作最终交易成功// 发送消息通知try{payProducer.send(payOrder.getId());}catch(AmqpException e){log.error(交易成功发送消息异常{},e.getMessages(););}}
}消费者
Component
public class PaySatusListener {Autowiredprivate OrderService orderService;RabbitListener(bindings QueueBinding(value Queue(name${spring.rabbitmq.queue},durabletrue),exchange Exchange(name${spring.rabbitmq.exchange),typeExchangeType.TOPIC),key {${spring.rabbitmq.routingKey}} ))public void listenOrderPay(Long orderId){//标记订单为已支付orderService.markOrderPaySuccess(orderId);}}