技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 算法 --> 未公开的gen_tcp:unrecv以及接收缓冲区行为分析

未公开的gen_tcp:unrecv以及接收缓冲区行为分析

浏览:1956次  出处信息
    gen_tcp:unrecv是个未公开的函数,作用是往tcp的接收缓冲区里面填入指定的数据。别看这小小的函数,用起来很舒服的。

     我们先看下它的代码实现,Erlang代码部分:

%%gen_tcp.erl:L299
unrecv(S, Data) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:unrecv(S, Data);
        Error ->
            Error
    end.
%%inet_tcp.erl:L58
unrecv(Socket, Data) -> prim_inet:unrecv(Socket, Data).

%%prim_inet.erl:L983
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% UNRECV(insock(), data) -> ok | {error, Reason}
%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
unrecv(S, Data) ->
    case ctl_cmd(S, ?TCP_REQ_UNRECV, Data) of
        {ok, _} -> ok;
        Error  -> Error
    end.

    运行期c代码部分:

    

//inet_drv.c:L8123
/* TCP requests from Erlang */
static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len,
                        char** rbuf, int rsize)
{
...
    case TCP_REQ_UNRECV: {
        DEBUGF(("tcp_inet_ctl(%ld): UNRECV\\r\\n", (long)desc->inet.port));
        if (!IS_CONNECTED(INETP(desc)))
            return ctl_error(ENOTCONN, rbuf, rsize);
        tcp_push_buffer(desc, buf, len);
        if (desc->inet.active)
            tcp_deliver(desc, 0);
        return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
    }
...
}

static int tcp_push_buffer(tcp_descriptor* desc, char* buf, int len)
{
    ErlDrvBinary* bin;

    if (desc->i_buf == NULL) {
        bin = alloc_buffer(len);
        sys_memcpy(bin->orig_bytes, buf, len);
        desc->i_buf = bin;
        desc->i_bufsz = len;
        desc->i_ptr_start = desc->i_buf->orig_bytes;
        desc->i_ptr = desc->i_ptr_start + len;
    }
    else {
        char* start =  desc->i_buf->orig_bytes;
        int sz_before = desc->i_ptr_start - start;
        int sz_filled = desc->i_ptr - desc->i_ptr_start;

        if (len <= sz_before) {
            sys_memcpy(desc->i_ptr_start - len, buf, len);
            desc->i_ptr_start -= len;
        }
        else {
            bin = alloc_buffer(desc->i_bufsz+len);
            sys_memcpy(bin->orig_bytes, buf, len);
            sys_memcpy(bin->orig_bytes+len, desc->i_ptr_start, sz_filled);
            free_buffer(desc->i_buf);
            desc->i_bufsz += len;
            desc->i_buf = bin;
            desc->i_ptr_start = bin->orig_bytes;
            desc->i_ptr = desc->i_ptr_start + sz_filled + len;
        }
    }
    desc->i_remain = 0;
    return 0;
}

    实现上很简单,就是透过tcp ctl命令往驱动接收缓冲区里面填数据。

     但是什么是gen_tcp接收缓冲区, 它的大小是多大呢?

     在回答这个问题之前,我们先看下inet:setopts文档,参见这里

    setopts(Socket, Options) -> ok | {error, posix()}

    Types:

    Socket = term()

     Options = [{Opt, Val} | {raw, Protocol, Option, ValueBin}]

     Protocol = integer()

     OptionNum = integer()

     ValueBin = binary()

     Opt, Val ― see below

     Sets one or more options for a socket. The following options are available:

    {active, true | false | once}

     If the value is true, which is the default, everything received from the socket will be sent as messages to the receiving process. If the value is false (passive mode), the process must explicitly receive incoming data by calling gen_tcp:recv/2,3 or gen_udp:recv/2,3 (depending on the type of socket).

    If the value is once ({active, once}), one data message from the socket will be sent to the process. To receive one more message, setopts/2 must be called again with the {active, once} option.

    When using {active, once}, the socket changes behaviour automatically when data is received. This can sometimes be confusing in combination with connection oriented sockets (i.e. gen_tcp) as a socket with {active, false} behaviour reports closing differently than a socket with {active, true} behaviour. To make programming easier, a socket where the peer closed and this was detected while in {active, false} mode, will still generate the message {tcp_closed,Socket} when set to {active, once} or {active, true} mode. It is therefore safe to assume that the message {tcp_closed,Socket}, possibly followed by socket port termination (depending on the exit_on_close option) will eventually appear when a socket changes back and forth between {active, true} and {active, false} mode. However, when peer closing is detected is all up to the underlying TCP/IP stack and protocol.

    Note that {active,true} mode provides no flow control; a fast sender could easily overflow the receiver with incoming messages. Use active mode only if your high-level protocol provides its own flow control (for instance, acknowledging received messages) or the amount of data exchanged is small. {active,false} mode or use of the {active, once} mode provides flow control; the other side will not be able send faster than the receiver can read.

     {packet, PacketType}(TCP/IP sockets)

     Defines the type of packets to use for a socket. The following values are valid:

    raw | 0

     No packaging is done.

    1 | 2 | 4

     Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

    In current implementation the 4-byte header is limited to 2Gb.

    asn1 | cdr | sunrm | fcgi | tpkt | line

     These packet types only have effect on receiving. When sending a packet, it is the responsibility of the application to supply a correct header. On receiving, however, there will be one message sent to the controlling process for each complete packet received, and, similarly, each call to gen_tcp:recv/2,3 returns one complete packet. The header is not stripped off.

    The meanings of the packet types are as follows:

     asn1 - ASN.1 BER,

     sunrm - Sun’s RPC encoding,

     cdr - CORBA (GIOP 1.1),

     fcgi - Fast CGI,

     tpkt - TPKT format [RFC1006],

     line - Line mode, a packet is a line terminated with newline, lines longer than the receive buffer are truncated.

    http | http_bin

     The Hypertext Transfer Protocol. The packets are returned with the format according to HttpPacket described in erlang:decode_packet/3. A socket in passive mode will return {ok, HttpPacket} from gen_tcp:recv while an active socket will send messages like {http, Socket, HttpPacket}.

    Note that the packet type httph is not needed when reading from a socket.

    {packet_size, Integer}(TCP/IP sockets)

     Sets the max allowed length of the packet body. If the packet header indicates that the length of the packet is longer than the max allowed length, the packet is considered invalid. The same happens if the packet header is too big for the socket receive buffer.

    {recbuf, Integer}

     Gives the size of the receive buffer to use for the socket.

    我把和接收缓冲区大小有关的参数都列出来了,结合inet_drv.c的代码:

#define INET_DEF_BUFFER     1460        /* default buffer size */
#define INET_MIN_BUFFER     1           /* internal min buffer */

#define TCP_MAX_PACKET_SIZE 0x4000000  /* 64 M */

/* LOPT is local options */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */

typedef struct {
...
       unsigned int psize;         /* max packet size (TCP only?) */
...
       int   bufsz;                /* minimum buffer constraint */
...
} inet_descriptor;

typedef struct {
    inet_descriptor inet;       /* common data structure (DON\'T MOVE) */
  ...
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
   ...
} tcp_descriptor;

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\\r\\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
            continue;
...
	if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
        }
...
        case INET_LOPT_PACKET_SIZE:
            DEBUGF(("inet_set_opts(%ld): s=%d, PACKET_SIZE=%d\\r\\n",
                    (long)desc->port, desc->s, ival));
            desc->psize = (unsigned int)ival;
            continue;
...
}

static int inet_fill_opts(inet_descriptor* desc,
                          char* buf, int len, char** dest, int destlen)
{
...
case INET_OPT_RCVBUF:
            type = SO_RCVBUF;
            break;
...
}

    从文档和代码结合中可以看出,一个TCP报文的最大大小由{packet_size, Integer}决定,最大不超过64M. 每个TCP报文由一定的头,比如(1 | 2 | 4)字节的报文长度,由{packet, PacketType}决定。

    那么接收缓冲区的默认大小多大呢?

     未公开的{buffer,Integer}选项:

enc_opt(buffer)          -> ?INET_LOPT_BUFFER;

    以及{recbuf, Integer},接收缓冲区的默认大小取他们中间最大的一个,在默认情况下是1460,一个TCP段的大小。

    那么接收缓存区的大小会如何变化呢?

     我们知道gen_tcp由主动和被动模式,在主动模式下报文会根据packettype来准备好报文体,扔给宿主进程,在这种情况下,如果接收缓冲区比报文的长度小的话,那么就把缓冲区扩展到该报文的大小,然后等待所有的报文体都接收完毕就好。

    那么什么时候,把缓冲区主动搞小呢?

/*
** Deliver all packets ready
** if len == 0 then check start with a check for ready packet
*/
static int tcp_deliver(tcp_descriptor* desc, int len)
{
...
 while (len > 0) {
        int code = 0;

        inet_input_count(INETP(desc), len);

        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start -
                                              desc->i_buf->orig_bytes),
                                             len);
                tcp_clear_input(desc);
            }
     else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin;
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;

                bin = alloc_buffer(desc->i_bufsz);
                memcpy(bin->orig_bytes, ptr_end, sz);

                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start-
                                              desc->i_buf->orig_bytes),
                                             len);
                free_buffer(desc->i_buf);
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
            }
        }
   else {
            code = tcp_reply_data(desc, desc->i_ptr_start, len);
            /* XXX The buffer gets thrown away on error  (code < 0)    */
            /* Windows needs workaround for this in tcp_inet_event...  */
            desc->i_ptr_start += len;
            if (desc->i_ptr_start == desc->i_ptr)
                tcp_clear_input(desc);
            else
                desc->i_remain = 0;

        }

    当接收到的报文提交出去的时候,长度大于75%缓冲区的长度或者缓冲区刚好只有这个报文的时候,就把缓冲区释放掉。

    被动模式则由用户指定要接收多少数据,如果指定0,则表明有多少要多少。那么这种情况下,缓冲区的长度会取报文的长度和用户要求的长度中的最大值。

    根据分析,我们知道接收缓冲区占用的内存比较复杂,我们如果在程序中要精确的控制内存,需要调整上面的参数。

    接下来我们看下分析下unrecv的用途,首先我们参考下misultin小型的erlang web服务器,项目在 这里

     这个项目就很灵活的使用了packet类型和active模式的结合来利用erts已有的协议分析,我简单的演示如下:

     grep下代码,可以看到:

$ grep -rin setopts .
./misultin_socket.erl:106:	inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:130:	inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:194:      inet:setopts(Sock, [{packet, raw}, {active, false}]),
./misultin_socket.erl:202:	inet:setopts(Sock, [{packet, http}]),
$ grep -rin gen_tcp:recv  .
./misultin_socket.erl:195:      case gen_tcp:recv(Sock, Len, RecvTimeout) of

    各位可以参考下它的实现,很值得学习!

    我还是举个例子来演示unrecv的使用:

     比如某个报文是用 [报文体]+ 回车行+,类似底下这样的报文

    [demo]

     line1

     line2

     line3

    由于事先不知道报文的准确长度,我们就设成{packet,raw}, {active, false},

     先读入一段报文进行分析,分析出报文的‘[’和‘]’,就知道报文的大小,剩下的报文因为inet_drv支持行分割的报文,我们就无需自己动手分析了。

     但是我们预读取的数据可能会超出报文的大小,部分回车行分割的数据已经被读取出来了,利用{packet, line}来分析就不正确了。所以我们用unrecv把这段数据还回去,就可以了,方便之门就打开了。

    总结:分析了gen_tcp接收缓冲区的工作原理以及影响大小的因素,还顺便介绍了unrecv的用途。

    gen_tcp以及发送缓冲区相关的文章请参见这里

建议继续学习:

  1. gen_tcp发送进程被挂起起因分析及对策    (阅读:36971)
  2. gen_tcp发送缓冲区以及水位线问题分析    (阅读:5220)
  3. whatsapp深度使用Erlang有感    (阅读:4569)
  4. Erlang match_spec引擎介绍和应用    (阅读:4528)
  5. php-erlang    (阅读:4308)
  6. gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析    (阅读:3541)
  7. hibernate使用注意事项    (阅读:3218)
  8. Erlang linkin driver用port_control方式时的一些经验分享    (阅读:2965)
  9. Erlang如何限制节点对集群的访问之net_kernel:allow    (阅读:2971)
  10. ERLANG OTP源码分析 – gen_server    (阅读:2856)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1