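At this point I had already switched the TCP sends in the player processes over to port_command, and it had run for a while without problems. But on some high-traffic game servers, once the number of active players reached a certain level, message latency became very large (5-6 seconds) and every action lagged. While this was happening, the server's CPU, memory and load were all normal, and working over ssh was smooth with no lag at all. Other game servers on the same machine were also fine, but the whole Erlang node of the affected game server went into a "sluggish" state: when I opened an Erlang shell on it, typed characters echoed with a long delay.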
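tcp_send(Socket, Bin) ->
    try erlang:port_command(Socket, Bin, [force, nosuspend]) of
        false ->
            %% port busy and the command was aborted (nosuspend without force)
            exit({game_tcp_send_error, busy});
        true ->
            %% data accepted by the port driver
            ok
    catch
        error : Error ->
            exit({game_tcp_send_error, {error, einval, Error}})
    end.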
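I wrote an article on this topic earlier that systematically describes how gen_tcp behaves: An In-depth Dissection and Usage Guide for gen_tcp:send (first draft), available here.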
%% gen_tcp.erl
send(S, Packet) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:send(S, Packet);
        Error ->
            Error
    end.

%% inet_tcp.erl:L50
%%
%% Send data on a socket
%%
send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

%% prim_inet.erl:L349
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% SEND(insock(), Data) -> ok | {error, Reason}
%%
%% send Data on the socket (io-list)
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This is a generic "port_command" interface used by TCP, UDP, SCTP, depending
%% on the driver it is mapped to, and the "Data". It actually sends out data,--
%% NOT delegating this task to any back-end. For SCTP, this function MUST NOT
%% be called directly -- use "sendmsg" instead:
%%
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of
        false -> % Port busy and nosuspend option passed
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true ->
            receive
                {inet_reply,S,Status} ->
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
            {error,einval}
    end.
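We can see that gen_tcp:send consists of two steps: 1. submit the data with port_command; 2. wait for the {inet_reply,S,Status} reply. This is a typical blocking operation: while it waits, the process is scheduled out.
So if many TCP connections in the system need to send data, this approach is somewhat inefficient. Many systems therefore split the two steps: they submit data with port_command directly and handle all the {inet_reply, ...} replies in one place, as RabbitMQ's rabbit_writer does: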
%% rabbit_writer.erl
...
handle_message({inet_reply, _, ok}, State) ->
    State;
handle_message({inet_reply, _, Status}, _State) ->
    exit({writer, send_failed, Status});
handle_message(shutdown, _State) ->
    exit(normal);
...
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
    true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Content, FrameMax)),
    ok.

port_cmd(Sock, Data) ->
    try rabbit_net:port_command(Sock, Data)
    catch error:Error -> exit({writer, send_failed, Error})
    end.
port_command(Port, Data, OptionList) -> true|false
Port = port() | atom()
Data = iodata()
OptionList = [Option]
Option = force
Option = nosuspend
Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data).
If the port command is aborted false is returned; otherwise, true is returned.
If the port is busy, the calling process will be suspended until the port is not busy anymore.
Currently the following Options are valid:
force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.
nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.
More options may be added in the future.
Failures:
badarg
If Port is not an open port or the registered name of an open port.
badarg
If Data is not a valid io list.
badarg
If OptionList is not a valid option list.
notsup
If the force option has been passed, but the driver of the port does not allow forcing through a busy port.
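Calling port_command can cause the caller to be suspended, sometimes frequently. Under what conditions? For performance reasons, inet enables a send buffer inside the gen_tcp driver port. By default, once the data queued there reaches the buffer's high watermark, the sender is suspended.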
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% SETOPT(insock(), Opt, Value) -> ok | {error, Reason}
%% SETOPTS(insock(), [{Opt,Value}]) -> ok | {error, Reason}
%%
%% set socket, ip and driver option
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

setopt(S, Opt, Value) when is_port(S) ->
    setopts(S, [{Opt,Value}]).

setopts(S, Opts) when is_port(S) ->
    case encode_opt_val(Opts) of
        {ok, Buf} ->
            case ctl_cmd(S, ?INET_REQ_SETOPTS, Buf) of
                {ok, _} -> ok;
                Error -> Error
            end;
        Error -> Error
    end.

%% Encoding for setopts
%%
%% encode opt/val REVERSED since options are stored in reverse order
%% i.e. the recent options first (we must process old -> new)
encode_opt_val(Opts) ->
    try
        enc_opt_val(Opts, [])
    catch
        Reason -> {error,Reason}
    end.
...
enc_opt_val(Opts, Acc, Opt, Val) when is_atom(Opt) ->
    Type = type_opt(set, Opt),
    case type_value(set, Type, Val) of
        true ->
            enc_opt_val(Opts, [enc_opt(Opt),enc_value(set, Type, Val)|Acc]);
        false -> {error,einval}
    end;
...
enc_opt(high_watermark) -> ?INET_LOPT_TCP_HIWTRMRK;
enc_opt(low_watermark) -> ?INET_LOPT_TCP_LOWTRMRK;
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */

typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
    int   high;                 /* high watermark */
    int   low;                  /* low watermark */
    int   send_timeout;         /* timeout to use in send */
    int   send_timeout_close;   /* auto-close socket on send_timeout */
    int   busy_on_send;         /* busy on send with timeout! */
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
    int           tcp_add_flags;/* Additional TCP descriptor flags */
    int           http_state;   /* 0 = response|request  1=headers fields */
    inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */
    inet_async_multi_op *multi_last;
    MultiTimerData *mtd;        /* Timer structures for multiple accept */
} tcp_descriptor;

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
    ...
        case INET_LOPT_TCP_HIWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->low > ival)
                    tdesc->low = ival;
                tdesc->high = ival;
            }
            continue;

        case INET_LOPT_TCP_LOWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->high < ival)
                    tdesc->high = ival;
                tdesc->low = ival;
            }
            continue;
    ...
}
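The two clamp branches of inet_set_opts keep low <= high whichever option is set. Raising low above high pulls high up to match, and lowering high below low pulls low down with it. The following is a minimal standalone sketch of only that logic. It assumes a hypothetical wm_t struct in place of tcp_descriptor, and it leaves out the SOCK_STREAM check and the option decoding.

```c
#include <assert.h>

/* Hypothetical stand-in for the high/low fields of tcp_descriptor. */
typedef struct {
    int high;   /* high watermark in bytes */
    int low;    /* low watermark in bytes */
} wm_t;

/* Mirrors the INET_LOPT_TCP_HIWTRMRK branch of inet_set_opts. */
void set_high_watermark(wm_t *wm, int ival)
{
    if (ival < 0) ival = 0;
    if (wm->low > ival)
        wm->low = ival;          /* new high is below low: drag low down */
    wm->high = ival;
    assert(wm->low <= wm->high); /* invariant both setters preserve */
}

/* Mirrors the INET_LOPT_TCP_LOWTRMRK branch of inet_set_opts. */
void set_low_watermark(wm_t *wm, int ival)
{
    if (ival < 0) ival = 0;
    if (wm->high < ival)
        wm->high = ival;         /* new low is above high: push high up */
    wm->low = ival;
    assert(wm->low <= wm->high);
}
```

Starting from the 8K/4K defaults, set_low_watermark(&wm, 65536) leaves both fields at 65536. A following set_high_watermark(&wm, 2048) then leaves both at 2048.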
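gen_tcp's default high and low watermarks are 8K and 4K respectively. For how to tune them, see 节点间通讯的通道微调 (Tuning the Inter-Node Communication Channel).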
$ erl
Erlang R14B04 (erts-5.8.5) [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.5  (abort with ^G)
1> {ok,Sock} = gen_tcp:connect("", 80, [{active,false}]).
{ok,#Port<0.595>}
2> inet:getopts(Sock,[high_watermark, low_watermark]).
{ok,[{high_watermark,8192},{low_watermark,4096}]}
3> inet:setopts(Sock,[{high_watermark,131072},{low_watermark, 65536}]).
ok
4> inet:getopts(Sock,[high_watermark, low_watermark]).
{ok,[{high_watermark,131072},{low_watermark,65536}]}
//inet_drv.c:L845
static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};
int driver_flags
This field is used to pass driver capability information to the runtime system. If the extended_marker field equals ERL_DRV_EXTENDED_MARKER, it should contain 0 or driver flags (ERL_DRV_FLAG_*) ored bitwise. Currently the following driver flags exist:
ERL_DRV_FLAG_USE_PORT_LOCKING
The runtime system will use port level locking on all ports executing this driver instead of driver level locking when the driver is run in a runtime system with SMP support. For more information see the erl_driver documentation.
ERL_DRV_FLAG_SOFT_BUSY
Marks that driver instances can handle being called in the output and/or outputv callbacks even though a driver instance has marked itself as busy (see set_busy_port()). Since erts version 5.7.4 this flag is required for drivers used by the Erlang distribution (the behaviour has always been required by drivers used by the distribution).
BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}

//erl_bif_port.c:L120:
static BIF_RETTYPE do_port_command(Process *BIF_P, Eterm BIF_ARG_1, Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3, Uint32 flags)
{
    ...
    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    }
    else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }
    ...
}
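1. If port_command is called with the force flag but the driver does not support ERL_DRV_FLAG_SOFT_BUSY, an EXC_NOTSUP (notsup) error is raised.
2. If the port is busy and force was not given: with NOSUSPEND set, false is returned to signal that the send failed. Otherwise the sending process is suspended. In addition, if busy_port monitoring is enabled, the system_monitor process is told that a process has hit a busy port.
3. In every other case (the port is not busy, or force was given on a driver that supports it), the data is handed to the driver through erts_write_to_port.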
erlang:system_monitor(MonitorPid, [Option]) -> MonSettings
MonitorPid = pid()
Option = {long_gc, Time} | {large_heap, Size} | busy_port | busy_dist_port
Time = Size = int()
MonSettings = {OldMonitorPid, [Option]}
OldMonitorPid = pid()
Sets system performance monitoring options. MonitorPid is a local pid that will receive system monitor messages, and the second argument is a list of monitoring options:
{long_gc, Time}
If a garbage collection in the system takes at least Time wallclock milliseconds, a message {monitor, GcPid, long_gc, Info} is sent to MonitorPid. GcPid is the pid that was garbage collected and Info is a list of two-element tuples describing the result of the garbage collection. One of the tuples is {timeout, GcTime} where GcTime is the actual time for the garbage collection in milliseconds. The other tuples are tagged with heap_size, heap_block_size, stack_size, mbuf_size, old_heap_size, and old_heap_block_size. These tuples are explained in the documentation of the gc_start trace message (see erlang:trace/3). New tuples may be added, and the order of the tuples in the Info list may be changed at any time without prior notice.
{large_heap, Size}
If a garbage collection in the system results in the allocated size of a heap being at least Size words, a message {monitor, GcPid, large_heap, Info} is sent to MonitorPid. GcPid and Info are the same as for long_gc above, except that the tuple tagged with timeout is not present. Note: As of erts version 5.6 the monitor message is sent if the sum of the sizes of all memory blocks allocated for all heap generations is equal to or larger than Size. Previously the monitor message was sent if the memory block allocated for the youngest generation was equal to or larger than Size.
busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.
busy_dist_port
If a process in the system gets suspended because it sends to a process on a remote node whose inter-node communication was handled by a busy port, a message {monitor, SusPid, busy_dist_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending through the inter-node communication port Port.
Returns the previous system monitor settings just like erlang:system_monitor/0.
If a monitoring process gets so large that it itself starts to cause system monitor messages when garbage collecting, the messages will enlarge the process’s message queue and probably make the problem worse.
Keep the monitoring process neat and do not set the system monitor limits too tight.
Failure: badarg if MonitorPid does not exist.
static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
{
    tcp_descriptor* desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);

    DEBUGF(("tcp_inet_commanv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
        }
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
}

/*
** Send non-blocking vector data
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
    ...
    if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz+ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
    }
    ...
}

// beam/io.c:L2352
void set_busy_port(ErlDrvPort port_num, int on)
{
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));

    if (on) {
        erts_port_status_bor_set(&erts_port[port_num],
                                 ERTS_PORT_SFLG_PORT_BUSY);
    } else {
        ErtsProcList* plp = erts_port[port_num].suspended;
        erts_port_status_band_set(&erts_port[port_num],
                                  ~ERTS_PORT_SFLG_PORT_BUSY);
        erts_port[port_num].suspended = NULL;

        if (erts_port[port_num].dist_entry) {
            /*
             * Processes suspended on distribution ports are
             * normally queued on the dist entry.
             */
            erts_dist_port_not_busy(&erts_port[port_num]);
        }

        /*
         * Resume, in a round-robin fashion, all processes waiting on the port.
         *
         * This version submitted by Tony Rogvall. The earlier version used
         * to resume the processes in order, which caused starvation of all but
         * the first process.
         */

        if (plp) {
            /* First proc should be resumed last */
            if (plp->next) {
                erts_resume_processes(plp->next);
                plp->next = NULL;
            }
            erts_resume_processes(plp);
        }
    }
}
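The "first proc should be resumed last" rule, combined with the tail append done by erts_wake_process_later (io.c), decides who runs first once the port stops being busy. The following small sketch models just that ordering. A hypothetical proc_node list stands in for ErtsProcList, and the sketch assumes that erts_resume_processes resumes a list front to back.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for ErtsProcList: one suspended pid per node. */
typedef struct proc_node {
    int pid;
    struct proc_node *next;
} proc_node;

/* Appends at the tail, as erts_wake_process_later does. */
void wake_later(proc_node **head, proc_node *p)
{
    proc_node **pp = head;
    while (*pp != NULL)
        pp = &(*pp)->next;
    p->next = NULL;
    *pp = p;
}

/* Writes the pids in the order set_busy_port(port, 0) would resume them:
   everyone after the head first, the head itself last.
   Returns the number of pids written to out. */
size_t resume_order(const proc_node *plp, int *out, size_t cap)
{
    size_t n = 0;
    assert(out != NULL || cap == 0);
    if (plp == NULL)
        return 0;
    for (const proc_node *q = plp->next; q != NULL && n < cap; q = q->next)
        out[n++] = q->pid;       /* erts_resume_processes(plp->next) */
    if (n < cap)
        out[n++] = plp->pid;     /* erts_resume_processes(plp), now alone */
    return n;
}
```

With A, B and C suspended in that order (pids 1, 2 and 3), the resume order is B, C, A, that is 2, 3, 1. The process that has waited longest is deliberately pushed to the back. According to the source comment, the earlier in-order version starved every process except the first.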
/* socket ready for ouput:
** 1. TCP_STATE_CONNECTING => non block connect ?
** 2. TCP_STATE_CONNECTED  => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    ...
        if (driver_deq(ix, n) <= desc->low) {
            if (IS_BUSY(INETP(desc))) {
                desc->inet.caller = desc->inet.busy_caller;
                desc->inet.state &= ~INET_F_BUSY;
                set_busy_port(desc->inet.port, 0);
                /* if we have a timer then cancel and send ok to client */
                if (desc->busy_on_send) {
                    driver_cancel_timer(desc->inet.port);
                    desc->busy_on_send = 0;
                }
                inet_reply_ok(INETP(desc));
            }
        }
    ...
}

//erl_process.c:5038
/*
** Suspend a process
** If we are to suspend on a port the busy_port is the thing
** otherwise busy_port is NIL
*/
void
erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port)
{
    ErtsRunQueue *rq;

    ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process));
    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
        erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS);

    rq = erts_get_runq_proc(process);

    erts_smp_runq_lock(rq);

    suspend_process(rq, process);

    erts_smp_runq_unlock(rq);

    if (busy_port)
        erts_wake_process_later(busy_port, process);

    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
        erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS);
}

// io.c:474:
void
erts_wake_process_later(Port *prt, Process *process)
{
    ErtsProcList** p;
    ErtsProcList* new_p;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

    if (prt->status & ERTS_PORT_SFLGS_DEAD)
        return;

    for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
        /* Empty loop body */;

    new_p = erts_proclist_create(process);
    new_p->next = NULL;
    *p = new_p;
}
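Taken together, tcp_sendv and tcp_inet_output form a hysteresis loop. The port turns busy once the driver queue reaches high, a check made only when something was already queued. It turns non-busy only after tcp_inet_output has drained the queue to low or below. At that moment the suspended senders are resumed, and the deferred inet_reply_ok goes to busy_caller.

The following minimal sketch models just that state machine. It rests on three assumptions:
- A hypothetical port_model struct stands in for the port.
- The socket is currently not writable, so every send is queued. The empty-queue path elided with "..." in the excerpt is modeled simply as "enqueue, no busy check".
- The send_timeout timer is ignored.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of one tcp_inet port: only the driver queue size,
   the watermarks and the busy flag. */
typedef struct {
    long queued;     /* stands in for driver_sizeq(ix) */
    long high, low;  /* watermarks */
    bool busy;       /* INET_F_BUSY / ERTS_PORT_SFLG_PORT_BUSY */
} port_model;

/* Like tcp_sendv(): returns 1 if the port just went busy (the inet_reply
   is then deferred), 0 if tcp_inet_commandv replies ok immediately. */
int model_send(port_model *p, long size)
{
    long sz = p->queued;
    p->queued += size;                  /* driver_enqv() */
    if (sz > 0 && sz + size >= p->high) {
        p->busy = true;                 /* set_busy_port(port, 1) */
        return 1;
    }
    return 0;
}

/* Like tcp_inet_output(): the socket accepted n bytes. Returns true if
   this drain cleared the busy state (set_busy_port(port, 0) plus the
   deferred inet_reply_ok to busy_caller). */
bool model_output(port_model *p, long n)
{
    assert(n >= 0 && n <= p->queued);
    p->queued -= n;                     /* driver_deq() */
    if (p->queued <= p->low && p->busy) {
        p->busy = false;
        return true;
    }
    return false;
}
```

With the default 8K/4K watermarks, sends of 6000, 2000 and 1000 bytes leave the port busy only after the third one (8000 + 1000 >= 8192). Draining 4000 bytes still leaves it busy (5000 queued). Draining 1000 more brings the queue to 4000 <= 4096 and releases it.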
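Ports are scheduled fairly, just like processes. Processes are scheduled in units of reductions, and the work a port does, such as sending data, is also converted into reductions. So a process that sends a large amount of TCP data does not get to run continuously: the runtime forcibly pauses it for a while so that other ports get a chance to run.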
//erl_port_task.c:L45
/*
 * Costs in reductions for some port operations.
 */
#define ERTS_PORT_REDS_EXECUTE      0
#define ERTS_PORT_REDS_FREE         50
#define ERTS_PORT_REDS_TIMEOUT      200
#define ERTS_PORT_REDS_INPUT        200
#define ERTS_PORT_REDS_OUTPUT       200
#define ERTS_PORT_REDS_EVENT        200
#define ERTS_PORT_REDS_TERMINATE    100

/*
 * Run all scheduled tasks for the first port in run queue. If
 * new tasks appear while running reschedule port (free task is
 * an exception; it is always handled instantly).
 *
 * erts_port_task_execute() is called by scheduler threads between
 * scheduleing of processes. Sched lock should be held by caller.
 */

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    ...
        case ERTS_PORT_TASK_TIMEOUT:
            reds += ERTS_PORT_REDS_TIMEOUT;
            if (!(pp->status & ERTS_PORT_SFLGS_DEAD))
                (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
            break;
        case ERTS_PORT_TASK_INPUT:
            reds += ERTS_PORT_REDS_INPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            /* NOTE some windows drivers use ->ready_input for input and output */
            (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_OUTPUT:
            reds += ERTS_PORT_REDS_OUTPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_EVENT:
            reds += ERTS_PORT_REDS_EVENT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_DIST_CMD:
            reds += erts_dist_command(pp, CONTEXT_REDS-reds);
            break;
    ...
    ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);

    return res;
}

#define ERTS_PORT_REDUCTIONS_EXECUTED(RQ, REDS)    \
do {                                               \
    (RQ)-> += (REDS);                              \
    (RQ)->check_balance_reds -= (REDS);            \
    (RQ)->wakeup_other_reds += (REDS);             \
} while (0)
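Every I/O callback a port runs is charged a fixed cost, and the total is added to the run queue's counters through ERTS_PORT_REDUCTIONS_EXECUTED. This is how port work competes with process work for scheduler time. The following minimal sketch tallies that charge for one erts_port_task_execute run. It assumes a hypothetical task_kind enum covering only the fixed-cost task types, and it leaves out DIST_CMD, whose cost comes from erts_dist_command.

```c
#include <assert.h>
#include <stddef.h>

/* Reduction costs as listed in erl_port_task.c. */
#define ERTS_PORT_REDS_EXECUTE   0
#define ERTS_PORT_REDS_TIMEOUT 200
#define ERTS_PORT_REDS_INPUT   200
#define ERTS_PORT_REDS_OUTPUT  200
#define ERTS_PORT_REDS_EVENT   200

/* Hypothetical subset of port task kinds (fixed-cost ones only). */
typedef enum { TASK_TIMEOUT, TASK_INPUT, TASK_OUTPUT, TASK_EVENT } task_kind;

/* Reductions charged for running tasks[0..n-1], following the switch in
   erts_port_task_execute (counter assumed to start at REDS_EXECUTE). */
int port_tasks_reds(const task_kind *tasks, size_t n)
{
    int reds = ERTS_PORT_REDS_EXECUTE;
    assert(tasks != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        switch (tasks[i]) {
        case TASK_TIMEOUT: reds += ERTS_PORT_REDS_TIMEOUT; break;
        case TASK_INPUT:   reds += ERTS_PORT_REDS_INPUT;   break;
        case TASK_OUTPUT:  reds += ERTS_PORT_REDS_OUTPUT;  break;
        case TASK_EVENT:   reds += ERTS_PORT_REDS_EVENT;   break;
        }
    }
    return reds;
}
```

A run with one input task and three output tasks is charged 800 reductions. A socket that keeps becoming writable therefore keeps adding to the run queue's reduction count, like any busy process.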
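The countermeasure: if the process must not block, pass the force flag so that the data is pushed into the buffer regardless, and at the same time set {send_timeout, Integer}.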
- Author: Yu Feng, Source: Erlang非业余研究
- Tags: gen_tcp, suspension
- Published: 2011-10-14 13:46:07