技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 源码分析 --> gen_tcp接收缓冲区易混淆概念纠正

gen_tcp接收缓冲区易混淆概念纠正

浏览:755次  出处信息

Erlang的每个TCP网络链接是由相应的gen_tcp对象来表示的,说白了就是个port, 实现Erlang网络相关的逻辑,其实现代码位于erts/emulator/drivers/common/inet_drv.c

参照inet:setopts文档,它有三个buffer相关的选项,非常让人费解:

{buffer, Size}

Determines the size of the user-level software buffer used by the driver. Not to be confused with sndbuf and recbuf options which correspond to the kernel socket buffers. It is recommended to have val(buffer) >= max(val(sndbuf),val(recbuf)). In fact, the val(buffer) is automatically set to the above maximum when sndbuf or recbuf values are set.

{recbuf, Size}

Gives the size of the receive buffer to use for the socket.

{sndbuf, Size}

Gives the size of the send buffer to use for the socket.

其中sndbuf, recbuf选项比较好理解, 就是设置gen_tcp所拥有的socket句柄的内核的发送和接收缓冲区,从代码可以验证:

/* inet_drv.c */
#define INET_OPT_SNDBUF     6   /* set send buffer size */
#define INET_OPT_RCVBUF     7   /* set receive buffer size */
static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_OPT_SNDBUF:    type = SO_SNDBUF;
            DEBUGF(("inet_set_opts(%ld): s=%d, SO_SNDBUF=%d\r\n",
                    (long)desc->port, desc->s, ival));
            break;
        case INET_OPT_RCVBUF:    type = SO_RCVBUF;
            DEBUGF(("inet_set_opts(%ld): s=%d, SO_RCVBUF=%d\r\n",
                    (long)desc->port, desc->s, ival));
            break;
...
        res = sock_setopt           (desc->s, proto, type, arg_ptr, arg_sz);
...
}

那buffer是什么呢,他们三者之间的关系? 从文档的描述来看:

It is recommended to have val(buffer) >= max(val(sndbuf),val(recbuf)). In fact, the val(buffer) is automatically set to the above maximum when sndbuf or recbuf values are set.


再对照源码:

/* inet_drv.c */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */
static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
       case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\r\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
            continue;
  
        DEBUGF(("inet_set_opts(%ld): s=%d returned %d\r\n",
                (long)desc->port, desc->s, res));
        if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
        }
...
}

我们从源码看到在实现上inet:setopts的二点要素:

1. make sure we have desc->bufsz >= SO_RCVBUF

2. desc->bufsz min buffer size hint

但是关系还是没搞明白。

好吧,通读inet_drv源码,我们可以看出gen_tcp接收包的流程:

1. 当socket上面有数据的时候,epoll会通知到port,最终导致tcp_inet_drv_input被调用。

2. tcp_inet_drv_input 发现如果连接已建立,就会调用tcp_recv来处理网络封包。

3. tcp_recv在调用sock_recv真正准备接收数据包前:

  a. 如果发现接收缓冲区是空的话,会分配一个缓冲区。如果包大小已知,缓冲区大小就是包大小,否则的话为desc->bufsz。

  b. 如果缓冲区非空,这时候看看是否已经收了一个以上完整的包,如果是就通过tcp_deliver往上层投递包,投递后如果缓冲区里面除了完整包以外,没有其他数据的话,就会调用tcp_clear_input把输入缓冲区释放掉。

  c. 如果缓冲区非空,而且缓冲区的大小无法容纳包的话,就会调用tcp_expand_buffer来把缓冲区扩大到包大小。

4. 接收好网络封包后,根据packet类型进行进一步整理,投递给上层,同时释放接收缓存区。

/* clear CURRENT input buffer */
static void tcp_clear_input(tcp_descriptor* desc)
{
    if (desc->i_buf != NULL)
        free_buffer(desc->i_buf);
    desc->i_buf = NULL;
    desc->i_remain    = 0;
    desc->i_ptr       = NULL;
    desc->i_ptr_start = NULL;
    desc->i_bufsz     = 0;
}
/*                                                                                                                        
** Set new size on buffer, used when packet size is determined                                                            
** and the buffer is to small.                                                                                            
** buffer must have a size of at least len bytes (counting from ptr_start!)                                               
*/
static int tcp_expand_buffer(tcp_descriptor* desc, int len)
{
   ...
    if (desc->i_bufsz >= ulen) /* packet will fit */
        return 0;
    else if (desc->i_buf->orig_size >= ulen) { /* buffer is large enough */
        desc->i_bufsz = ulen;  /* set "virtual" size */
        return 0;
    }
  
    offs1 = desc->i_ptr_start - desc->i_buf->orig_bytes;
    offs2 = desc->i_ptr - desc->i_ptr_start;
  
    if ((bin = driver_realloc_binary(desc->i_buf, ulen)) == NULL)
        return -1;
  
    desc->i_buf = bin;
    desc->i_ptr_start = bin->orig_bytes + offs1;
    desc->i_ptr       = desc->i_ptr_start + offs2;
...
}
/*                                                                                                                        
** Deliver all packets ready                                                                                              
** if len == 0 then check start with a check for ready packet                                                             
*/
static int tcp_deliver(tcp_descriptor* desc, int len)
{
...
    while (len > 0) {
        int code;
  
        inet_input_count(INETP(desc), len);
  
        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            code = tcp_reply_binary_data(desc, desc->i_buf,
                                         (desc->i_ptr_start -
                                          desc->i_buf->orig_bytes),
                                         len);
            if (code < 0)
                return code;
  
            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                tcp_clear_input(desc);
            }
            else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin = alloc_buffer(desc->i_bufsz);
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;
  
                memcpy(bin->orig_bytes, ptr_end, sz);
                free_buffer(desc->i_buf);
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
...
}
}

除了上面的逻辑外,这里需要强调几点:

1. 默认情况下gen_tcp建立的时候,接收缓冲区是空的。

2. 接收完整的包投递后,释放接收缓冲区。

3. 接收缓冲区大小由包的大小决定,如果包未知,由desc->bufsz决定。

4. INET_LOPT_BUFFER仅仅影响接收缓冲区,发送无需缓冲区,因为发送的时候,sendv可以直接发送队列里面的数据。

5. INET_LOPT_BUFFER只是给个缓冲区大小的hint, 而非强制。

分析到这里为止,我们可以把这三个缓冲区的概念搞清楚了。接下来就是如何用好这些缓冲区的实践了:

1. INET_LOPT_BUFFER由于指示的是inet_drv这个层面接收缓冲区的默认大小,所以这个缓冲区最好是比操作内核SO_RCVBUF指示的接收缓冲区要大。

2. INET_LOPT_BUFFER只是个hint, 在包大小未知的情况下,影响接收缓冲区的大小,而如果要接收的包大于接收缓冲区的时候,就要扩展缓冲区,通过realloc来实现的。所以通过统计包的平均大小,设置一个比较合理的hint, 减少expand缓冲区的发生。inets:getstat(Socket, [recv_avg]). 可以帮我们统计到平均包大小。

这里还需要指出个问题,通过前面的分析,我们知道接收缓冲区不停的分配,释放,这对内存分配器造成很大的压力。 所以inet_drv实现了一套小型的内存分配池。为了减少冲突,每个CPU一个分配池. 每个池维护最近使用的buffer, 达到最快分配到buffer的目的。

参看代码如下:

static ErlDrvBinary* alloc_buffer(ErlDrvSizeT minsz)
{
    InetDrvBufStk *bs = get_bufstk();
    if (bs && bs->buf.pos > 0) {
        long size;
        ErlDrvBinary* buf = bs->buf.stk[--bs->buf.pos];
        size = buf->orig_size;
        bs->buf.mem_size -= size;
  
        if (size >= minsz)
            return buf;
  
        driver_free_binary(buf);
    }
  
    return driver_alloc_binary(minsz);
}
  
static void release_buffer(ErlDrvBinary* buf)
{
...
    bs = get_bufstk();
    if (!bs
        || (bs->buf.mem_size + size > BUFFER_STACK_MAX_MEM_SIZE)
        || (bs->buf.pos >= BUFFER_STACK_SIZE)) {
    free_binary:
        driver_free_binary(buf);
    }
    else {
        bs->buf.mem_size += size;
        bs->buf.stk[bs->buf.pos++] = buf;
    }
...
}

有了高速的内存分配器,gen_tcp的接收缓冲区的管理的代价就不算太大。gen_tcp这样设计接收缓冲区的目的是为了能够在大量网络链接的情况下,尽可能的节约内存,典型的用时间换空间的设计。

小结: 源码是最好的答案,文档不是。

建议继续学习:

  1. gen_tcp发送进程被挂起起因分析及对策    (阅读:36961)
  2. gen_tcp发送缓冲区以及水位线问题分析    (阅读:5199)
  3. gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析    (阅读:3530)
  4. gen_tcp容易误用的一点解释    (阅读:2626)
  5. gen_tcp如何限制封包大小    (阅读:2446)
  6. pdflush 相关    (阅读:2341)
  7. 思考mysql内核之初级系列4--innodb缓冲区管理    (阅读:2337)
  8. RAID卡MTRR的RAID模式write-combining    (阅读:2135)
  9. 未公开的gen_tcp:unrecv以及接收缓冲区行为分析    (阅读:1945)
  10. gen_tcp接受链接时enfile的问题分析及解决    (阅读:1611)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1