原作者:Buddhika Chamith
随着所有的在高可用服务器设计上的炒作,以及nodejs背后的风行,我想关注一些IO的设计模式,却一起没有足够的时间。现在正在完成的一些研究,我想最好记下这些资料以备查。让我们跳上IO bus兜风去。
在许多web server上,典型的一个连接一个thread的基础,这种类型是IO操作阻塞着应用程序直到完成。
当阻塞式的read方法或write方法被调用时,将有一次上下文切换至kernel中,IO操作会发生,数据会被复制进kernel的buffer中。然后,kernel buffer会把数据转给用户空间里的应用程序级别的buffer,并且应用程序的thread会被标识为runnable的,此时应用程序会解锁可以从用户空间的buffer中读取数据。
1 2 3 4 5 | ServerSocket server = new ServerSocket(port); while(true) { Socket connection = server.accept(); spawn-Thread-and-process(connection); } |
这个模型下,设备(网卡)或者连接被设置为非阻塞的,read()和write()操作将不会被阻塞。通常意味着,如果操作不能立即得到结论,将会返回,带一个error code以指出操作会阻塞(POSIX标准是EWOULDBLOCK)或者是设备临时不可用(POSIX标准是EAGAIN)。由应用程序去检测,直到设备准备好了并且所有数据被读到。尽量如此,这也不是非常高效,因为每次调用都会激起一次上下文切换给kernel,并且不会考虑数据有没被读到。
Java已经抽象出来平台特殊性系统调用的不同,实现了NIO API。Socket 文件描述符被用Channels和Selector抽象,封装到selection系统调用中。应用程序感兴趣的收集就绪事件,注册到Channel(通常在ServerSocketChannel上accept()就得到一个SocketChannel),注册的内容是Selector,会得到SelectionKey,这个SelectionKey就是作为一个handle,这个handle的作用是hold住Channel和注册信息。然后阻塞的select()调用被设置在Selector,它会返回一系列的SelectionKey,然后一个接一个地被程序所指定的事件处理器所处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | elector selector =; channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { int readyChannels =; if(readyChannels == 0) continue; Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key =; if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } } |
就绪事件只能做到通知你设备\socket准备好做事情的程度。应用程序依然不得不做脏活,为了从设备/socket中读数据(更准确地说是通过系统调用指示操作系统),通过设备的各种思路将数据扔到用户空间的buffer。把任务代理给操作系统在后台运行,一旦完成了让它再通知你,包括从设备到kernel的buffer再最终到应用程序级别的buffer传送所有的数据,这样岂不是很爽?这就是经常被提到的异步IO模型背后的基础想法。所以需要操作系统层支持AIO操作。在Linux下从2.6开始在aio POSIX API中被支持,Windows下用I/O Completion Ports支持。
JAVA NIO2在AsynchronousChannel API中一点点支持此模型。
为了支持就绪和完成事件通知,不同的操作系统提供了各种各样的系统调用。就绪事件 select()和poll()可以在Linux类的系统中使用。尽管如此,更新的epoll()变种更好,因为它比select()和poll()更有效率。当监控的文件描述符增长时,选择时间在线性增长,这一点上导致了select()不行。在复写文件描述符数组这事上已经臭名昭著。所以每次一被调用,描述符数组就需要从一个单独的拷贝上重新构建。无论如何这都不是一个优雅的解决方案。
相比的系统调用还有BSD口味的kqueue,Solaris由于版本不同有/dev/poll或者”Event Completion”。Windows下等价的是“I/O Completion Ports”。
至少在Linux下AIO模型的情况却大不同。Linux中aio的支持看上去埋头在一些意见困扰中,实际地在kernel层面使用就绪事件,同时在应用程序层面提供异步完成事件的抽象。尽管如此,Windows看上去通过“I/O Completion Ports”支持这个得了第一名。
dispatcher:为注册、删除定义接口,分发事件处理器起作用,作用是响应连接事件,包括连接被接受、数据输入输出、一组连接上的超时事件。为了服务一个客户端连接,相关的事件处理器(比如接受事件处理器)会被注册给被接受的客户端通道(在client socket其下包装),注册内容是分接器(demultiplexer),就绪事件类的都被会注册,以监听此特定的channel。然后,分配器线程会调出阻塞的就绪选择操作,这些操作在demultiplexer之上,主要为剩下的注册通道。一旦一个或多个被注册的通道准备好IO,分配器会服务给相关的每个准备好的通道一对一的用注册的事件处理器返回“Handle”。很重要的是,这些事件处理器不会hold住分配器线程,但是会延迟分配器服务其他准备好的连接。因为常见的在事件处理器里的逻辑,包括传送数据从/去准备好的连接,这些连接会阻塞,一直到所有的数据在用户空间和内核数据缓存中被送完,一般情况下,这些处理器跑在一个线程池的不同的线程里。
Handle:当一个channel被注册了分接器(demultiplexer)就会返回一个handle,handle概括了连接通道和就绪信息。靠分接器就绪选择操作,一系列的准备好的Handle会被返回。Java NIO里对等的叫SelectionKey。
Demultiplexer:(分接器:54chen专门瞎翻)等待在一个或多个注册的连接通道里的就绪事件。Java NIO里叫Selector。
Event Handler:指接口具有的hook方法,以分配连接事件。这些方法需要被应用程序指定的事件处理器所实现。
Concrete Event Handler:(具体的事件处理器)包括从连接中读写数据的逻辑,并且要做一些必须的过程,或者初始化客户端连接传过的接收协议,这些协议来自通过的Handle。
一个简单的echo server实现,下面的例子显示了这种模式(没有事件处理器线程池)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | public class ReactorInitiator { private static final int NIO_SERVER_PORT = 9993; public void initiateReactiveServer(int port) throws Exception { ServerSocketChannel server =; server.socket().bind(new InetSocketAddress(port)); server.configureBlocking(false); Dispatcher dispatcher = new Dispatcher(); dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server); dispatcher.registerEventHandler( SelectionKey.OP_ACCEPT, new AcceptEventHandler( dispatcher.getDemultiplexer())); dispatcher.registerEventHandler( SelectionKey.OP_READ, new ReadEventHandler( dispatcher.getDemultiplexer())); dispatcher.registerEventHandler( SelectionKey.OP_WRITE, new WriteEventHandler());; // Run the dispatcher loop } public static void main(String[] args) throws Exception { System.out.println("Starting NIO server at port : " + NIO_SERVER_PORT); new ReactorInitiator(). initiateReactiveServer(NIO_SERVER_PORT); } } public class Dispatcher { private Map<Integer, EventHandler> registeredHandlers = new ConcurrentHashMap<Integer, EventHandler>(); private Selector demultiplexer; public Dispatcher() throws Exception { demultiplexer =; } public Selector getDemultiplexer() { return demultiplexer; } public void registerEventHandler( int eventType, EventHandler eventHandler) { registeredHandlers.put(eventType, eventHandler); } // Used to register ServerSocketChannel with the // selector to accept incoming client connections public void registerChannel( int eventType, SelectableChannel channel) throws Exception { channel.register(demultiplexer, eventType); } public void run() { try { while (true) { // Loop indefinitely; Set<SelectionKey> readyHandles = demultiplexer.selectedKeys(); Iterator<SelectionKey> handleIterator = readyHandles.iterator(); while (handleIterator.hasNext()) { SelectionKey handle =; if (handle.isAcceptable()) { EventHandler handler = registeredHandlers.get(SelectionKey.OP_ACCEPT); handler.handleEvent(handle); // Note : Here we don't remove this handle from // selector since we want to keep listening to // new client connections } if (handle.isReadable()) { EventHandler handler = registeredHandlers.get(SelectionKey.OP_READ); handler.handleEvent(handle); handleIterator.remove(); } if (handle.isWritable()) { EventHandler handler = registeredHandlers.get(SelectionKey.OP_WRITE); handler.handleEvent(handle); handleIterator.remove(); } } } } catch (Exception e) { e.printStackTrace(); } } } public interface EventHandler { public void handleEvent(SelectionKey handle) throws Exception; } public class AcceptEventHandler implements EventHandler { private Selector demultiplexer; public AcceptEventHandler(Selector demultiplexer) { this.demultiplexer = demultiplexer; } @Override public void handleEvent(SelectionKey handle) throws Exception { ServerSocketChannel serverSocketChannel = (ServerSocketChannel); SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { socketChannel.configureBlocking(false); socketChannel.register( demultiplexer, SelectionKey.OP_READ); } } } public class ReadEventHandler implements EventHandler { private Selector demultiplexer; private ByteBuffer inputBuffer = ByteBuffer.allocate(2048); public ReadEventHandler(Selector demultiplexer) { this.demultiplexer = demultiplexer; } @Override public void handleEvent(SelectionKey handle) throws Exception { SocketChannel socketChannel = (SocketChannel);; // Read data from client inputBuffer.flip(); // Rewind the buffer to start reading from the beginning byte[] buffer = new byte[inputBuffer.limit()]; inputBuffer.get(buffer); System.out.println("Received message from client : " + new String(buffer)); inputBuffer.flip(); // Rewind the buffer to start reading from the beginning // Register the interest for writable readiness event for // this channel in order to echo back the message socketChannel.register( demultiplexer, SelectionKey.OP_WRITE, inputBuffer); } } public class WriteEventHandler implements EventHandler { @Override public void handleEvent(SelectionKey handle) throws Exception { SocketChannel socketChannel = (SocketChannel); ByteBuffer inputBuffer = (ByteBuffer) handle.attachment(); socketChannel.write(inputBuffer); socketChannel.close(); // Close connection } } |
Asynchronous Operation Processor:异步操作处理器。其职责是异步地抓出IO操作,提供完成事件通知给应用层的完成处理器。操作系统通常会暴露异步IO接口。
Asynchronous Operation:异步操作的运行在独立的内核线程中,靠异步操作处理器来完成。
Completion Dispatcher:其职责是在异步操作完成时,唤回应用程序的完成处理器。当异步操作处理器完成了一次异步初始化操作,完成分发器会进行应用程序自行维护的回调。通常,委派事件通知处理给相对的事件合适的完成处理器。
Completion Handler:这是被实用程序实现的接口,用于处理异步事件完成events。
让我们来看看如何用新的Java 7里的NIO.2 API来实现这种模式(一个简单的echo server)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | public class ProactorInitiator { static int ASYNC_SERVER_PORT = 4333; public void initiateProactiveServer(int port) throws IOException { final AsynchronousServerSocketChannel listener = new InetSocketAddress(port)); AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener); SessionState state = new SessionState(); listener.accept(state, acceptCompletionHandler); } public static void main(String[] args) { try { System.out.println("Async server listening on port : " + ASYNC_SERVER_PORT); new ProactorInitiator().initiateProactiveServer( ASYNC_SERVER_PORT); } catch (IOException e) { e.printStackTrace(); } // Sleep indefinitely since otherwise the JVM would terminate while (true) { try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SessionState> { private AsynchronousServerSocketChannel listener; public AcceptCompletionHandler( AsynchronousServerSocketChannel listener) { this.listener = listener; } @Override public void completed(AsynchronousSocketChannel socketChannel, SessionState sessionState) { // accept the next connection SessionState newSessionState = new SessionState(); listener.accept(newSessionState, this); // handle this connection ByteBuffer inputBuffer = ByteBuffer.allocate(2048); ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(socketChannel, inputBuffer); inputBuffer, sessionState, readCompletionHandler); } @Override public void failed(Throwable exc, SessionState sessionState) { // Handle connection failure... } } public class ReadCompletionHandler implements CompletionHandler<Integer, SessionState> { private AsynchronousSocketChannel socketChannel; private ByteBuffer inputBuffer; public ReadCompletionHandler( AsynchronousSocketChannel socketChannel, ByteBuffer inputBuffer) { this.socketChannel = socketChannel; this.inputBuffer = inputBuffer; } @Override public void completed( Integer bytesRead, SessionState sessionState) { byte[] buffer = new byte[bytesRead]; inputBuffer.rewind(); // Rewind the input buffer to read from the beginning inputBuffer.get(buffer); String message = new String(buffer); System.out.println("Received message from client : " + message); // Echo the message back to client WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(socketChannel); ByteBuffer outputBuffer = ByteBuffer.wrap(buffer); socketChannel.write( outputBuffer, sessionState, writeCompletionHandler); } @Override public void failed(Throwable exc, SessionState attachment) { //Handle read failure..... } } public class WriteCompletionHandler implements CompletionHandler<Integer, SessionState> { private AsynchronousSocketChannel socketChannel; public WriteCompletionHandler( AsynchronousSocketChannel socketChannel) { this.socketChannel = socketChannel; } @Override public void completed( Integer bytesWritten, SessionState attachment) { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, SessionState attachment) { // Handle write failure..... } } public class SessionState { private Map<String, String> sessionProps = new ConcurrentHashMap<String, String>(); public String getProperty(String key) { return sessionProps.get(key); } public void setProperty(String key, String value) { sessionProps.put(key, value); } } |
每种类型的事件完成(接受、读、写)都会被一个单独的完成处理器handle,这个处理器实现了CompletionHandler接口(Accept/ Read/WriteCompletionHandler等)。状态过渡被管理在这些连接处理器中。额外SessionState参数可以被用于hold客户端的session,待定的状态就可以跨这一系列的完成事件了。
如果你在考虑实现一个NIO的HTTP服务器,你有福了。Apache HTTPCore包对使用NIO处理HTTP流量提供了优秀的支持。API在内置的用NIO对付http请求处理层之上提供了高层次的抽象。下面给出一个最小化的非阻塞Http服务器实现,任何的GET访问都会返回一个样本输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 | public class NHttpServer { public void start() throws IOReactorException { HttpParams params = new BasicHttpParams(); // Connection parameters params. setIntParameter( HttpConnectionParams.SO_TIMEOUT, 60000) .setIntParameter( HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024) .setBooleanParameter( HttpConnectionParams.STALE_CONNECTION_CHECK, true) .setBooleanParameter( HttpConnectionParams.TCP_NODELAY, true); final DefaultListeningIOReactor ioReactor = new DefaultListeningIOReactor(2, params); // Spawns an IOReactor having two reactor threads // running selectors. Number of threads here is // usually matched to the number of processor cores // in the system // Application specific readiness event handler ServerHandler handler = new ServerHandler(); final IOEventDispatch ioEventDispatch = new DefaultServerIOEventDispatch(handler, params); // Default IO event dispatcher encapsulating the // event handler ListenerEndpoint endpoint = ioReactor.listen( new InetSocketAddress(4444)); // start the IO reactor in a new separate thread Thread t = new Thread(new Runnable() { public void run() { try { System.out.println("Listening in port 4444"); ioReactor.execute(ioEventDispatch); } catch (InterruptedIOException ex) { ex.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }); t.start(); // Wait for the endpoint to become ready, // i.e. for the listener to start accepting requests. try { endpoint.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOReactorException { new NHttpServer().start(); } } public class ServerHandler implements NHttpServiceHandler { private static final int BUFFER_SIZE = 2048; private static final String RESPONSE_SOURCE_BUFFER = "response-source-buffer"; // the factory to create HTTP responses private final HttpResponseFactory responseFactory; // the HTTP response processor private final HttpProcessor httpProcessor; // the strategy to re-use connections private final ConnectionReuseStrategy connStrategy; // the buffer allocator private final ByteBufferAllocator allocator; public ServerHandler() { super(); this.responseFactory = new DefaultHttpResponseFactory(); this.httpProcessor = new BasicHttpProcessor(); this.connStrategy = new DefaultConnectionReuseStrategy(); this.allocator = new HeapByteBufferAllocator(); } @Override public void connected( NHttpServerConnection nHttpServerConnection) { System.out.println("New incoming connection"); } @Override public void requestReceived( NHttpServerConnection nHttpServerConnection) { HttpRequest request = nHttpServerConnection.getHttpRequest(); if (request instanceof HttpEntityEnclosingRequest) { // Handle POST and PUT requests } else { ContentOutputBuffer outputBuffer = new SharedOutputBuffer( BUFFER_SIZE, nHttpServerConnection, allocator); HttpContext context = nHttpServerConnection.getContext(); context.setAttribute( RESPONSE_SOURCE_BUFFER, outputBuffer); OutputStream os = new ContentOutputStream(outputBuffer); // create the default response to this request ProtocolVersion httpVersion = request.getRequestLine().getProtocolVersion(); HttpResponse response = responseFactory.newHttpResponse( httpVersion, HttpStatus.SC_OK, nHttpServerConnection.getContext()); // create a basic HttpEntity using the source // channel of the response pipe BasicHttpEntity entity = new BasicHttpEntity(); if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) { entity.setChunked(true); } response.setEntity(entity); String method = request.getRequestLine(). getMethod().toUpperCase(); if (method.equals("GET")) { try { nHttpServerConnection.suspendInput(); nHttpServerConnection.submitResponse(response); os.write(new String("Hello client.."). getBytes("UTF-8")); os.flush(); os.close(); } catch (Exception e) { e.printStackTrace(); } } // Handle other http methods } } @Override public void inputReady( NHttpServerConnection nHttpServerConnection, ContentDecoder contentDecoder) { // Handle request enclosed entities here by reading // them from the channel } @Override public void responseReady( NHttpServerConnection nHttpServerConnection) { try { nHttpServerConnection.close(); } catch (IOException e) { e.printStackTrace(); } } @Override |
- 关于IO的同步,异步,阻塞,非阻塞 (阅读:14455)
- Linux服务器性能评估 (阅读:8151)
- 提升磁盘IO性能的几个技巧 (阅读:7593)
- I/O模型-读书笔记 (阅读:6897)
- Innodb IO优化-配置优化 (阅读:6626)
- 查看 CPU, Memory, I/O and NetFlow (阅读:6413)
- blktrace 深度了解linux系统的IO运作 (阅读:5998)
- Linux操作系统内核3.3版本I/O Stack的流图 (阅读:5767)
- Linux IO协议栈框图 (阅读:5308)
- MySQL Tuning之浅析I/O优化 (阅读:5128)
- 作者:五四陈科学院 来源: 五四陈科学院
- 标签: IO
- 发布时间:2014-03-19 23:01:03
- [68] Go Reflect 性能
- [68] 如何拿下简短的域名
- [67] Oracle MTS模式下 进程地址与会话信
- [62] IOS安全–浅谈关于IOS加固的几种方法
- [61] 图书馆的世界纪录
- [60] 【社会化设计】自我(self)部分――欢迎区
- [58] android 开发入门
- [56] 视觉调整-设计师 vs. 逻辑
- [49] 给自己的字体课(一)——英文字体基础
- [48] 读书笔记-壹百度:百度十年千倍的29条法则