RabbitMQ
# 初识MQ
# 同步通讯和异步通讯
# 同步调用的问题
每加一次需求代码都需要修改
再加上是同步调用 用户必须等待订单服务完成才能执行仓储服务执行完才能执行短信服务 总耗时500ms很恐怖 性能太差
在整个过程中有很多资源的浪费 要是卡住在仓储服务支付服务就渐渐被耗尽了
总结:
# 异步调用方案
优势一:服务解耦
现在加需求就不需要再修改支付服务里面的代码 例如添加了优惠卷服务 因为我们呢现在支付服务不负责调用服务了 而是发消息 那么后面添加服务就只需要订阅事件就好了
解除服务的话我们也就取消订阅就ok了
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量消峰
总结:
# 我的理解
传统的微服务调用用的是openFeign之类的通信手段,这个实际上在不对代码做处理的时候是同步的。
但是我们的消息队列是可以声明“监听器”,监听到消息队列中有消息就执行业务逻辑的。这个实际上就是一种响应式的架构。不需要我们主动的去通信等待另一个微服务执行完毕,而是发个消息给消息队列,而微服务自己监听到消息就执行逻辑。
这种响应式的架构实际上就是异步的,非阻塞的。快速的。
# 什么是MQ
# RabbitMQ概述
一般来说我创建一个用户他有自己的虚拟主机 然后我再创建一个用户他还是有一个虚拟主机 虚拟主机之间是相互隔离的互不干扰
总结:
# 常见消息模型
要向mq发送消息必须要和它先建立连接 用到连接工厂
这边有itcast的虚拟主机的访问权
创建通道
有了通道可以向队列里面发送消息了
那么我们需要的是创建一个队列
生产者向队列里面发送消息
这个时候我们的发送者已经任务结束了 连接都断开了, 这样就解除耦合了 变成异步了
接收消息的是Consumer
同样的是先创建连接
创建通道 创建队列(这里创建的原因是因为 我们的生产者和消费者启动的顺序是不确定的 万一消费者先启动找不到队列 )
然后就是接收消息了 里面的匿名内部类的对象的方法是我们的处理操作(handleDelivery) 我们把这个行为给挂在了队列上 一旦有消息它就会进行操作处理 最后一个参数byte[] body就是消息体
这是一个异步的机制,这行代码(接收到消息)在等待接收消息后面打印 该处理消息处理消息 我还是继续去执行我后面的业务去了
总结
# SpringAMQP(大大简化api)
大大的简化我们的api
AMQP: 是消息队列的规范
SpringAMQP:是Spring对AMQP的实现(就像是redis里面spring提供的模板)
官网讲的特征:
SpringAMQP实现HelloWorld种的基础消息队列功能
流程如下:
总结
子啊Consumer中编写消费逻辑,监听simple.queue 接收的是String类型的消息 将来都是Spring帮助我们完成任务 十分的优雅
总结
# WorkQueue 工作队列(模型)
队列后面跟了两个消费者,消息将来给谁?
rabbitmq中消息是阅后即焚
消息一旦让消费者1消费了消费者2就肯定是拿不到了。
两个消费者实际上是合作关系,共同处理。
假设只有一个消费者,它的处理速度是:40条消息/s,但是发布者却每s发布了50条。这个就暂时是搞不定了,每s有10条消息多出来没人处理,就只能堆积在队列当中,队列在内存中是有存储上限的,这样下去一定会把队列给堆满,这样再有消息就进不去了。如果进不去,消息就会被丢弃,这样就出问题了。两个消费者,每s都处理40条消息的话之前的情况就是可以轻松应对了。
所以WorkQueue实际上就还是普通的消息队列,只是挂了两个消费者。可以提高消息处理速度,避免队列消息堆积。
消费者1消费的快,消费者2消费的慢,就少消费一点,毕竟能者多劳,但是我们现在是平均分布。这个并不是我们想要的。
这个就是我们的消息预取机制。什么叫做消息预取?当我们大量的消息到达队列的时候队列会将消息进行投递,consumer1和consumer2的通道(channel)会把消息先拿过来,这个叫做消息预取。管他能不能处理,先拿过来再说,
这下就两边各自处理了50条消息了,于是两个人就平均分配了所有的消息,每个人25条,但是消费者1处理的快,很快搞定了,消费者2处理的慢,就花了很长时间才搞定
消费预取限制:
我们这里就是预取1条消息,消费完成以后再去拿,不至于一下预取一大堆结果处理不完。
这个就起到了一个能者多劳的效果了。
最后我们做一个总结:
这个模式可以提高整个队列的速度。
这里我们发现在代码中创建队列或者交换机之类的也是非常简单(包括这些绑定交换机到队列之类的操作,都是可以在一个config类中解决的,只要将amqp包中的对象给注册到SpringIOC容器中就好了)
@Configuration
public class FanoutConfig {
// itcast.fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
// 绑定队列1到交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
// 绑定队列2到交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
@Bean
public Queue objectQueue(){
return new Queue("object.queue");
}
@Bean
public Queue simpleQueue(){
return new Queue("simple.queue");
}
}
# 发布订阅模型介绍
刚才我们的消息都是阅后即焚的(一旦消费完就会从队列中删除),这个就无法达到给多个消费者消费了。
这个就无法满足我们课程开始时提出的需求:当用户支付完成了,你得去通知订单服务,仓储服务,短信服务。让这些服务各自去完成自己的业务。
现在的消息要被3个服务都收到。
那么就需要用到发布订阅模型了。实现方式实际上就是加上了一个交换机(exchange)。这个模型和计算机网络实际上是一样的。
这个模型我们不关心消费者怎么绑定。我们关心的是消息如何从发布者到达队列。
消息发布到交换机,然后交换机将消息转发到队列当中。消息发布者不需要知道投递到队列的细节,这些都是交换机决定的,交换机转发消息给多个队列,这个就实现了我们的一份消息多个消费者消费的需求了!
到底交换机是发送给一个还是多个呢?
exchange只负责转发,不管消息是否丢失,只有队列是存消息的。
# 发布订阅-Fanout Exchange
SpringAMQP提供了声明交换机,队列,绑定关系的API,例如:
连绑定都是需要声明的,利用BindingBuilder这个提供给我们的工厂。(绑定它(队列)到它(交换机))
这里我们是通过声明Bean的方式去写的。
将来它读取到这些Bean以后就会帮我们,向rabbitmq去声明队列,交换机,绑定关系。全部都由Spring帮我们去做。
交换机点到这个里面来,看到这个bindings
队列就已经绑定上了。
这里以前是发送到队列,现在是发送到交换机。
然后发送消息,两个队列都收到了,这个就是fanout exchange
总结:
# 发布订阅-DirectExchange
会将消息根据路由规则路由到指定的Queue
这个也就被称为路由模式(routes)
我们现在两个队列都有自己的bindingkey了,将来发布者发送消息的时候也要指定一个RoutingKey
这个时候Exchange将消息路由到队列的时候要比对bindingkey了。
这里要是我们发送的消息是bindingkey是red的话就和我们之前的fanoutexchange一样了,就都是广播了。
所以我们可以用DirectExchange来模拟FanoutExchange,它比FanoutExchange更加灵活。这种灵活性也是有代价的,不要忘了在消息中加上bindingkey。
这里我们不再使用Bean的方式来声明,毕竟需要声明这么多东西。
接下来就是基于@RabbitListener注解来声明这些组件。
之前我们写的所有消费者当中都有这个注解。但是这个注解上可以同时完成队列和交换机的声明。不用再创建Bean了。
(消费者实际上才是在代码中指定组件逻辑的,发布者只需要知道往哪个交换机or队列中发送消息就ok了)
这里我们原来在主键中写的是queues=...
而这里我们是写bindings=@QueueBinding(...)
这样我们就完成了绑定的声明了,并且还声明了bindingkeys。
同样的代码copy一份来声明queue2到交换机的绑定。key为red,yellow
这里多了一个exchange叫做itcast.direct,然后进去看到queue1绑定了什么,queue2绑定了什么key。
接着是消息发送的方法。
这下就blue就收到了。
# 发布订阅-TopicExchange
这个模式和DirectExchange很像,但是routingkey必须是多个单词的列表,并且用.分割
左下角我们看到这个routingkey的逻辑实际上和话题的非常像
那么我们去订阅的话,比如我想关系下新闻,中国的,日本的,那么我们得绑定两个key(在Direct中)。
而在TopicExchange中我们支持通配符
于是我们只要写china.#就可以收到所有的中国相关的话题。而我们写#.news就可以收到所有和新闻相关的话题。
这个实际上就是简化了bindingkey的写法,原来用多个key的,现在只需要用一个key就可以了。
这下就写完成了
itcast.exchange
这里我们就可以测试这里的服务了。
这个变化也没有很大
总结:
这个bindingkey可以支持通配符。然后routingkey需要用.来分割单词列表。
# 消息转换器
我们在使用rabbitTemplate的时候消息这边接收的类一直是Object,这个说明我们可以发送任意的对象。
先在config里面声明一个队列
然后我们测试发送一个对象
@Test
public void testSendObjectQueue(){
Map<String,Object> msg=new HashMap<>();
msg.put("name","pixel-revolve");
msg.put("age",20);
rabbitTemplate.convertAndSend("object.queue",msg);
}
去到rabbitmq里面,我们看到消息,还有消息类型是java序列化类型。
它只支持字节。这种原生的序列化方式并不是那么好,性能差,安全性有问题,数据长度太长了,消息体越大,传输的效率也就越慢,而且还占用额外的内存空间。所以我们非常不推荐这种方式。
Spring的消息对象处理使用MessageConverter来处理的。
我们使用jackson的方式来实现是比较喜欢的。
同样的我们在consumer中引入jackson的依赖,并且定义一样的MessageConverter
这里我们就接收到了,这个方式就是比较方便的
总结: