技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 其他 --> gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析

gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析

浏览:3562次  出处信息
    今天有同学在gmail里面问了一个Erlang的问题,问题描述的非常好, 如下:

     问题的背景是:

     1、我开发了一个服务端程序,接收客户端的连接。同一时刻会有多个客户端来连接,连接后,接收客户端请求后,再发送响应消息,然后客户端主动断连。

     2、服务端监听的socket属性设置如下:

     [binary, {packet, raw},

     {ip, IPAddr}, {backlog, 10000},

     {active, false}, {reuseaddr, true},

     {nodelay, false}, {delay_send, true},

     {recbuf, 128 * 1024}, {sndbuf, 64 * 1024}]

     3、服务器accept监听socket,接收客户端请求,发送响应消息分别是在3个不同的进程中进行。接收请求和发送响应的进程都是重复使用的,每次重新使用的时候传入一个新accept的socket。

     问题的现象是:

     1、单个用户发起呼叫的时候,流程是成功的,服务器能正常响应。但是多个用户一起呼,批量跑的时候,跑一段时间后,部分客户端会发现不能接收到服务器返回的响应。从抓包来看,客户端的请求是发送到服务器端了。

     2、服务器这边发送响应的进程会收到一条{empty_out_q, #Port}这样的消息,而这条消息并不是我开发的代码产生的。

     问题是:

     1、为什么发消息的进程会收到{empty_out_q, #Port}这样的消息?

     2、收到empty_out_q消息,是不是就说明调用gen_tcp:send发送失败?

     3、是不是说设置了delay_send的属性,所以即使send失败,也是异步的,在调用send的时候会马上返回ok,但是后面真的发送失败后,则系统会给调用send方法的进程发送一条{empty_out_q, #Port}这样的消息。

    这个问题非常有意思。

    首先我来解释下erts内部inet_drv工作原理:gen_tcp:send的时候在正常情况下,底层的驱动会马上调用操作系统的send去发送数据,如果一次没发完数据,会把数据暂存在驱动的发送队列里面,同时向epoll等事件检查器登记socket写事件,然后等待socket的可写事件的发生。 delay_send选项的的作用是不尝试立即发送,而是把数据都暂存在驱动的发送队列,然后等待可写, 从而可以批量发送数据,提高效率。

    有了这样的机制后,那么在gen_tcp:close或者shutdown的时候,要保证这些暂存的数据先发送完成。那么在知道目前还有暂存的数据未发送的时候,不会立即执行close或者shutdown操作,而是会先等待这些数据的发送。

    等待过程是这样的,先把调用者进程的pid提交到socket的empty_out_q等待队列去,同时返回目前暂存的数据的字节数。这时候底层的发送过程还在继续,如果在发送的时候,发现数据都发生完毕,就给调用者进程发送一个{empty_out_q, Port}消息,表明可以进行下一步动作。

    参看代码: otp_src_R14B03/erts/preloaded/src/prim_inet.erl: L117

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% Shutdown(insock(), atom()) -> ok
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% TODO: shutdown equivalent for SCTP
%%
shutdown(S, read) when is_port(S) ->
    shutdown_2(S, 0);
shutdown(S, write) when is_port(S) ->
    shutdown_1(S, 1);
shutdown(S, read_write) when is_port(S) ->
    shutdown_1(S, 2).

shutdown_1(S, How) ->
    case subscribe(S, [subs_empty_out_q]) of
        {ok,[{subs_empty_out_q,N}]} when N > 0 ->
            shutdown_pend_loop(S, N);   %% wait for pending output to be sent
        _Other -> ok
    end,
    shutdown_2(S, How).

shutdown_2(S, How) ->
    case ctl_cmd(S, ?TCP_REQ_SHUTDOWN, [How]) of
   {ok, []} -> ok;
        Error -> Error
    end.

shutdown_pend_loop(S, N0) ->
    receive
        {empty_out_q,S} -> ok
    after ?INET_CLOSE_TIMEOUT ->
            case getstat(S, [send_pend]) of
                {ok,[{send_pend,N0}]} -> ok;
                {ok,[{send_pend,N}]} -> shutdown_pend_loop(S, N);
                _ -> ok
            end
    end.

    如果超过5秒,都没有收到{empty_out_q, Port}消息,那么就看下目前的暂存的数据的字节数, 并且和最初的暂存的数据的字节数比较,如果一直没变的话,那么说明由于种种原因,把这些数据发送出去比较没希望,那么他就果断的继续下一步动作。如果字节数在变少的话,那么就继续等。

    现在问题来了。shutdown 在发现暂存的数据没有希望发出去的时候,选择不作为,那么作为后遗症,caller被登记在通知名单里面。过一段时间,要不数据真的被发出去了,要不就是发生以下事件: 1. socket发现读错误。2. socket发现写错误。 3. 对端关闭。 4. 宿主进程退出,socket作为一个port被强行关闭,这时候需要清空发送缓冲区,这时候会同时给caller发送{empty_out_q, Port}消息。

    有图有真相,我们让代码说话。

    参看代码: otp_src_R14B03/erts/emulator/drivers/common/inet_drv.c:

// L7100 这个函数负责给调用者发送消息empty_out_q
static void
send_empty_out_q_msgs(inet_descriptor* desc)
{
  ErlDrvTermData msg[6];
  int msg_len = 0;

  if(NO_SUBSCRIBERS(&desc->empty_out_q_subs))
    return;

  msg_len = LOAD_ATOM(msg, msg_len, am_empty_out_q);
  msg_len = LOAD_PORT(msg, msg_len, desc->dport);
  msg_len = LOAD_TUPLE(msg, msg_len, 2);

  ASSERT(msg_len == sizeof(msg)/sizeof(*msg));

  send_to_subscribers(desc->port,
                      &desc->empty_out_q_subs,
                      1,
                      msg,
                      msg_len);
}

//那么谁会调用他呢: tcp_clear_output和tcp_inet_output.

static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\\r\\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\\r\\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                desc->inet.send_would_block = 1;
#endif
                goto done;
...
}

//以及:

/* clear QUEUED output */
static void tcp_clear_output(tcp_descriptor* desc)
{
    ErlDrvPort ix  = desc->inet.port;
    int qsz = driver_sizeq(ix);

    driver_deq(ix, qsz);
    send_empty_out_q_msgs(INETP(desc));
}

//谁调用 tcp_clear_output呢?
//tcp_inet_flush,
//tcp_recv_closed,
//tcp_recv_error,
//tcp_send_error,

//特别是tcp_recv_closed函数

/* The peer socket has closed, cleanup and send event */
static int tcp_recv_closed(tcp_descriptor* desc)
{
#ifdef DEBUG
    long port = (long) desc->inet.port; /* Used after driver_exit() */
#endif
    DEBUGF(("tcp_recv_closed(%ld): s=%d, in %s, line %d\\r\\n",
            port, desc->inet.s, __FILE__, __LINE__));
    if (IS_BUSY(INETP(desc))) {
        /* A send is blocked */
        desc->inet.caller = desc->inet.busy_caller;
        tcp_clear_output(desc);
        if (desc->busy_on_send) {
            driver_cancel_timer(desc->inet.port);
            desc->busy_on_send = 0;
            DEBUGF(("tcp_recv_closed(%ld): busy on send\\r\\n", port));
        }
        desc->inet.state &= ~INET_F_BUSY;
        set_busy_port(desc->inet.port, 0);
        inet_reply_error_am(INETP(desc), am_closed);
        DEBUGF(("tcp_recv_closed(%ld): busy reply \'closed\'\\r\\n", port));
    }
    if (!desc->inet.active) {
        /* We must cancel any timer here ! */
        driver_cancel_timer(desc->inet.port);
        /* passive mode do not terminate port ! */
        tcp_clear_input(desc);
        if (desc->inet.exitf) {
            tcp_clear_output(desc);
            desc_close(INETP(desc));
        } else {
            desc_close_read(INETP(desc));
        }
        async_error_am_all(INETP(desc), am_closed);
        /* next time EXBADSEQ will be delivered  */
        DEBUGF(("tcp_recv_closed(%ld): passive reply all \'closed\'\\r\\n", port));
    } else {
        tcp_clear_input(desc);
        tcp_closed_message(desc);
        if (desc->inet.exitf) {
            driver_exit(desc->inet.port, 0);
        } else {
            desc_close_read(INETP(desc));
        }
        DEBUGF(("tcp_recv_closed(%ld): active close\\r\\n", port));
    }
    DEBUGF(("tcp_recv_closed(%ld): done\\r\\n", port));
    return -1;
}
/* 我们看到在:
1. 对端关闭的时候,
2. 而且我端是被动接收,
3. socket打开这二个选项的时候最容易{exit_on_close, true}, {delay_send,true}
最容易发生上面的现象
*/

    只有这种情况你的进程会收到该消息。

    从你的描述来看

    “同一时刻会有多个客户端来连接,连接后,接收客户端请求后,再发送响应消息,然后客户端主动断连。”.

    基本上可以推断是,你用shutdown(write 或者 read_write)后,你的客户端由于某种原因断链,你的发送进程收到这样的消息。

    我们来试验下我们的猜想:

$ cat test.erl
-module(test).
-compile(export_all).

start() ->
    start(1234).

start(Port) ->
    register(?MODULE, self()),

    spawn_link(fun ()-> S= listen(Port), accept(S) end),

    receive Any -> io:format("~p~n", [Any]) end.  %% to stop: test!stop.

listen(Port) ->
    Opts = [{active, false},
            binary,
            {backlog, 256},
            {packet, raw},
            {reuseaddr, true}],

    {ok, S} = gen_tcp:listen(Port, Opts),
    S.

accept(S) ->
    case gen_tcp:accept(S) of
        {ok, Socket} -> inet:setopts(Socket, [{exit_on_close, true},
                                              {delay_send,true}]),
                        spawn_opt(?MODULE, entry, [Socket], []);
        Error    -> erlang:error(Error)
    end,
    accept(S).

entry(S)->
        loop(S),
        check_empty_out_q_msg(1000),
        io:format("bye socket ~p~n",[S]),
        ok.

check_empty_out_q_msg(Timeout)->
    receive
        Any -> io:format("bingo, got message ~p~n", [Any]), Any
    after Timeout -> cont end.

loop(S) ->
    check_empty_out_q_msg(0),

    case gen_tcp:recv(S, 0) of
        {ok, <<"start", _/binary>>}->
                io:format("start to reproduce {empty_out_q, Port} message ~n",[]),
                gen_tcp:send(S, lists:duplicate(1024*1024, "A")),
                io:format("sent 1M bytes ~n",[]),

                io:format("sleep 1s ~n",[]),
                receive Any1 -> Any1 after 1000 -> cont end,

                loop(S);

        {ok, _Data} ->
            io:format("shutdown(write) ~n",[]),
            {ok, [{send_pend, N}]}=inet:getstat(S, [send_pend]),
            gen_tcp:shutdown(S, write),
            {ok, [{send_pend, N1}]}=inet:getstat(S, [send_pend]),
            io:format("5s send_pend ~w/~w ~n",[N,N1]),
            loop(S);

        Error ->
            io:format("tcp ~p~n", [Error]),
            Error
    end.

$ cat client.erl
-module(client).
-export([start/0]).

start()->
        {ok,Sock} = gen_tcp:connect("localhost", 1234, [{active,false}]),

        gen_tcp:send(Sock, "start"),
        io:format("send start~n",[]),

        gen_tcp:recv(Sock,1024),
        io:format("drain 1024 bytes~n",[]),                                   

        gen_tcp:send(Sock, "bang"),
        io:format("send bang~n",[]),                                   

        io:format("sleep 10s~n",[]),
        receive
                Any -> Any
        after 10000 -> cont end,

        gen_tcp:shutdown(Sock, write),

        io:format("end~n",[]),
        ok.

$ erlc test.erl client.erl

    在一个终端运行:

$ erl -noshell -s test
start to reproduce {empty_out_q, Port} message
sent 1M bytes
sleep 1s
shutdown(write)
5s send_pend 851968/851968
tcp {error,closed}
bingo, got message {empty_out_q,#Port<0.456>}
bye socket #Port<0.456>

    在另外一个终端运行:

$ erl -noshell -s client
send start
drain 1024 bytes
send bang
sleep 10s
end

    中间我们可以看到:

$ ss
State       Recv-Q Send-Q                                                 Local Address:Port                                                     Peer Address:Port
ESTAB       0      130944                                                     127.0.0.1:search-agent                                                     127.0.0.1:43273
...

    发送队列大量的数据阻塞。

    结果验证了我们的猜想!这个故事告诉我们:源码是最权威的。

建议继续学习:

  1. gen_tcp发送进程被挂起起因分析及对策    (阅读:36992)
  2. gen_tcp发送缓冲区以及水位线问题分析    (阅读:5293)
  3. whatsapp深度使用Erlang有感    (阅读:4671)
  4. Erlang match_spec引擎介绍和应用    (阅读:4607)
  5. php-erlang    (阅读:4329)
  6. hibernate使用注意事项    (阅读:3241)
  7. Erlang linkin driver用port_control方式时的一些经验分享    (阅读:2982)
  8. Erlang如何限制节点对集群的访问之net_kernel:allow    (阅读:3067)
  9. ERLANG OTP源码分析 – gen_server    (阅读:2884)
  10. erlang学习手记    (阅读:2716)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1