- 浏览: 266081 次
- 性别:
- 来自: 杭州
最新评论
-
pjsong3101:
引用 public static void main ...
JVM中可生成的最大Thread数量 -
lzp459260276:
RabbitMQ源码分析 – 消息生命周期 -
huotianjun:
关于:为什么要每个操作写一个信息,而不是在publish时写一 ...
RabbitMQ源码分析 – 持久化机制 -
puyongjun_1989:
楼主你好, (256 * 1024) / 64 = 409 ...
JVM中可生成的最大Thread数量 -
chuqingq:
很不错,学习!
Erlang并发机制 –进程调度
(注:分析代码基于RabbitMQ 2.8.2)
网络层的启动也是作为上一篇文章中提到的一个启动步骤来启动的,入口为[$RABBIT_SRC/src/rabbit_networking.erl --> boot/0],代码如下:
boot() -> ok = start(), ok = boot_tcp(), ok = boot_ssl().
[$RABBIT_SRC/src/rabbit_networking.erl --> start/0]构建rabbit_client_sup子监控树,[$RABBIT_SRC/src/rabbit_networking.erl --> boot_tcp/0],启动TCP监听,并构建成tcp_listener_sup子监控树,这两个子树都属于rabbit最顶层监控结点rabbit_sup(SSL的类似)。最终的监控树如下:
(注:方框代表supervisor类型,圆角代表worker;虚线灰底的文字代表相应层子进程重启策略;各结点以各自所在的模块命名,实际监控树中的结点名有部分不同。)
[$RABBIT_SRC/src/tcp_listener.erl --> init/1]通过gen_tcp:listen/2启动监听,并根据ConcurrentAcceptorCount参数启动指定数量个tcp_acceptor(当前这个数量是1)。
[$RABBIT_SRC/src/tcp_acceptor.erl]通过prim_inet:async_accept/2开始异步接受来自客户端的连接,主要代码如下:
handle_cast(accept, State) -> ok = file_handle_cache:obtain(), accept(State); handle_info({inet_async, LSock, Ref, {ok, Sock}}, State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) -> %% patch up the socket so it looks like one we got from %% gen_tcp:accept/1 {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), %% handle file_handle_cache:transfer(apply(M, F, A ++ [Sock])), ok = file_handle_cache:obtain(), %% accept more accept(State); accept(State = #state{sock=LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} end.
[$RABBIT_SRC/src/ file_handle_cache.erl]在这里的主要作用是管理每个进程拥有的文件描述符数量,确保每个进程不会超过设定的上限。其中,obtain/0方法增加拥有的文件描述符数量,transfer/1将文件描述符的拥有权转移到另外一个进程(原进程拥有的打开描述符数量减1,转移目标进程拥有的文件描述符数量加1,这里的目标进程是通过callback返回的rabbit_reader进程)。
state结构中的callback是由[$RABBIT_SRC/src/rabbit_networking.erl]在启动TCP监听端口时指定的,其值为[$RABBIT_SRC/src/rabbit_networking.erl --> start_client/1],每个客户端连接上来后,会通过apply方法调用(见上面代码中的apply(M, F, A)),实际调用[$RABBIT_SRC/src/rabbit_networking.erl --> start_client/2],主要代码如下:
start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Reader), Reader ! {go, Sock, SockTransform}, Reader.
(这里rabbit_tcp_client_sup对应上图中rabbit_client_sup结点)
supervisor:start_child(rabbit_tcp_client_sup, [])会按照上图中rabbit_client_sup下的监控树结构依次构造各个子结点。也就是说在每一次服务端接受一个新的连接时,都会创建一个rabbit_client_sup的子进程(simple_one_for_one),并依次构建各个子结点(上图中黄色区域)。并最终返回创建的rabbit_reader。
rabbit_net:controlling_process/2将上一步返回的rabbit_reader进程作为接收socket消息的进程(调用gen_tcp:controlling_process/2实现,原始接受消息的是tcp_acceptor进程)。
Reader ! {go, Sock, SockTransform}向目标rabbit_reader进程发送消息,指示开始接受从客户端发送来的数据。
rabbit_reader进程在创建后,会通过如下代码等待上述的go消息:
receive {go, Sock, SockTransform} -> start_connection( Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end
(参见[$RABBIT_SRC/src/rabbit_reader.erl --> init/4])
收到go消息后,rabbit_reader开始与客户端进行消息交互。(参见[$RABBIT_SRC/src/rabbit_reader.erl --> start_connection /7])主要代码如下:
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), ConnStr = name(Sock), log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), State = #v1{parent = Parent,…}, // 初始化state,部分代码省略 try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), handshake, 8)), log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]) catch // 异常处理 after rabbit_net:maybe_fast_close(ClientSock), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done.
从上面的log语句可以看出,这时候与客户端的连接已经确定,要开始AMQP的握手过程。接收消息的主循环由recvloop/2来完成,其中第二个参数State保存当前连接的各种状态。recvloop按照AMQP协议对数据进行解析。
recvloop并没有使用任何一个OTP的行为模式,而是自己设计的一种状态转换的逻辑,该逻辑的实现依赖于AMQP协议本身,其中关键的数据是State里的callback以及recv_len。AMQP协议要求,客户端建立一个连接时,必需向服务端发送8个字节的协议头(包含AMQP4个字符以及客户端使用的AMQP版本),所以rabbit在一开始的时候,设定的recv_len=8,也就是说rabbit希望从客户端收到8个字节的数据,然后对这8个字节的处理对应handshake处理逻辑(见上面代码的recvloop/2调用)。handshake完成后,rabbit会根据AMQP协议定义的frame格式等待客户端发送来的数据:先是7个字符的frame头(recv_len=7, callback=frame_header),然后是长度声明在头中的负载数据(recv_len=PayloadSize+1,callback=frame_payload,每个frame都有个结束符,所以这里负载长度要加1),收到负载数据后,会根据负载数据做对应处理,处理完成后,继续待续下一个frame。所以整个协议解析的过程就如下图所示:
rabbit会根据客户端发送的请求进行相应的操作,比如创建channel,创建exchange,创建queue等等。下篇会分析在创建每个实体时,都会做哪些工作。
发表评论
-
RabbitMQ源码分析 – 持久化机制
2012-08-21 09:34 14614(注:分析代码基于Rabb ... -
RabbitMQ源码分析 - 队列机制
2012-07-09 10:10 13179(注:分析代码基于RabbitMQ 2.8.2) 当r ... -
RabbitMQ源码分析 – 消息生命周期
2012-06-25 10:53 9891(注:分析代码基于Rabb ... -
RabbitMQ源码分析 – 实体初始化
2012-06-11 16:15 6241(注:分析代码基于RabbitMQ 2.8.2) Con ... -
RabbitMQ源码分析 - 启动
2012-05-24 19:41 8715RabbitMQ是一个消息队列的实现,基于AMQP(Ad ... -
Erlang热部署 – 模块更新
2012-05-21 10:55 5093Erlang的热部署做的很完善,参见Release Hand ... -
Erlang并发机制 – 垃圾回收
2012-05-02 10:34 3835Erlang中每个进程都有独立的堆内存,默认的大小是23 ... -
Erlang并发机制 – 消息传递
2012-05-02 10:24 7437Erlang系统中,进程之间的通信是通过消息传递来完成的 ... -
Erlang并发机制 – 任务迁移算法
2012-05-02 10:22 3054一般情况下,在SMP环 ... -
Erlang并发机制 –进程调度
2012-04-10 22:43 8910Erlang调度器主要完成对Erlang进程的调度,它是 ... -
Erlang并发机制 - 进程
2012-03-26 21:24 7755在了解Erlang的并发机制之前,我们先来看一下Erlang与 ... -
CentOS6下编译Erlang R15B with wxWidgets
2012-02-23 16:41 19265如果不需要安装wxWidgets的话,很简单,./con ... -
Tsung源码分析(五):Tsung数据统计
2012-02-22 19:59 2996上一篇说明Tsung的服务器监控机制的时候提到,收集到监 ... -
Tsung源码分析(四):Tsung服务器监控
2012-02-21 22:18 4423Tsung在进行压力测试同时,也可以监控服务器结点上的C ... -
Tsung源码分析(三):Tsung插件式协议支持
2012-02-17 16:15 5083在Websocket for Tsung一文中有提到如 ... -
Tsung源码分析(二):Tsung压力生成过程
2012-02-17 12:55 3654上一篇讲到ts_config_server:newbeams通 ... -
Tsung源码分析(一):Tsung启动过程
2012-02-17 12:51 6767一方面,分析Tsung的架 ... -
Websocket for Tsung
2012-02-11 22:11 5665这篇博文距离上次提到要写差不多快两个月了,一方面时间不多 ... -
Windows下vimerl的配置以及扩展
2011-12-11 22:03 3545最近开始学习Erlang,一方面出于对其主要语言特征(高 ...
相关推荐
RabbitMQ源码和客户端工具RabbitMQ源码和客户端工具
springboot,rabbitmq,maven,postman
NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...
NET Core 使用RabbitMQ
NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...
rabbitmq 数据落地 与本地文件上传到mq
Java rabbitMQ源码
RabbitMQ的五种模式源码+纯手工打的代码
Jmeter-RabbitMQ源码
此资源是消息中间件RabbitMQ的源码安装包,只适用于linux操作系统。
Rabbitmq c++ 封装 全部源码 自己测试过几天几夜,性能很好。 包含内容:布消息 消费消息 读取消息 取消息队列属性 文件列表: rabbitmq cpp rabbitmq h ampq h ampq cpp 具体平台的dll请大家下载后自己编译,源码...
RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等)
ARM64架构,银河麒麟linux系统,RabbitMQ的所有离线安装包
rabbitmq 3.11.13 + erlang 25.3 + 两个配置文件( rabbitmq.conf和advanced.config ) 适用于linux 解压即用,没有密码
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,AMQP就是一个协议,是一个高级抽象层消息通信协议。 虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),...
SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741
在Redhat Linux上编译RabbitMQ,过程详见:http://blog.csdn.net/fm0517/article/details/77244781
rabbitmq生产者和消费者demo,主题交换机
2.0版本在《rabbitmq c++ 封装源码》陈梵作者的基础之上扩展了C++接口,更方便使用,但不保证数据不会丢失。
C# rabbitmq项目实战源码,在网上找了大量的MQ资料用C#语言开发的各种场景示例,从路由及列队的配置,到场景代码的开发,使用场景基本上都是通过生产者与消费者,发布订阅模式的示例,程序使用WindowForm开发的重要...