(注:分析代码基于RabbitMQ 2.8.2)
Connection
在上篇文章中提到,客户端连上rabbit后,需要向rabbit发送AMQP协议头,rabbit在收到协议头后,开始在0号channel 上跟客户端进行交互(AMQP中一个连接可以多路复用,1~65535为可用的channel编号,0号channel,也就是frame中channel的索引为0,被认为是全局于整个连接)。
根据AMQP协议,经过connection.start -> connection.start_ok -> connection.secure -> connection.secure_ok -> connection.tune -> connection.tune_ok(这时rabbit会建立一个心跳进程)-> connection.open -> connection.open_ok后,客户端与rabbit之间就认为已经建立了连接(相关代码参见[$RABBIT_SRC/src/rabbit_reader.erl --> handle_method0/2])。
(粗体指令由rabbit服务器发出)
Channel
当客户端发来的frame中,channel的索引不为0时,rabbit认为这些数据从属于某个channel。如果该channel进程不存在,则会创建一个channel进程(rabbit_channel,具体参见[$RABBIT_SRC/src/rabbit_reader.erl --> create_channel/2),并由此进程负责该channel上的所有数据([$RABBIT_SRC/src/rabbit_reader.erl --> process_channel_frame/3)。
根据AMQP协议,经过channel.open -> channel.open_ok([$RABBIT_SRC/src/rabbit_channel.erl --> handle_method/3)后,客户端就可以开始在该channel上发送数据了。
Exchange
当rabbit收到来自客户端的exchange.declare指令时,rabbit会根据客户端的参数创建一个exchange。首先rabbit会向mnesia的表rabbit_exchange写入一条记录,包含客户端请求的exchange类型信息(默认4种类型:direct,topic,fanout,head)及相关参数,如果exchange是需要持久化的(durable),则还需要向rabbit_durable_exchange表中写入相同信息。然后,rabbit会通过rabbit_event发送exchange_created的事件(统计作用)。
Queue
queue在创建时,需要确定要创建queue的类型(rabbit里称为backing_queue):一般情况下,queue只在当前结点(客户端所连接的结点)创建,对应backing_queue为rabbit_variable_queue;当有
HA策略时,queue需要在集群中的多个结点上创建(这时候,有master结点和slave结点之分,master结点未必是当前结点),master结点创建队列对应backing_queue为rabbit_mirror_queue_master,slave结点对应backing_queue为rabbit_mirror_queue_slave(master,slave实际最终也会创建一个rabbit_variable_queue)。
创建一个队列首先会创建一个rabbit_amqqueue_process进程。然后同exchange类似,都需要先在mnesia表里写入queue的基本信息(rabbit_queue,rabbit_durable_queue)。然后初始化对应的backing_queue,最后发送queue_create事件。
我们来看一下backing_queue为rabbit_variable_queue时,初始化需要做什么:1)初始化queue的索引(rabbit_queue_index:init/2)或者从以前的队列恢复索引(rabbit_queue_index:recover/5,durable队列);2)创建message store或者恢复message store(rabbit_msg_store:client_init/4,恢复仅对于durable队列)。
rabbit_mirror_queue_master初始化:1)创建一个rabbit_mirror_queue_coordinator及相应GM(Guaranteed Multicast);2)获取该队列相关的镜像节点,并调用rabbit_mirror_queue_misc:add_mirror/2启动镜像队列进程;3)在当前结点(或者master结点)初始化一个rabbit_variable_queue队列;4)通过GM向所有镜像广播3中初始化队列的长度。
rabbit_mirror_queue_misc:add_mirror/2启动镜像队列进程时,启动的是一个rabbit_mirror_queue_slave进程,相比rabbit_variable_queue,它只是多了个初始化GM的工作。最后也会初始化一个rabbit_variable_queue队列。
Binding
用于将queue绑定到一个exchange。主要涉及到几个数据表写入(见下表,true或者false代表相应对象是不是durable),无其它复杂逻辑,写入完成后会发送binding_created事件。
exchage queue table
true true rabbit_durable_route
rabbit_semi_durable_route
rabbit_route
rabbit_reverse_route
false true rabbit_semi_durable_route
rabbit_route
rabbit_reverse_route
true/false false rabbit_route
rabbit_reverse_route
topic类型的exchange还需要将binding信息写入以下数据表:rabbit_topic_trie_edge,rabbit_topic_trie_binding,rabbit_topic_trie_node(基于
trie数据结构,用于route key的匹配)。
(本文只是个流程上的梳理,文中的一些概念及作用后续会再详细分析)
分享到:
相关推荐
RabbitMQ源码和客户端工具RabbitMQ源码和客户端工具
springboot,rabbitmq,maven,postman
NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...
NET Core 使用RabbitMQ
NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...
rabbitmq 数据落地 与本地文件上传到mq
Java rabbitMQ源码
RabbitMQ的五种模式源码+纯手工打的代码
主要介绍了springboot实现rabbitmq的队列初始化和绑定,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
此资源是消息中间件RabbitMQ的源码安装包,只适用于linux操作系统。
Jmeter-RabbitMQ源码
RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等)
Rabbitmq c++ 封装 全部源码 自己测试过几天几夜,性能很好。 包含内容:布消息 消费消息 读取消息 取消息队列属性 文件列表: rabbitmq cpp rabbitmq h ampq h ampq cpp 具体平台的dll请大家下载后自己编译,源码...
ARM64架构,银河麒麟linux系统,RabbitMQ的所有离线安装包
rabbitmq 3.11.13 + erlang 25.3 + 两个配置文件( rabbitmq.conf和advanced.config ) 适用于linux 解压即用,没有密码
SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,AMQP就是一个协议,是一个高级抽象层消息通信协议。 虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),...
在Redhat Linux上编译RabbitMQ,过程详见:http://blog.csdn.net/fm0517/article/details/77244781
rabbitmq生产者和消费者demo,主题交换机
2.0版本在《rabbitmq c++ 封装源码》陈梵作者的基础之上扩展了C++接口,更方便使用,但不保证数据不会丢失。