`
jzhihui
  • 浏览: 266047 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Erlang并发机制 - 进程

阅读更多

在了解Erlang的并发机制之前,我们先来看一下ErlangJava的并发性能对比,一个是并发单元的创建时间,一个是并发单元之间的消息通讯时间(纵坐标代表时间,横坐标代表并发数量):

   

(测试程序及说明见这里,原测试时间比较早了,于是在自己的虚拟机上重新跑了下(CenterOS 63G内存);JVM生成线程数量控制,可见这里,)

 

         从上面的测试结果来看,Erlang的并发效率要比Java好很多,不在一个数量级上,ErlangJava1000倍左右(这不能说明在所有场景下Erlang的并发性能都优于Java,比如消息传递方面,消息的大小可能对这个结果也有影响)。那么Erlang的并发机制相比Java到底好在哪里,本文及后续几篇文章的目的就是搞清楚这个问题。

 

         在《Erlang 编程指南》一书中,对Erlang的并发单元,进程的解释如下:

Erlang进程是轻量级进程,它的生成、上下文切换和消息传递是由虚拟机管理的。操作系统线程的Erlang进程之间没有任何联系,这使并发有关的操作不仅独立于底层的操作系统,而且也是非常高效和具有很强可扩展性。”

由此可见,Erlang里的进程跟操作系统里常提到的进程,线程完全没有关系,只是Erlang并发机制里基本并发单元的一个代称。下面详细的说明Erlang进程的创建过程。

Erlang虚拟机的角度来看,Erlang进程就是一个process结构,定义在$OTP_SRC/erts/emulator/beam/erl_process.h中(struct process),该结构中包含进程所使用的堆栈、GC、调度、消息队列等信息。Erlang程序通过erlang:spawn或者相关BIF调用(spawn_optspawn_link等)可以生成一个新的进程。

 

erts中,spawn调用会最终调用到spawn_3

[$OTP_SRC/erts/emulator/beam/bif.c --> spawn_3]

pid = erl_create_process(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, &so);

首先就是调用erl_create_process来生成一个新的进程,会返回创建进程的pid。我们来看看erl_create_process具体都做了哪些事情。

 

[$OTP_SRC/erts/emulator/beam/erl_process.c --> erl_create_process]

// 参数说明
Eterm
erl_create_process(Process* parent, /* Parent of process (default group leader). */
		   Eterm mod,	/* Tagged atom for module. */
		   Eterm func,	/* Tagged atom for function. */
		   Eterm args,	/* Arguments for function (must be well-formed list). */
		   ErlSpawnOpts* so) /* Options for spawn. */

	// 首先调用alloc_process分配一个process结构需要的内存空间
p = alloc_process(); /* All proc locks are locked by this thread on success */
if (!p) {
	// 如果p==null,则说明系统中进程数量已达到上限
		so->error_code = SYSTEM_LIMIT;
		goto error;
}
    /* Scheduler queue mutex should be locked when changeing
     * prio. In this case we don't have to lock it, since
     * noone except us has access to the process.
     */
    
	// 设置process的最小堆大小
if (so->flags & SPO_USE_ARGS) {
//以参数值设置堆属性,一般通过erlang:spawn_opt调用传入参数
		p->min_heap_size  = so->min_heap_size; // 最小堆内存
		p->min_vheap_size = so->min_vheap_size; // 最小虚拟堆内存
//(用于存放二进制数据)
		p->prio           = so->priority; // 进程优先级(max, high, normal, low)
		p->max_gen_gcs    = so->max_gen_gcs; // full gc之前可进行的minor gc的最大次数
} else {
	// 按默认值设置:H_MIN_SIZE=233 (fib(11),erlang里的堆内存按照fib系列增长,
	// 具体可参见[$OTP_SRC/erts/emulator/beam/erl_c.c --> erts_init_gc]里的说明)
		p->min_heap_size  = H_MIN_SIZE;
		p->min_vheap_size = BIN_VH_MIN_SIZE; // 默认值32768(216)
		p->prio           = PRIORITY_NORMAL;
		p->max_gen_gcs    = (Uint16) erts_smp_atomic32_read_nob(&erts_max_gen_gcs);
    }
    
    // 创建进程时传入的module,function,和参数的数量
    p->initial[INITIAL_MOD] = mod;
    p->initial[INITIAL_FUN] = func;
p->initial[INITIAL_ARI] = (Uint) arity;
    
/*
     * Must initialize binary lists here before copying binaries to process.
     */
    p->off_heap.first = NULL;
    p->off_heap.overhead = 0;

    // 计算初始需要的heap大小
heap_need +=
		IS_CONST(parent->group_leader) ? 0 : NC_HEAP_SIZE(parent->group_leader);

    if (heap_need < p->min_heap_size) {
		sz = heap_need = p->min_heap_size;
} else {
	/*
 	 * Find the next heap size equal to or greater than the given size (if offset == 0).
 	 *
 	 * If offset is 1, the next higher heap size is returned (always greater than size).
 	*/
		sz = erts_next_heap_size(heap_need, 0);
}

// 分配进程堆内存
    p->heap = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP, sizeof(Eterm)*sz);
p->old_hend = p->old_htop = p->old_heap = NULL;
// 进程堆内存里年轻代的标志位:地址小于此标志位的,是较老的年轻代(一般情况
// 下,这些对象至少经过了一次minor gc或者major gc);大于这个地址的是较年轻的
// 年轻代。
p->high_water = p->heap;
	// minor gc的次数
p->gen_gcs = 0;
// 栈顶,紧邻堆
    p->stop = p->hend = p->heap + sz;
    p->htop = p->heap;
    p->heap_sz = sz;

    /* No need to initialize p->fcalls. */
	// 当前模块及函数信息
    p->current = p->initial+INITIAL_MOD;

	// 第一条指令设置为i_apply 
p->i = (BeamInstr *) beam_apply;
// cp保存进入一个函数调用时,当前函数的下一条指令 
    p->cp = (BeamInstr *) beam_apply+1;

	// 消息队列
    p->msg.first = NULL;
    p->msg.last = &p->msg.first;
    p->msg.save = &p->msg.first;
    p->msg.len = 0;
#ifdef ERTS_SMP
    // 消息进入队列
p->msg_inq.first = NULL;
    p->msg_inq.last = &p->msg_inq.first;
    p->msg_inq.len = 0;
p->bound_runq = NULL;

if (so->flags & SPO_LINK) {
	// 进程链接,由spawn_link指定
		if (IS_TRACED_FL(parent, F_TRACE_PROCS)) {
	    	trace_proc(parent, parent, am_link, p->id);
		}
		// 父进程及当前进程互相连接
		erts_add_link(&(parent->nlinks), LINK_PID, p->id);
		erts_add_link(&(p->nlinks), LINK_PID, parent->id);
	}

    /*
     * Test whether this process should be initially monitored by its parent.
     */
    if (so->flags & SPO_MONITOR) {
		Eterm mref;
		//进程监控:单向,由spawn_monitor指定
		mref = erts_make_ref(parent);
		erts_add_monitor(&(parent->monitors), MON_ORIGIN, mref, p->id, NIL);
		erts_add_monitor(&(p->monitors), MON_TARGET, mref, parent->id, NIL);
		so->mref = mref;
}
    /*
     * Schedule process for execution.
     */
if (!((so->flags & SPO_USE_ARGS) && so->scheduler))
	// 如果参数中未指定scheduler,则使用父进程的任务队列
		rq = erts_get_runq_proc(parent);
else {
	// 根据绑定的scheduler,获取任务队列(spawn_opt中可以绑定scheduler,文档
	// 中无说明,具体可见:[$OTP_SRC/erts/emulator/beam/bif.c --> spawn_opt_1])
		int ix = so->scheduler-1;
		ASSERT(0 <= ix && ix < erts_no_run_queues);
		rq = ERTS_RUNQ_IX(ix);
		p->bound_runq = rq;
    }

#ifdef ERTS_SMP
	// 设置当前进程的任务队列
    p->run_queue = rq;
#endif
	// 设置进程的状态为waiting
p->status = P_WAITING;
// 将当前进程添加到任务队列,并将进程的状态设置为runnable
    notify_runq = internal_add_to_runq(rq, p);
	// 唤醒调度器
    smp_notify_inc_runq(notify_runq);
	// 返回进程PID
    res = p->id;

	// 创建成功
VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->id));

进程创建成功后,会将pid返回到spawn_3调用。

 

[$OTP_SRC/erts/emulator/beam/bif.c --> spawn_3]

	if (ERTS_USE_MODIFIED_TIMING()) {
	    BIF_TRAP2(erts_delay_trap, BIF_P, pid, ERTS_MODIFIED_TIMING_DELAY);
	}
	BIF_RET(pid);

BIF_TRAP2Erlang里的Trap机制,关于Trap机制的详细说明见这里。这里的调用属于第三类,主动放弃CPUerts_delay_trap最终会以以pidERTS_MODIFIED_TIMING_DELAY()为参数调用erlang:delay_trapERTS_USE_MODIFIED_TIMING()这个宏成立的条件是modified timing开关打开,具体参数erl+T参数,默认未打开。更详细的说明见这里

 

[$OTP_SRC/ erts/preloaded/src /erlang.erl --> erlang:delay_trap]

%%
%% Trap function used when modified timing has been enabled.
%%
delay_trap(Result, 0) -> erlang:yield(), Result;
delay_trap(Result, Timeout) -> receive after Timeout -> Result end. 
erlang:yield等同于receive after 1 -> Result enddelay_trap的作用就是让当前进程放弃CPU,使其它的进程有机会运行,在spawn调用的场景下,也就是会使新创建的进程有机会被调度到。 
  • 大小: 92 KB
  • 大小: 81.3 KB
1
0
分享到:
评论
1 楼 AsIMovedon 2013-04-10  
很好,通俗易懂。学习了~

相关推荐

    erlang-18.3.4.7-1.el6.x86_64.rpm

    ● 并发性 - Erlang支持超大量级的并发进程,并且不需要操作系统具有并发机制。 ● 分布式 - 一个分布式Erlang系统是多个Erlang节点组成的网络(通常每个处理器被作为一个节点) ● 健壮性 - Erlang具有多种基本的...

    Erlang并发编程,Erlang程序设计,Erlang中文手册

    Erlang并发编程,Erlang程序设计,Erlang中文手册。 学习erlang的好资料。  Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此...

    introducing erlang

    ● 并发性 - Erlang支持超大量级的并发进程,并且不需要操作系统具有并发机制。 ● 分布式 - 一个分布式Erlang系统是多个Erlang节点组成的网络(通常每个处理器被作为一个节点) ● 健壮性 - Erlang具有多种基本的...

    Erlang安装手册

     ● 并发性 - Erlang支持超大量级的并发线程,并且不需要操作系统具有并发机制。  ● 分布式 - 一个分布式Erlang系统是多个Erlang节点组成的网络(通常每个处理器被作为一个节点)  ● 健壮性 - Erlang具有多种...

    论文研究-基于Erlang的即时通信系统并发性能研究 .pdf

    基于Erlang的即时通信系统并发性能研究,杨杰,张淼,为了提升即时通信服务的并发性能以及鲁棒性,以适应日益庞大的网络用户量,本文根据Erlang语言的特性,设计了特殊的进程监管机制以

    并发机制:CSP vs Actor模型以及Golang实现

    本文将简单介绍CSP和Actor模型俩流行的并发机制,并比较他们的优缺点,并通过Golang中CSP并发机制实现FutureTask.并行机制有很多像是多线程,CSP,Actor等等.拿多线程来说,就有诸多问题,譬如:死锁,可扩展性差,共享状态....

    cpie-cn_r148.pdf

    并发 第2章串行编程 项式 模式匹配 表达式求值 模块系统 函数定义 原语 算术表达式 变量作用域 第3章列表编程 用于列表处理的BIF 常用列表处理函数 示例 列表的常用递归模式 函数式参数 第4章使用元组...

    erlmysql:Erlang MySQL驱动程序

    它提供了连接池机制以提高并发效率。 客户端是用Erlang编写的,并且非常接近MySQL Connector / C接口。 它可以称为MySQL Connector / Erlang,但在功能上有一些限制(最新版本不支持SSL)。 设计是基于协议的描述从...

    opt_win64_21最新

    Erlang(['ə:læŋ])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。Erlang问世于1987年,经过十年的发展,于1998年...

    lbm_nomq:该项目已被 https 取代

    因此,这种机制非常适合具有许多并发生产者的应用程序,每个生产者产生适量的消息。 简而言之, lbm_nomq允许通过基于主题的逻辑通道向订阅的 MFA 发送术语。 在这种情况下,订阅的 MFA 遵循gen:call/4语义,保证...

    springCloud

    1、提交代码触发post请求给bus/refresh 2、server端接收到请求并发送给Spring Cloud Bus 3、Spring Cloud bus接到消息并通知给其它客户端 4、其它客户端接收到通知,请求Server端获取最新配置 5、全部客户端均获取到...

Global site tag (gtag.js) - Google Analytics