未公开的gen_tcp:unrecv以及接收缓冲区行为分析
我们先看下它的代码实现,Erlang代码部分:
%%gen_tcp.erl:L299 unrecv(S, Data) when is_port(S) -> case inet_db:lookup_socket(S) of {ok, Mod} -> Mod:unrecv(S, Data); Error -> Error end. %%inet_tcp.erl:L58 unrecv(Socket, Data) -> prim_inet:unrecv(Socket, Data). %%prim_inet.erl:L983 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% UNRECV(insock(), data) -> ok | {error, Reason} %% %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% unrecv(S, Data) -> case ctl_cmd(S, ?TCP_REQ_UNRECV, Data) of {ok, _} -> ok; Error -> Error end.
运行期c代码部分:
//inet_drv.c:L8123 /* TCP requests from Erlang */ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, char** rbuf, int rsize) { ... case TCP_REQ_UNRECV: { DEBUGF(("tcp_inet_ctl(%ld): UNRECV\\r\\n", (long)desc->inet.port)); if (!IS_CONNECTED(INETP(desc))) return ctl_error(ENOTCONN, rbuf, rsize); tcp_push_buffer(desc, buf, len); if (desc->inet.active) tcp_deliver(desc, 0); return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } ... } static int tcp_push_buffer(tcp_descriptor* desc, char* buf, int len) { ErlDrvBinary* bin; if (desc->i_buf == NULL) { bin = alloc_buffer(len); sys_memcpy(bin->orig_bytes, buf, len); desc->i_buf = bin; desc->i_bufsz = len; desc->i_ptr_start = desc->i_buf->orig_bytes; desc->i_ptr = desc->i_ptr_start + len; } else { char* start = desc->i_buf->orig_bytes; int sz_before = desc->i_ptr_start - start; int sz_filled = desc->i_ptr - desc->i_ptr_start; if (len <= sz_before) { sys_memcpy(desc->i_ptr_start - len, buf, len); desc->i_ptr_start -= len; } else { bin = alloc_buffer(desc->i_bufsz+len); sys_memcpy(bin->orig_bytes, buf, len); sys_memcpy(bin->orig_bytes+len, desc->i_ptr_start, sz_filled); free_buffer(desc->i_buf); desc->i_bufsz += len; desc->i_buf = bin; desc->i_ptr_start = bin->orig_bytes; desc->i_ptr = desc->i_ptr_start + sz_filled + len; } } desc->i_remain = 0; return 0; }
实现上很简单,就是透过tcp ctl命令往驱动接收缓冲区里面填数据。
但是什么是gen_tcp接收缓冲区, 它的大小是多大呢?
在回答这个问题之前,我们先看下inet:setopts文档,参见这里
setopts(Socket, Options) -> ok | {error, posix()}
Types:
Socket = term()
Options = [{Opt, Val} | {raw, Protocol, Option, ValueBin}]
Protocol = integer()
OptionNum = integer()
ValueBin = binary()
Opt, Val ― see below
Sets one or more options for a socket. The following options are available:
{active, true | false | once}
If the value is true, which is the default, everything received from the socket will be sent as messages to the receiving process. If the value is false (passive mode), the process must explicitly receive incoming data by calling gen_tcp:recv/2,3 or gen_udp:recv/2,3 (depending on the type of socket).
If the value is once ({active, once}), one data message from the socket will be sent to the process. To receive one more message, setopts/2 must be called again with the {active, once} option.
When using {active, once}, the socket changes behaviour automatically when data is received. This can sometimes be confusing in combination with connection oriented sockets (i.e. gen_tcp) as a socket with {active, false} behaviour reports closing differently than a socket with {active, true} behaviour. To make programming easier, a socket where the peer closed and this was detected while in {active, false} mode, will still generate the message {tcp_closed,Socket} when set to {active, once} or {active, true} mode. It is therefore safe to assume that the message {tcp_closed,Socket}, possibly followed by socket port termination (depending on the exit_on_close option) will eventually appear when a socket changes back and forth between {active, true} and {active, false} mode. However, when peer closing is detected is all up to the underlying TCP/IP stack and protocol.
Note that {active,true} mode provides no flow control; a fast sender could easily overflow the receiver with incoming messages. Use active mode only if your high-level protocol provides its own flow control (for instance, acknowledging received messages) or the amount of data exchanged is small. {active,false} mode or use of the {active, once} mode provides flow control; the other side will not be able send faster than the receiver can read.
{packet, PacketType}(TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:
raw | 0
No packaging is done.
1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.
In current implementation the 4-byte header is limited to 2Gb.
asn1 | cdr | sunrm | fcgi | tpkt | line
These packet types only have effect on receiving. When sending a packet, it is the responsibility of the application to supply a correct header. On receiving, however, there will be one message sent to the controlling process for each complete packet received, and, similarly, each call to gen_tcp:recv/2,3 returns one complete packet. The header is not stripped off.
The meanings of the packet types are as follows:
asn1 - ASN.1 BER,
sunrm - Sun’s RPC encoding,
cdr - CORBA (GIOP 1.1),
fcgi - Fast CGI,
tpkt - TPKT format [RFC1006],
line - Line mode, a packet is a line terminated with newline, lines longer than the receive buffer are truncated.
http | http_bin
The Hypertext Transfer Protocol. The packets are returned with the format according to HttpPacket described in erlang:decode_packet/3. A socket in passive mode will return {ok, HttpPacket} from gen_tcp:recv while an active socket will send messages like {http, Socket, HttpPacket}.
Note that the packet type httph is not needed when reading from a socket.
{packet_size, Integer}(TCP/IP sockets)
Sets the max allowed length of the packet body. If the packet header indicates that the length of the packet is longer than the max allowed length, the packet is considered invalid. The same happens if the packet header is too big for the socket receive buffer.
{recbuf, Integer}
Gives the size of the receive buffer to use for the socket.
我把和接收缓冲区大小有关的参数都列出来了,结合inet_drv.c的代码:
#define INET_DEF_BUFFER 1460 /* default buffer size */ #define INET_MIN_BUFFER 1 /* internal min buffer */ #define TCP_MAX_PACKET_SIZE 0x4000000 /* 64 M */ /* LOPT is local options */ #define INET_LOPT_BUFFER 20 /* min buffer size hint */ typedef struct { ... unsigned int psize; /* max packet size (TCP only?) */ ... int bufsz; /* minimum buffer constraint */ ... } inet_descriptor; typedef struct { inet_descriptor inet; /* common data structure (DON\'T MOVE) */ ... int i_bufsz; /* current input buffer size (<= bufsz) */ ErlDrvBinary* i_buf; /* current binary buffer */ char* i_ptr; /* current pos in buf */ char* i_ptr_start; /* packet start pos in buf */ int i_remain; /* remaining chars to read */ ... } tcp_descriptor; static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) { ... case INET_LOPT_BUFFER: DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\\r\\n", (long)desc->port, desc->s, ival)); if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER; desc->bufsz = ival; continue; ... if (type == SO_RCVBUF) { /* make sure we have desc->bufsz >= SO_RCVBUF */ if (ival > desc->bufsz) desc->bufsz = ival; } ... case INET_LOPT_PACKET_SIZE: DEBUGF(("inet_set_opts(%ld): s=%d, PACKET_SIZE=%d\\r\\n", (long)desc->port, desc->s, ival)); desc->psize = (unsigned int)ival; continue; ... } static int inet_fill_opts(inet_descriptor* desc, char* buf, int len, char** dest, int destlen) { ... case INET_OPT_RCVBUF: type = SO_RCVBUF; break; ... }
从文档和代码结合中可以看出,一个TCP报文的最大大小由{packet_size, Integer}决定,最大不超过64M. 每个TCP报文由一定的头,比如(1 | 2 | 4)字节的报文长度,由{packet, PacketType}决定。
那么接收缓冲区的默认大小多大呢?
未公开的{buffer,Integer}选项:
enc_opt(buffer) -> ?INET_LOPT_BUFFER;
以及{recbuf, Integer},接收缓冲区的默认大小取他们中间最大的一个,在默认情况下是1460,一个TCP段的大小。
那么接收缓存区的大小会如何变化呢?
我们知道gen_tcp由主动和被动模式,在主动模式下报文会根据packettype来准备好报文体,扔给宿主进程,在这种情况下,如果接收缓冲区比报文的长度小的话,那么就把缓冲区扩展到该报文的大小,然后等待所有的报文体都接收完毕就好。
那么什么时候,把缓冲区主动搞小呢?
/* ** Deliver all packets ready ** if len == 0 then check start with a check for ready packet */ static int tcp_deliver(tcp_descriptor* desc, int len) { ... while (len > 0) { int code = 0; inet_input_count(INETP(desc), len); /* deliver binary? */ if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */ /* something after? */ if (desc->i_ptr_start + len == desc->i_ptr) { /* no */ code = tcp_reply_binary_data(desc, desc->i_buf, (desc->i_ptr_start - desc->i_buf->orig_bytes), len); tcp_clear_input(desc); } else { /* move trail to beginning of a new buffer */ ErlDrvBinary* bin; char* ptr_end = desc->i_ptr_start + len; int sz = desc->i_ptr - ptr_end; bin = alloc_buffer(desc->i_bufsz); memcpy(bin->orig_bytes, ptr_end, sz); code = tcp_reply_binary_data(desc, desc->i_buf, (desc->i_ptr_start- desc->i_buf->orig_bytes), len); free_buffer(desc->i_buf); desc->i_buf = bin; desc->i_ptr_start = desc->i_buf->orig_bytes; desc->i_ptr = desc->i_ptr_start + sz; desc->i_remain = 0; } } else { code = tcp_reply_data(desc, desc->i_ptr_start, len); /* XXX The buffer gets thrown away on error (code < 0) */ /* Windows needs workaround for this in tcp_inet_event... */ desc->i_ptr_start += len; if (desc->i_ptr_start == desc->i_ptr) tcp_clear_input(desc); else desc->i_remain = 0; }
当接收到的报文提交出去的时候,长度大于75%缓冲区的长度或者缓冲区刚好只有这个报文的时候,就把缓冲区释放掉。
被动模式则由用户指定要接收多少数据,如果指定0,则表明有多少要多少。那么这种情况下,缓冲区的长度会取报文的长度和用户要求的长度中的最大值。
根据分析,我们知道接收缓冲区占用的内存比较复杂,我们如果在程序中要精确的控制内存,需要调整上面的参数。
接下来我们看下分析下unrecv的用途,首先我们参考下misultin小型的erlang web服务器,项目在 这里
这个项目就很灵活的使用了packet类型和active模式的结合来利用erts已有的协议分析,我简单的演示如下:
grep下代码,可以看到:
$ grep -rin setopts . ./misultin_socket.erl:106: inet:setopts(Sock, [{active, once}]), ./misultin_socket.erl:130: inet:setopts(Sock, [{active, once}]), ./misultin_socket.erl:194: inet:setopts(Sock, [{packet, raw}, {active, false}]), ./misultin_socket.erl:202: inet:setopts(Sock, [{packet, http}]), $ grep -rin gen_tcp:recv . ./misultin_socket.erl:195: case gen_tcp:recv(Sock, Len, RecvTimeout) of
各位可以参考下它的实现,很值得学习!
我还是举个例子来演示unrecv的使用:
比如某个报文是用 [报文体]+ 回车行+,类似底下这样的报文
[demo]
line1
line2
line3
由于事先不知道报文的准确长度,我们就设成{packet,raw}, {active, false},
先读入一段报文进行分析,分析出报文的‘[’和‘]’,就知道报文的大小,剩下的报文因为inet_drv支持行分割的报文,我们就无需自己动手分析了。
但是我们预读取的数据可能会超出报文的大小,部分回车行分割的数据已经被读取出来了,利用{packet, line}来分析就不正确了。所以我们用unrecv把这段数据还回去,就可以了,方便之门就打开了。
总结:分析了gen_tcp接收缓冲区的工作原理以及影响大小的因素,还顺便介绍了unrecv的用途。
gen_tcp以及发送缓冲区相关的文章请参见这里 !
建议继续学习:
- gen_tcp发送进程被挂起起因分析及对策 (阅读:36983)
- gen_tcp发送缓冲区以及水位线问题分析 (阅读:5268)
- whatsapp深度使用Erlang有感 (阅读:4633)
- Erlang match_spec引擎介绍和应用 (阅读:4576)
- php-erlang (阅读:4319)
- gen_tcp调用进程收到{empty_out_q, Port}消息奇怪行为分析 (阅读:3555)
- hibernate使用注意事项 (阅读:3228)
- Erlang如何限制节点对集群的访问之net_kernel:allow (阅读:3028)
- Erlang linkin driver用port_control方式时的一些经验分享 (阅读:2975)
- ERLANG OTP源码分析 – gen_server (阅读:2870)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:Yu Feng 来源: Erlang非业余研究
- 标签: Erlang gen_tcp unrecv 缓冲区
- 发布时间:2011-10-04 18:06:22
- [69] Twitter/微博客的学习摘要
- [67] IOS安全–浅谈关于IOS加固的几种方法
- [65] 如何拿下简短的域名
- [65] android 开发入门
- [63] find命令的一点注意事项
- [62] Go Reflect 性能
- [61] 流程管理与用户研究
- [60] Oracle MTS模式下 进程地址与会话信
- [59] 图书馆的世界纪录
- [57] 读书笔记-壹百度:百度十年千倍的29条法则