gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析
问题的背景是:
1、我开发了一个服务端程序,接收客户端的连接。同一时刻会有多个客户端来连接,连接后,接收客户端请求后,再发送响应消息,然后客户端主动断连。
2、服务端监听的socket属性设置如下:
[binary, {packet, raw},
{ip, IPAddr}, {backlog, 10000},
{active, false}, {reuseaddr, true},
{nodelay, false}, {delay_send, true},
{recbuf, 128 * 1024}, {sndbuf, 64 * 1024}]
3、服务器accept监听socket,接收客户端请求,发送响应消息分别是在3个不同的进程中进行。接收请求和发送响应的进程都是重复使用的,每次重新使用的时候传入一个新accept的socket。
问题的现象是:
1、单个用户发起呼叫的时候,流程是成功的,服务器能正常响应。但是多个用户一起呼,批量跑的时候,跑一段时间后,部分客户端会发现不能接收到服务器返回的响应。从抓包来看,客户端的请求是发送到服务器端了。
2、服务器这边发送响应的进程会收到一条{empty_out_q, #Port}这样的消息,而这条消息并不是我开发的代码产生的。
问题是:
1、为什么发消息的进程会收到{empty_out_q, #Port}这样的消息?
2、收到empty_out_q消息,是不是就说明调用gen_tcp:send发送失败?
3、是不是说设置了delay_send的属性,所以即使send失败,也是异步的,在调用send的时候会马上返回ok,但是后面真的发送失败后,则系统会给调用send方法的进程发送一条{empty_out_q, #Port}这样的消息。
这个问题非常有意思。
首先我来解释下erts内部inet_drv工作原理:gen_tcp:send的时候在正常情况下,底层的驱动会马上调用操作系统的send去发送数据,如果一次没发完数据,会把数据暂存在驱动的发送队列里面,同时向epoll等事件检查器登记socket写事件,然后等待socket的可写事件的发生。 delay_send选项的的作用是不尝试立即发送,而是把数据都暂存在驱动的发送队列,然后等待可写, 从而可以批量发送数据,提高效率。
有了这样的机制后,那么在gen_tcp:close或者shutdown的时候,要保证这些暂存的数据先发送完成。那么在知道目前还有暂存的数据未发送的时候,不会立即执行close或者shutdown操作,而是会先等待这些数据的发送。
等待过程是这样的,先把调用者进程的pid提交到socket的empty_out_q等待队列去,同时返回目前暂存的数据的字节数。这时候底层的发送过程还在继续,如果在发送的时候,发现数据都发生完毕,就给调用者进程发送一个{empty_out_q, Port}消息,表明可以进行下一步动作。
参看代码: otp_src_R14B03/erts/preloaded/src/prim_inet.erl: L117
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% Shutdown(insock(), atom()) -> ok %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% TODO: shutdown equivalent for SCTP %% shutdown(S, read) when is_port(S) -> shutdown_2(S, 0); shutdown(S, write) when is_port(S) -> shutdown_1(S, 1); shutdown(S, read_write) when is_port(S) -> shutdown_1(S, 2). shutdown_1(S, How ) -> case subscribe(S, [subs_empty_out_q]) of {ok,[{subs_empty_out_q,N}]} when N > 0 -> shutdown_pend_loop(S, N); %% wait for pending output to be sent _ Other -> ok end , shutdown_2(S, How ). shutdown_2(S, How ) -> case ctl_cmd(S, ?TCP_REQ_SHUTDOWN , [ How ]) of {ok, []} -> ok; Error -> Error end . shutdown_pend_loop(S, N0 ) -> receive {empty_out_q,S} -> ok after ?INET_CLOSE_TIMEOUT -> case getstat(S, [send_pend]) of {ok,[{send_pend, N0 }]} -> ok; {ok,[{send_pend,N}]} -> shutdown_pend_loop(S, N); _ -> ok end end . |
如果超过5秒,都没有收到{empty_out_q, Port}消息,那么就看下目前的暂存的数据的字节数, 并且和最初的暂存的数据的字节数比较,如果一直没变的话,那么说明由于种种原因,把这些数据发送出去比较没希望,那么他就果断的继续下一步动作。如果字节数在变少的话,那么就继续等。
现在问题来了。shutdown 在发现暂存的数据没有希望发出去的时候,选择不作为,那么作为后遗症,caller被登记在通知名单里面。过一段时间,要不数据真的被发出去了,要不就是发生以下事件: 1. socket发现读错误。2. socket发现写错误。 3. 对端关闭。 4. 宿主进程退出,socket作为一个port被强行关闭,这时候需要清空发送缓冲区,这时候会同时给caller发送{empty_out_q, Port}消息。
有图有真相,我们让代码说话。
参看代码: otp_src_R14B03/erts/emulator/drivers/common/inet_drv.c:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | // L7100 这个函数负责给调用者发送消息empty_out_q static void send_empty_out_q_msgs(inet_descriptor* desc) { ErlDrvTermData msg[6]; int msg_len = 0; if (NO_SUBSCRIBERS(&desc->empty_out_q_subs)) return ; msg_len = LOAD_ATOM(msg, msg_len, am_empty_out_q); msg_len = LOAD_PORT(msg, msg_len, desc->dport); msg_len = LOAD_TUPLE(msg, msg_len, 2); ASSERT(msg_len == sizeof (msg)/ sizeof (*msg)); send_to_subscribers(desc->port, &desc->empty_out_q_subs, 1, msg, msg_len); } //那么谁会调用他呢: tcp_clear_output和tcp_inet_output. static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) { ... else if (IS_CONNECTED(INETP(desc))) { for (;;) { int vsize; int n; SysIOVec* iov; if ((iov = driver_peekq(ix, &vsize)) == NULL) { sock_select(INETP(desc), FD_WRITE, 0); send_empty_out_q_msgs(INETP(desc)); goto done; } vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize; DEBUGF(( "tcp_inet_output(%ld): s=%d, About to send %d items\\r\\n" , ( long )desc->inet.port, desc->inet.s, vsize)); if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) { if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) { DEBUGF(( "tcp_inet_output(%ld): sock_sendv(%d) errno = %d\\r\\n" , ( long )desc->inet.port, vsize, sock_errno())); ret = tcp_send_error(desc, sock_errno()); goto done; } #ifdef __WIN32__ desc->inet.send_would_block = 1; #endif goto done; ... } //以及: /* clear QUEUED output */ static void tcp_clear_output(tcp_descriptor* desc) { ErlDrvPort ix = desc->inet.port; int qsz = driver_sizeq(ix); driver_deq(ix, qsz); send_empty_out_q_msgs(INETP(desc)); } //谁调用 tcp_clear_output呢? //tcp_inet_flush, //tcp_recv_closed, //tcp_recv_error, //tcp_send_error, //特别是tcp_recv_closed函数 /* The peer socket has closed, cleanup and send event */ static int tcp_recv_closed(tcp_descriptor* desc) { #ifdef DEBUG long port = ( long ) desc->inet.port; /* Used after driver_exit() */ #endif DEBUGF(( "tcp_recv_closed(%ld): s=%d, in %s, line %d\\r\\n" , port, desc->inet.s, __FILE__, __LINE__)); if (IS_BUSY(INETP(desc))) { /* A send is blocked */ desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { driver_cancel_timer(desc->inet.port); desc->busy_on_send = 0; DEBUGF(( "tcp_recv_closed(%ld): busy on send\\r\\n" , port)); } desc->inet.state &= ~INET_F_BUSY; set_busy_port(desc->inet.port, 0); inet_reply_error_am(INETP(desc), am_closed); DEBUGF(( "tcp_recv_closed(%ld): busy reply \'closed\'\\r\\n" , port)); } if (!desc->inet.active) { /* We must cancel any timer here ! */ driver_cancel_timer(desc->inet.port); /* passive mode do not terminate port ! */ tcp_clear_input(desc); if (desc->inet.exitf) { tcp_clear_output(desc); desc_close(INETP(desc)); } else { desc_close_read(INETP(desc)); } async_error_am_all(INETP(desc), am_closed); /* next time EXBADSEQ will be delivered */ DEBUGF(( "tcp_recv_closed(%ld): passive reply all \'closed\'\\r\\n" , port)); } else { tcp_clear_input(desc); tcp_closed_message(desc); if (desc->inet.exitf) { driver_exit(desc->inet.port, 0); } else { desc_close_read(INETP(desc)); } DEBUGF(( "tcp_recv_closed(%ld): active close\\r\\n" , port)); } DEBUGF(( "tcp_recv_closed(%ld): done\\r\\n" , port)); return -1; } /* 我们看到在: 1. 对端关闭的时候, 2. 而且我端是被动接收, 3. socket打开这二个选项的时候最容易{exit_on_close, true}, {delay_send,true} 最容易发生上面的现象 */ |
只有这种情况你的进程会收到该消息。
从你的描述来看
“同一时刻会有多个客户端来连接,连接后,接收客户端请求后,再发送响应消息,然后客户端主动断连。”.
基本上可以推断是,你用shutdown(write 或者 read_write)后,你的客户端由于某种原因断链,你的发送进程收到这样的消息。
我们来试验下我们的猜想:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | $ cat test .erl -module( test ). -compile(export_all). start() -> start(1234). start(Port) -> register(?MODULE, self()), spawn_link(fun ()-> S= listen(Port), accept(S) end), receive Any -> io: format ( "~p~n" , [Any]) end. %% to stop: test !stop. listen(Port) -> Opts = [{active, false }, binary, {backlog, 256}, {packet, raw}, {reuseaddr, true }], {ok, S} = gen_tcp:listen(Port, Opts), S. accept(S) -> case gen_tcp:accept(S) of {ok, Socket} -> inet:setopts(Socket, [{exit_on_close, true }, {delay_send, true }]), spawn_opt(?MODULE, entry, [Socket], []); Error -> erlang:error(Error) end, accept(S). entry(S)-> loop(S), check_empty_out_q_msg(1000), io: format ( "bye socket ~p~n" ,[S]), ok. check_empty_out_q_msg(Timeout)-> receive Any -> io: format ( "bingo, got message ~p~n" , [Any]), Any after Timeout -> cont end. loop(S) -> check_empty_out_q_msg(0), case gen_tcp:recv(S, 0) of {ok, << "start" , _ /binary >>}-> io: format ( "start to reproduce {empty_out_q, Port} message ~n" ,[]), gen_tcp:send(S, lists:duplicate(1024*1024, "A" )), io: format ( "sent 1M bytes ~n" ,[]), io: format ( "sleep 1s ~n" ,[]), receive Any1 -> Any1 after 1000 -> cont end, loop(S); {ok, _Data} -> io: format ( "shutdown(write) ~n" ,[]), {ok, [{send_pend, N}]}=inet:getstat(S, [send_pend]), gen_tcp: shutdown (S, write), {ok, [{send_pend, N1}]}=inet:getstat(S, [send_pend]), io: format ( "5s send_pend ~w/~w ~n" ,[N,N1]), loop(S); Error -> io: format ( "tcp ~p~n" , [Error]), Error end. $ cat client.erl -module(client). - export ([start /0 ]). start()-> {ok,Sock} = gen_tcp:connect( "localhost" , 1234, [{active, false }]), gen_tcp:send(Sock, "start" ), io: format ( "send start~n" ,[]), gen_tcp:recv(Sock,1024), io: format ( "drain 1024 bytes~n" ,[]), gen_tcp:send(Sock, "bang" ), io: format ( "send bang~n" ,[]), io: format ( "sleep 10s~n" ,[]), receive Any -> Any after 10000 -> cont end, gen_tcp: shutdown (Sock, write), io: format ( "end~n" ,[]), ok. $ erlc test .erl client.erl |
在一个终端运行:
1 2 3 4 5 6 7 8 9 | $ erl -noshell -s test start to reproduce {empty_out_q, Port} message sent 1M bytes sleep 1s shutdown (write) 5s send_pend 851968 /851968 tcp {error,closed} bingo, got message {empty_out_q, #Port<0.456>} bye socket #Port<0.456> |
在另外一个终端运行:
1 2 3 4 5 6 | $ erl -noshell -s client send start drain 1024 bytes send bang sleep 10s end |
中间我们可以看到:
1 2 3 4 | $ ss State Recv-Q Send-Q Local Address:Port Peer Address:Port ESTAB 0 130944 127.0.0.1:search-agent 127.0.0.1:43273 ... |
发送队列大量的数据阻塞。
结果验证了我们的猜想!这个故事告诉我们:源码是最权威的。
建议继续学习:
- gen_tcp发送进程被挂起起因分析及对策 (阅读:37018)
- gen_tcp发送缓冲区以及水位线问题分析 (阅读:5336)
- whatsapp深度使用Erlang有感 (阅读:4717)
- Erlang match_spec引擎介绍和应用 (阅读:4645)
- php-erlang (阅读:4354)
- hibernate使用注意事项 (阅读:3272)
- Erlang linkin driver用port_control方式时的一些经验分享 (阅读:3011)
- Erlang如何限制节点对集群的访问之net_kernel:allow (阅读:3144)
- ERLANG OTP源码分析 – gen_server (阅读:2937)
- erlang学习手记 (阅读:2747)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:Yu Feng 来源: Erlang非业余研究
- 标签: Erlang gen_tcp
- 发布时间:2011-09-07 23:05:13
-
[88] memory prefetch浅析
-
[46] 基本排序算法的PHP实现
-
[44] find命令的一点注意事项
-
[35] Oracle bbed工具的编译
-
[29] Inline Form Labels
-
[29] JS中如何判断字符串类型的数字
-
[29] 卡诺模型―设计品质与设计价值的思考
-
[27] 8大实用又重要Mac使用技巧
-
[26] 两行 JavaScript 代码
-
[24] 小屏幕移动设备网页设计注意事项