做网赌网站得多少钱,校园论坛网站怎么做,东莞网站开发方案,中卫网红大型蹦床设备消息分发
概念
RabbitMQ队列拥有多个消费者时#xff0c;队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展#xff0c;如果现在负载加重#xff0c;那么只需要创建更多的消费者来消费处理消息即可。
默…消息分发
概念
RabbitMQ队列拥有多个消费者时队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展如果现在负载加重那么只需要创建更多的消费者来消费处理消息即可。
默认情况下RabbitMQ是以轮询的方法进行分发的而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下如果某些消费者消费速度慢而某些消费者消费速度快就可能导致某些消费者消息积压某些消费者空闲进而应用整体的吞吐量下降。
在工作模式一文中书写RPC模式的代码时已经写了一行代码channel.basicQos(1)来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。
比如消费端调用了channel.basicQos(5)RabbitMQ就会为该消费者计数发送一条消息计数加一消费一条消息计数减一。当到达了设定的上限之后RabbitMQ就不会再向该消费者发送消息了知道消费者确认了某条消息之后才会继续发送。 当channel.basicQos(int prefetchCount)中的形参个数为0时表示的是没有上限。 应用场景
限流非公平分发负载均衡
限流
在学习消息分发之前当消息到达队列之后如果有对应的消费者存在那么队列就会一股脑把所有消息全部发送过去从而造成瞬间压力进而可能造成服务宕机产生严重的影响。因此我们就要进行限流限制消费者接收消息的数量。
限流通过设置prefetchCount参数同时也必须要设置消息应答方式为手动应答。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制为手动确认prefetch: 5 # 最多拉取5条消息
Configuration
public class QosConfig {Bean(qosQueue)public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}Bean(qosExchange)public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();}Bean(qosQueueBind)public Binding qosQueueBind(Qualifier(qosQueue) Queue queue, Qualifier(qosExchange) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(qos).noargs();}}
RestController
RequestMapping(/qos)
public class QosController {Resourceprivate RabbitTemplate rabbitTemplate;RequestMappingpublic void qosQueue() {for (int i 0; i 10; i) {this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, qos, hello qos i);System.out.println(第 i 次发送消息成功!);}}}
Configuration
public class QosListener {RabbitListener(queues Constants.QOS_QUEUE)public void qosListener(String msg, Channel channel, Message message) throws IOException {System.out.println(接收的消息为 msg);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} 启动程序之后可以看到出现如上结果明显看到我们发送了10条信息但是由于限流的原因当消费者接收了5条消息之后并且没有去应答因此程序就不再继续接收消息而是等待这5条消息应答之后才会去继续接收消息。
负载均衡
在有两个消费者的情况下一个消费者处理任务非常快一个消费者处理任务非常慢就会造成一个消费者会一直很忙而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息他不考虑消费者未确认消息的数量。我们可以使用prefetch1的方式来进行设置告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前都不向其发送新的消息。这样做就可以使得有消息时所有消费者都处理忙碌的状态。
实现负载均衡功能的代码和实现限流的代码类似只需要将配置文件中的prefetch修改为1即可。
事务
RabbitMQ也实现了事务机制允许开发者确保消息的接收和发送是原子性的要么全部成功要把全部失败。
Component
public class RabbitTemplateConfig {Bean(transactionRabbitTemplate)public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 开启事务return rabbitTemplate;}}
Configuration
public class TransactionConfig {Bean(transactionQueue)public Queue transactionQueue() {return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();}Bean(transactionExchange)public Exchange transactionExchange() {return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();}Bean(transactionQueueBind)public Binding transactionQueueBind(Qualifier(transactionQueue) Queue queue,Qualifier(transactionExchange) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(transaction).noargs();}}
RestController
RequestMapping(/transaction)
public class TransactionController {Resource(name transactionRabbitTemplate)private RabbitTemplate rabbitTemplate;TransactionalRequestMappingpublic void transactionQueue() {System.out.println(发送成功);this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, transaction, hello transaction);int i 1 / 0;this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, transaction, hello transaction);}}
RabbitMQ和Redis中的事务相对来说都是比较简单的并不和MySQL包含那么多的性质。因此在对事务的介绍中并没有大幅度进行介绍。