- 浏览: 266052 次
- 性别:
- 来自: 杭州
最新评论
-
pjsong3101:
引用 public static void main ...
JVM中可生成的最大Thread数量 -
lzp459260276:
RabbitMQ源码分析 – 消息生命周期 -
huotianjun:
关于:为什么要每个操作写一个信息,而不是在publish时写一 ...
RabbitMQ源码分析 – 持久化机制 -
puyongjun_1989:
楼主你好, (256 * 1024) / 64 = 409 ...
JVM中可生成的最大Thread数量 -
chuqingq:
很不错,学习!
Erlang并发机制 –进程调度
(注:分析代码基于RabbitMQ 2.8.2)
当rabbit无法直接将消息投递到消费者时,需要暂时将消息入队列,以便重新投递。但是,将消息入队列并不意味着,就是将消息扔在一个队列中就不管了,rabbit提供了一套状态转换的机制来管理消息。
在rabbit中,队列中的消息可能会处理以下四种状态:
alpha:消息内容以及消息在队列中的位置(消息索引)都保存在内存中;
beta:消息内容只在磁盘上,消息索引只在内存中;
gamma:消息内容只在磁盘上,消息索引在内存和磁盘上都存在;
delta:消息的内容和索引都在磁盘上。
注意:对于持久化消息,消息内容和消息索引都必须先保存在磁盘上,然后才处于上述状态中的一种。其中gamma很少被用到,只有持久化的消息才会出现这种状态。
rabbit提供这种分类的主要作用是满足不同的内存和CPU需求。alpha最耗内存,但很少消耗CPU;delta基本不消耗内存,但是要消耗更多的CPU以及磁盘I/O操作(delta需要两次I/O操作,一次读索引,一次读消息内容;delta及gamma只需要一次I/O操作来读取消息内容)。
rabbit在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果内存中的消息数量大于这个数量,就会引起消息的状态转换,转换主要两种:alpha -> beta, beta -> delta。alpha -> beta的转换会将消息的内容写到磁盘(如果是持久化消息,在这一步转换后,消息将会处于gamma状态),beta -> delta的转换会更进一步减少内存消耗,将消息索引也写到磁盘。
运行时怎么体现消息的各种状态呢?在队列的状态vqstate(参见[$RABBIT_SRC/src/rabbit_variable_queue.erl])结构中,存在q1,q2,delta,q3,q4五个队列(使用q1~q4使用Erlang的queue模块,delta存储结构依赖于消息索引的实现),其中q1,q4只包含alpha状态的消息,q2,q3只包含beta和gamma状态的消息,delta包含delta状态的消息。
一般情况下消息会以p1->p2->delta->q3->q4的顺序进行状态变换:消息进入队列时,处于alpha状态并保存在内存中(q1或q4),然后某个时刻发现内存不足,被转换到beta状态(q2,q3)(这时候其实有两个转换q1->q2,q4->q3),如果还是内存不足,被转换到delta状态(delta)(q2->delta,q3->delta);当从队列中消费消息时,会先从处于alpha状态的内存队列(q4)中获取消息,如果q4为空,则从beta状态的队列(q3)中获取消息,如果q3也为空,则会从delta状态的消息队列中读取消息,并将之转移到q3。
上述步骤只是个一般步骤,实际运行时,中间的步骤都是可以跳过的,比如消息可能在一开始被放在q4队列中、q1队列中的元素会直接跳到q4队列中(这两种情况下,q3,delta,q2队列都为空);当delta队列为空时,q2队列可以直接跳到q3队列,q1队列也可以直接跳到q3。
上述这些状态转换主要发生的两个地方为:从队列中获取消息时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> queue_out/1])以及减少内存使用量时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> reduce_memery_use/1])。下面详细说明下这两个操作的逻辑。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> queue_out/1 queue_out(State = #vqstate { q4 = Q4 }) -> case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> fetch_from_q3/1 fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4 }) -> case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, MsgStatus}, Q3a} -> State1 = State #vqstate { q3 = Q3a }, State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> true = ?QUEUE:is_empty(Q2), %% ASSERTION true = ?QUEUE:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; // A {true, false} -> maybe_deltas_to_betas(State1); // B {false, _} -> State1 end, {loaded, {MsgStatus, State2}} end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> maybe_deltas_to_betas/1 maybe_deltas_to_betas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState, pending_ack = PA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd } = Delta, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case ?QUEUE:len(Q3a) of 0 -> maybe_deltas_to_betas( State1 #vqstate { delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, q3 = ?QUEUE:join(Q3b, Q2) }; // C N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }), State1 #vqstate { delta = Delta1, q3 = Q3b } end end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> reduce_memory_use/1 reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, State), State1 reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, rates = #rates { avg_ingress = AvgIngress, avg_egress = AvgEgress }, ack_rates = #rates { avg_ingress = AvgAckIngress, avg_egress = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; S1 -> {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) end, {S1, State}, case (AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress) of true -> [AckFun, AlphaBetaFun]; false -> [AlphaBetaFun, AckFun] end), {true, State2} end, case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; _ -> {Reduce, State1} end chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
$RABBIT_SRC/src/rabbit_variable_queue.erl -> push_alphas_to_betas/2 push_alphas_to_betas(Quota, State) -> {Quota1, State1} = push_alphas_to_betas( fun ?QUEUE:out/1, fun (MsgStatus, Q1a, State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } end, Quota, State #vqstate.q1, State), {Quota2, State2} = push_alphas_to_betas( fun ?QUEUE:out_r/1, fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}.
发表评论
-
RabbitMQ源码分析 – 持久化机制
2012-08-21 09:34 14612(注:分析代码基于Rabb ... -
RabbitMQ源码分析 – 消息生命周期
2012-06-25 10:53 9890(注:分析代码基于Rabb ... -
RabbitMQ源码分析 – 实体初始化
2012-06-11 16:15 6241(注:分析代码基于RabbitMQ 2.8.2) Con ... -
RabbitMQ源码分析 – 网络层
2012-05-30 13:31 8457(注:分析代码基于Rabb ... -
RabbitMQ源码分析 - 启动
2012-05-24 19:41 8712RabbitMQ是一个消息队列的实现,基于AMQP(Ad ... -
Erlang热部署 – 模块更新
2012-05-21 10:55 5092Erlang的热部署做的很完善,参见Release Hand ... -
Erlang并发机制 – 垃圾回收
2012-05-02 10:34 3835Erlang中每个进程都有独立的堆内存,默认的大小是23 ... -
Erlang并发机制 – 消息传递
2012-05-02 10:24 7434Erlang系统中,进程之间的通信是通过消息传递来完成的 ... -
Erlang并发机制 – 任务迁移算法
2012-05-02 10:22 3052一般情况下,在SMP环 ... -
Erlang并发机制 –进程调度
2012-04-10 22:43 8910Erlang调度器主要完成对Erlang进程的调度,它是 ... -
Erlang并发机制 - 进程
2012-03-26 21:24 7753在了解Erlang的并发机制之前,我们先来看一下Erlang与 ... -
CentOS6下编译Erlang R15B with wxWidgets
2012-02-23 16:41 19263如果不需要安装wxWidgets的话,很简单,./con ... -
Tsung源码分析(五):Tsung数据统计
2012-02-22 19:59 2996上一篇说明Tsung的服务器监控机制的时候提到,收集到监 ... -
Tsung源码分析(四):Tsung服务器监控
2012-02-21 22:18 4423Tsung在进行压力测试同时,也可以监控服务器结点上的C ... -
Tsung源码分析(三):Tsung插件式协议支持
2012-02-17 16:15 5083在Websocket for Tsung一文中有提到如 ... -
Tsung源码分析(二):Tsung压力生成过程
2012-02-17 12:55 3654上一篇讲到ts_config_server:newbeams通 ... -
Tsung源码分析(一):Tsung启动过程
2012-02-17 12:51 6766一方面,分析Tsung的架 ... -
Websocket for Tsung
2012-02-11 22:11 5665这篇博文距离上次提到要写差不多快两个月了,一方面时间不多 ... -
Windows下vimerl的配置以及扩展
2011-12-11 22:03 3545最近开始学习Erlang,一方面出于对其主要语言特征(高 ...
相关推荐
RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...
RabbitMQ源码 rabbitmq-server-generic-unix-3.8.8.tar.xz
rabbitmq-server-3.10.5-1.el8.noarch.rpm
这里提供了rabbitmq-server-3.7.3.exe百度网盘下载,官网下载实在是太慢了,亲测有效! rabbitmq-server-3.7.3.exe rabbitmq-server-3.7.3.exe rabbitmq-server-3.7.3.exe
rabbitmq-server-3.4.1-1.noarch.rpm rabbitmq-server-3.4.1-1.noarch.rpm
linux rabbitmq安装包 rabbitmq-server-generic-unix-3.6.1.tar 实测
rabbitmq-server-3.8.8-1.el7.noarch
rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-...
下载好的包,因为下载太慢 rabbitmq-server-generic-unix-3.7.8.tar.xz
JAVA开发工具 rabbitmq-server-3.7.5.tar.xz,RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件
rabbitmq安装包,linux离线安装。后面会有教程及配置。先在windows解压再使用。软件名称:rabbitmq-server-generic-unix-3.5.7.tar.gz
最新版linux rabbitmq-server-generic-unix-3.9.1.tar.xz最新版linux rabbitmq-server-generic-unix-3.9.1.tar.xz
rabbitmq-server-3.7.10-1.el7.noarch.rpm包 配合erlang在RHEL Linux 7.x, CentOS 7.x, Fedora 19+ (supports systemd)等系统运行
rabbitmq-server-3.8.3.exe和erlang22.2.exe 2020年3月最新版本, 64位
rabbitmq-server-3.9.7-1.el7.noarch.rpm
rabbitmq-server-generic-unix-3.7.18.tar.xz版本服务器安装 有需要的可以去官网查看一下rabbitmq版本对erlang版本的一个支持情况,官网地址:http://www.rabbitmq.com/which-erlang.html
rabbitmq离线安装 - 语言库 erlang-21.2.6-1.el7.x86_64.rpm - 依赖 socat-1.7.3.2-2.el7.x86_64.rpm - rabbitmq 服务器 rabbitmq-server-3.7.13-1.el7.noarch.rpm
最新版linux rabbitmq-server-generic-unix-3.9.0.tar.xz最新版linux rabbitmq-server-generic-unix-3.9.0.tar.xz
rabbitmq-server-3.8.0-1.el6.noarch.rpm安装包下载 使用命令安装:yum -y install rabbitmq-server-3.6.6-1.el6.noarch.rpm 启动rabbitmq服务: 前台运行:rabbitmq-server start (用户关闭连接后,自动结束进程) ...
最新版linux rabbitmq-server-generic-unix-3.8.7.tar.xz