技术头条 - 一个快速在微博传播文章的方式     
您现在的位置首页 --> 其他 --> gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析

gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析

浏览:3594次  出处信息
    今天有同学在gmail里面问了一个Erlang的问题,问题描述的非常好, 如下:

     问题的背景是:

     1、我开发了一个服务端程序,接收客户端的连接。同一时刻会有多个客户端来连接,连接后,接收客户端请求后,再发送响应消息,然后客户端主动断连。

     2、服务端监听的socket属性设置如下:

     [binary, {packet, raw},

     {ip, IPAddr}, {backlog, 10000},

     {active, false}, {reuseaddr, true},

     {nodelay, false}, {delay_send, true},

     {recbuf, 128 * 1024}, {sndbuf, 64 * 1024}]

     3、服务器accept监听socket,接收客户端请求,发送响应消息分别是在3个不同的进程中进行。接收请求和发送响应的进程都是重复使用的,每次重新使用的时候传入一个新accept的socket。

     问题的现象是:

     1、单个用户发起呼叫的时候,流程是成功的,服务器能正常响应。但是多个用户一起呼,批量跑的时候,跑一段时间后,部分客户端会发现不能接收到服务器返回的响应。从抓包来看,客户端的请求是发送到服务器端了。

     2、服务器这边发送响应的进程会收到一条{empty_out_q, #Port}这样的消息,而这条消息并不是我开发的代码产生的。

     问题是:

     1、为什么发消息的进程会收到{empty_out_q, #Port}这样的消息?

     2、收到empty_out_q消息,是不是就说明调用gen_tcp:send发送失败?

     3、是不是说设置了delay_send的属性,所以即使send失败,也是异步的,在调用send的时候会马上返回ok,但是后面真的发送失败后,则系统会给调用send方法的进程发送一条{empty_out_q, #Port}这样的消息。

    这个问题非常有意思。

    首先我来解释下erts内部inet_drv工作原理:gen_tcp:send的时候在正常情况下,底层的驱动会马上调用操作系统的send去发送数据,如果一次没发完数据,会把数据暂存在驱动的发送队列里面,同时向epoll等事件检查器登记socket写事件,然后等待socket的可写事件的发生。 delay_send选项的的作用是不尝试立即发送,而是把数据都暂存在驱动的发送队列,然后等待可写, 从而可以批量发送数据,提高效率。

    有了这样的机制后,那么在gen_tcp:close或者shutdown的时候,要保证这些暂存的数据先发送完成。那么在知道目前还有暂存的数据未发送的时候,不会立即执行close或者shutdown操作,而是会先等待这些数据的发送。

    等待过程是这样的,先把调用者进程的pid提交到socket的empty_out_q等待队列去,同时返回目前暂存的数据的字节数。这时候底层的发送过程还在继续,如果在发送的时候,发现数据都发生完毕,就给调用者进程发送一个{empty_out_q, Port}消息,表明可以进行下一步动作。

    参看代码: otp_src_R14B03/erts/preloaded/src/prim_inet.erl: L117

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% Shutdown(insock(), atom()) -> ok
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% TODO: shutdown equivalent for SCTP
%%
shutdown(S, read) when is_port(S) ->
    shutdown_2(S, 0);
shutdown(S, write) when is_port(S) ->
    shutdown_1(S, 1);
shutdown(S, read_write) when is_port(S) ->
    shutdown_1(S, 2).
 
shutdown_1(S, How) ->
    case subscribe(S, [subs_empty_out_q]) of
        {ok,[{subs_empty_out_q,N}]} when N > 0 ->
            shutdown_pend_loop(S, N);   %% wait for pending output to be sent
        _Other -> ok
    end,
    shutdown_2(S, How).
 
shutdown_2(S, How) ->
    case ctl_cmd(S, ?TCP_REQ_SHUTDOWN, [How]) of
   {ok, []} -> ok;
        Error -> Error
    end.
 
shutdown_pend_loop(S, N0) ->
    receive
        {empty_out_q,S} -> ok
    after ?INET_CLOSE_TIMEOUT ->
            case getstat(S, [send_pend]) of
                {ok,[{send_pend,N0}]} -> ok;
                {ok,[{send_pend,N}]} -> shutdown_pend_loop(S, N);
                _ -> ok
            end
    end.

    如果超过5秒,都没有收到{empty_out_q, Port}消息,那么就看下目前的暂存的数据的字节数, 并且和最初的暂存的数据的字节数比较,如果一直没变的话,那么说明由于种种原因,把这些数据发送出去比较没希望,那么他就果断的继续下一步动作。如果字节数在变少的话,那么就继续等。

    现在问题来了。shutdown 在发现暂存的数据没有希望发出去的时候,选择不作为,那么作为后遗症,caller被登记在通知名单里面。过一段时间,要不数据真的被发出去了,要不就是发生以下事件: 1. socket发现读错误。2. socket发现写错误。 3. 对端关闭。 4. 宿主进程退出,socket作为一个port被强行关闭,这时候需要清空发送缓冲区,这时候会同时给caller发送{empty_out_q, Port}消息。

    有图有真相,我们让代码说话。

    参看代码: otp_src_R14B03/erts/emulator/drivers/common/inet_drv.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// L7100 这个函数负责给调用者发送消息empty_out_q
static void
send_empty_out_q_msgs(inet_descriptor* desc)
{
  ErlDrvTermData msg[6];
  int msg_len = 0;
 
  if(NO_SUBSCRIBERS(&desc->empty_out_q_subs))
    return;
 
  msg_len = LOAD_ATOM(msg, msg_len, am_empty_out_q);
  msg_len = LOAD_PORT(msg, msg_len, desc->dport);
  msg_len = LOAD_TUPLE(msg, msg_len, 2);
 
  ASSERT(msg_len == sizeof(msg)/sizeof(*msg));
 
  send_to_subscribers(desc->port,
                      &desc->empty_out_q_subs,
                      1,
                      msg,
                      msg_len);
}
 
//那么谁会调用他呢: tcp_clear_output和tcp_inet_output.
 
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;
 
            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\\r\\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\\r\\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                desc->inet.send_would_block = 1;
#endif
                goto done;
...
}
 
//以及:
 
/* clear QUEUED output */
static void tcp_clear_output(tcp_descriptor* desc)
{
    ErlDrvPort ix  = desc->inet.port;
    int qsz = driver_sizeq(ix);
 
    driver_deq(ix, qsz);
    send_empty_out_q_msgs(INETP(desc));
}
 
//谁调用 tcp_clear_output呢?
//tcp_inet_flush,
//tcp_recv_closed,
//tcp_recv_error,
//tcp_send_error,
 
//特别是tcp_recv_closed函数
 
/* The peer socket has closed, cleanup and send event */
static int tcp_recv_closed(tcp_descriptor* desc)
{
#ifdef DEBUG
    long port = (long) desc->inet.port; /* Used after driver_exit() */
#endif
    DEBUGF(("tcp_recv_closed(%ld): s=%d, in %s, line %d\\r\\n",
            port, desc->inet.s, __FILE__, __LINE__));
    if (IS_BUSY(INETP(desc))) {
        /* A send is blocked */
        desc->inet.caller = desc->inet.busy_caller;
        tcp_clear_output(desc);
        if (desc->busy_on_send) {
            driver_cancel_timer(desc->inet.port);
            desc->busy_on_send = 0;
            DEBUGF(("tcp_recv_closed(%ld): busy on send\\r\\n", port));
        }
        desc->inet.state &= ~INET_F_BUSY;
        set_busy_port(desc->inet.port, 0);
        inet_reply_error_am(INETP(desc), am_closed);
        DEBUGF(("tcp_recv_closed(%ld): busy reply \'closed\'\\r\\n", port));
    }
    if (!desc->inet.active) {
        /* We must cancel any timer here ! */
        driver_cancel_timer(desc->inet.port);
        /* passive mode do not terminate port ! */
        tcp_clear_input(desc);
        if (desc->inet.exitf) {
            tcp_clear_output(desc);
            desc_close(INETP(desc));
        } else {
            desc_close_read(INETP(desc));
        }
        async_error_am_all(INETP(desc), am_closed);
        /* next time EXBADSEQ will be delivered  */
        DEBUGF(("tcp_recv_closed(%ld): passive reply all \'closed\'\\r\\n", port));
    } else {
        tcp_clear_input(desc);
        tcp_closed_message(desc);
        if (desc->inet.exitf) {
            driver_exit(desc->inet.port, 0);
        } else {
            desc_close_read(INETP(desc));
        }
        DEBUGF(("tcp_recv_closed(%ld): active close\\r\\n", port));
    }
    DEBUGF(("tcp_recv_closed(%ld): done\\r\\n", port));
    return -1;
}
/* 我们看到在:
1. 对端关闭的时候,
2. 而且我端是被动接收,
3. socket打开这二个选项的时候最容易{exit_on_close, true}, {delay_send,true}
最容易发生上面的现象
*/

    只有这种情况你的进程会收到该消息。

    从你的描述来看

    “同一时刻会有多个客户端来连接,连接后,接收客户端请求后,再发送响应消息,然后客户端主动断连。”.

    基本上可以推断是,你用shutdown(write 或者 read_write)后,你的客户端由于某种原因断链,你的发送进程收到这样的消息。

    我们来试验下我们的猜想:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
$ cat test.erl
-module(test).
-compile(export_all).
 
start() ->
    start(1234).
 
start(Port) ->
    register(?MODULE, self()),
 
    spawn_link(fun ()-> S= listen(Port), accept(S) end),
 
    receive Any -> io:format("~p~n", [Any]) end.  %% to stop: test!stop.
 
listen(Port) ->
    Opts = [{active, false},
            binary,
            {backlog, 256},
            {packet, raw},
            {reuseaddr, true}],
 
    {ok, S} = gen_tcp:listen(Port, Opts),
    S.
 
accept(S) ->
    case gen_tcp:accept(S) of
        {ok, Socket} -> inet:setopts(Socket, [{exit_on_close, true},
                                              {delay_send,true}]),
                        spawn_opt(?MODULE, entry, [Socket], []);
        Error    -> erlang:error(Error)
    end,
    accept(S).
 
entry(S)->
        loop(S),
        check_empty_out_q_msg(1000),
        io:format("bye socket ~p~n",[S]),
        ok.
 
check_empty_out_q_msg(Timeout)->
    receive
        Any -> io:format("bingo, got message ~p~n", [Any]), Any
    after Timeout -> cont end.
 
loop(S) ->
    check_empty_out_q_msg(0),
 
    case gen_tcp:recv(S, 0) of
        {ok, <<"start", _/binary>>}->
                io:format("start to reproduce {empty_out_q, Port} message ~n",[]),
                gen_tcp:send(S, lists:duplicate(1024*1024, "A")),
                io:format("sent 1M bytes ~n",[]),
 
                io:format("sleep 1s ~n",[]),
                receive Any1 -> Any1 after 1000 -> cont end,
 
                loop(S);
 
        {ok, _Data} ->
            io:format("shutdown(write) ~n",[]),
            {ok, [{send_pend, N}]}=inet:getstat(S, [send_pend]),
            gen_tcp:shutdown(S, write),
            {ok, [{send_pend, N1}]}=inet:getstat(S, [send_pend]),
            io:format("5s send_pend ~w/~w ~n",[N,N1]),
            loop(S);
 
        Error ->
            io:format("tcp ~p~n", [Error]),
            Error
    end.
 
$ cat client.erl
-module(client).
-export([start/0]).
 
start()->
        {ok,Sock} = gen_tcp:connect("localhost", 1234, [{active,false}]),
 
        gen_tcp:send(Sock, "start"),
        io:format("send start~n",[]),
 
        gen_tcp:recv(Sock,1024),
        io:format("drain 1024 bytes~n",[]),                                  
 
        gen_tcp:send(Sock, "bang"),
        io:format("send bang~n",[]),                                  
 
        io:format("sleep 10s~n",[]),
        receive
                Any -> Any
        after 10000 -> cont end,
 
        gen_tcp:shutdown(Sock, write),
 
        io:format("end~n",[]),
        ok.
 
$ erlc test.erl client.erl

    在一个终端运行:

1
2
3
4
5
6
7
8
9
$ erl -noshell -s test
start to reproduce {empty_out_q, Port} message
sent 1M bytes
sleep 1s
shutdown(write)
5s send_pend 851968/851968
tcp {error,closed}
bingo, got message {empty_out_q,#Port<0.456>}
bye socket #Port<0.456>

    在另外一个终端运行:

1
2
3
4
5
6
$ erl -noshell -s client
send start
drain 1024 bytes
send bang
sleep 10s
end

    中间我们可以看到:

1
2
3
4
$ ss
State       Recv-Q Send-Q                                                 Local Address:Port                                                     Peer Address:Port
ESTAB       0      130944                                                     127.0.0.1:search-agent                                                     127.0.0.1:43273
...

    发送队列大量的数据阻塞。

    结果验证了我们的猜想!这个故事告诉我们:源码是最权威的。

建议继续学习:

  1. gen_tcp发送进程被挂起起因分析及对策    (阅读:37018)
  2. gen_tcp发送缓冲区以及水位线问题分析    (阅读:5336)
  3. whatsapp深度使用Erlang有感    (阅读:4717)
  4. Erlang match_spec引擎介绍和应用    (阅读:4645)
  5. php-erlang    (阅读:4354)
  6. hibernate使用注意事项    (阅读:3272)
  7. Erlang linkin driver用port_control方式时的一些经验分享    (阅读:3011)
  8. Erlang如何限制节点对集群的访问之net_kernel:allow    (阅读:3144)
  9. ERLANG OTP源码分析 – gen_server    (阅读:2937)
  10. erlang学习手记    (阅读:2747)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1