
Analysis of a Jetty Thread "Interlock" That Degrades Data-Transfer Performance

By 忘我的追寻, 2013-09-23 23:06:25

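   The following analysis applies to one specific Jetty version: http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111

   First, a word on Jetty's reactor model. Jetty uses the classic asynchronous NIO model (Scalable IO in Java, http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf). Connection management is shown in the diagram below: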

   [Figure: connection management diagram (image not preserved)]

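   Jetty makes some changes when adopting this model. The acceptor is split out into its own blocking thread, which blocks while accepting new connection requests. Once a connection is established, it registers both network events and internal events (changes) with the selector thread, so the selector has to process network events and internal changes together. It also has to check periodically for timed-out connections.

   Once a connection is established, the main thread (the selector thread) does more than dispatch network events; it also interacts with the worker threads. When a worker thread finds the network congested and cannot flush its buffered data promptly, it registers an internal event declaring itself write-blocked, and expects the main thread to notify it once the congestion clears.

   The code in question is as follows: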
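   To make the idea of a selector that handles both internal changes and network events concrete, here is a minimal sketch. It is not Jetty's code: the class `ChangeQueueSelector` and its methods are hypothetical names chosen for illustration, and only standard `java.nio` calls are used. Worker threads queue a change and wake the selector; the selector thread drains the queue before it blocks in `select`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch, not Jetty's SelectorManager: one selector thread
// that applies queued internal changes before waiting for network events.
class ChangeQueueSelector {
    private final Selector selector;
    private final Queue<Runnable> changes = new ConcurrentLinkedQueue<>();

    ChangeQueueSelector() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called from worker threads: queue a change, then wake the selector
    // so the change is applied without waiting for the select timeout.
    void addChange(Runnable change) {
        changes.add(change);
        selector.wakeup();
    }

    // One iteration of the selector loop. waitMs must be positive, because
    // select(0) blocks indefinitely. Returns the number of changes applied.
    int doSelectOnce(long waitMs) {
        int applied = 0;
        Runnable change;
        while ((change = changes.poll()) != null) {
            change.run();
            applied++;
        }
        try {
            selector.select(waitMs);         // no channels are registered in this sketch
            selector.selectedKeys().clear();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return applied;
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   Note that every `addChange` call wakes the selector, so a `select` that is already in progress returns early with nothing selected. That side effect is what matters for the rest of this analysis.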

    /* ------------------------------------------------------------ */
    /*
     * Allows thread to block waiting for further events.
     */
    @Override
    public boolean blockWritable(long timeoutMs) throws IOException
    {
        synchronized (this)
        {
            long start=_selectSet.getNow();
            try
            {
                _writeBlocked=true;
                while (isOpen() && _writeBlocked)
                {
                    try
                    {
                        updateKey();
                        this.wait(timeoutMs);

                        timeoutMs -= _selectSet.getNow()-start;
                        if (_writeBlocked && timeoutMs<=0)
                            return false;
                    }
                    catch (InterruptedException e)
                    {
                        Log.warn(e);
                    }
                }
            }
            finally
            {
                _writeBlocked=false;
                if (_idleTimestamp!=-1)
                    scheduleIdle();
            }
        }
        return true;
    }
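   The block-and-wake handshake can be tried in isolation with a stripped-down sketch. This is not Jetty's implementation: `WriteBlocker`, `signalWritable` and `demo` are hypothetical names, and the selector is played by a plain thread that signals after a delay. A real endpoint would also register write interest with the selector before waiting, which is the role `updateKey()` appears to play in the quoted code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the block-and-wake handshake; the names mirror
// the quoted code, but this is not Jetty's implementation.
class WriteBlocker {
    private boolean writeBlocked;

    // Worker side: wait until signalWritable() is called or the timeout
    // elapses. Returns true if woken by a signal, false otherwise.
    synchronized boolean blockWritable(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        writeBlocked = true;
        try {
            while (writeBlocked) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0)
                    return false;
                wait(remainingMs);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            writeBlocked = false;
        }
    }

    // Selector side: called when the channel becomes writable again.
    synchronized void signalWritable() {
        writeBlocked = false;
        notifyAll();
    }

    // Demo: a second thread plays the selector and signals after delayMs.
    static boolean demo(long delayMs, long timeoutMs) {
        WriteBlocker blocker = new WriteBlocker();
        Thread selector = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            blocker.signalWritable();
        });
        selector.setDaemon(true);
        selector.start();
        return blocker.blockWritable(timeoutMs);
    }
}
```

   The sketch recomputes the remaining wait from a fixed deadline on every loop iteration. That is a design choice of the sketch, not something taken from the quoted code. The point to carry forward is that the worker makes no progress until the selector side calls `signalWritable()`, so any pause in the selector thread is paid directly by the blocked writer.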

   The main thread's select loop looks like this:

    /* ------------------------------------------------------------ */
    /**
     * Select and dispatch tasks found from changes and the selector.
     *
     * @throws IOException
     */
    public void doSelect() throws IOException
    {
        try
        {
            _selecting=Thread.currentThread();
            final Selector selector=_selector;

            // Make any key changes required
            Object change;
            int changes=_changes.size();
            while (changes-->0 && (change=_changes.poll())!=null)
            {
                try
                {
                    if (change instanceof EndPoint)
                    {
                        // Update the operations for a key.
                        SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                        endpoint.doUpdateKey();
                    }
                    else if (change instanceof ChannelAndAttachment)
                    {
                        // finish accepting/connecting this connection
                        final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                        final SelectableChannel channel=asc._channel;
                        final Object att = asc._attachment;
                        SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
                        SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                        key.attach(endpoint);
                        endpoint.schedule();
                    }
                    else if (change instanceof SocketChannel)
                    {
                        // Newly registered channel
                        final SocketChannel channel=(SocketChannel)change;
                        SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
                        SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                        key.attach(endpoint);
                        endpoint.schedule();
                    }
                    else if (change instanceof Runnable)
                    {
                        dispatch((Runnable)change);
                    }
                    else
                        throw new IllegalArgumentException(change.toString());
                }
                catch (Exception e)
                {
                    if (isRunning())
                        Log.warn(e);
                    else
                        Log.debug(e);
                }
                catch (Error e)
                {
                    if (isRunning())
                        Log.warn(e);
                    else
                        Log.debug(e);
                }
            }

            // Do and instant select to see if any connections can be handled.
            int selected=selector.selectNow();
            _selects++;

            long now=System.currentTimeMillis();

            // if no immediate things to do
            if (selected==0)
            {
                // If we are in pausing mode
                if (_pausing)
                {
                    try
                    {
                        Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
                    }
                    catch(InterruptedException e)
                    {
                        Log.ignore(e);
                    }
                    now=System.currentTimeMillis();
                }

                // workout how long to wait in select
                _timeout.setNow(now);
                long to_next_timeout=_timeout.getTimeToNext();

                long wait = _changes.size()==0?__IDLE_TICK:0L;
                if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                    wait = to_next_timeout;

                // If we should wait with a select
                if (wait>0)
                {
                    long before=now;
                    selected=selector.select(wait);
                    _selects++;
                    now = System.currentTimeMillis();
                    _timeout.setNow(now);
                    checkJvmBugs(before, now, wait, selected);
                }
            }

            // have we been destroyed while sleeping
            if (_selector==null || !selector.isOpen())
                return;

            // Look for things to do
            for (SelectionKey key: selector.selectedKeys())
            {
                try
                {
                    if (!key.isValid())
                    {
                        key.cancel();
                        SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                        if (endpoint != null)
                            endpoint.doUpdateKey();
                        continue;
                    }

                    Object att = key.attachment();
                    if (att instanceof SelectChannelEndPoint)
                    {
                        ((SelectChannelEndPoint)att).schedule();
                    }
                    else
                    {
                        // Wrap readable registered channel in an endpoint
                        SocketChannel channel = (SocketChannel)key.channel();
                        SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                        key.attach(endpoint);
                        if (key.isReadable())
                            endpoint.schedule();
                    }
                    key = null;
                }
                catch (CancelledKeyException e)
                {
                    Log.ignore(e);
                }
                catch (Exception e)
                {
                    if (isRunning())
                        Log.warn(e);
                    else
                        Log.ignore(e);

                    if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                        key.cancel();
                }
            }

            // Everything always handled
            selector.selectedKeys().clear();

            now=System.currentTimeMillis();
            _timeout.setNow(now);
            Task task = _timeout.expired();
            while (task!=null)
            {
                if (task instanceof Runnable)
                    dispatch((Runnable)task);
                task = _timeout.expired();
            }

            // Idle tick
            if (now-_idleTick>__IDLE_TICK)
            {
                _idleTick=now;

                final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
                    ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
                    :now;

                dispatch(new Runnable()
                {
                    public void run()
                    {
                        for (SelectChannelEndPoint endp:_endPoints.keySet())
                        {
                            endp.checkIdleTimestamp(idle_now);
                        }
                    }
                });
            }
        }
        catch (CancelledKeyException e)
        {
            Log.ignore(e);
        }
        finally
        {
            _selecting=null;
        }
    }

    /* ------------------------------------------------------------ */
    private void checkJvmBugs(long before, long now, long wait, int selected)
        throws IOException
    {
        Selector selector = _selector;
        if (selector==null)
            return;

        // Look for JVM bugs over a monitor period.
        // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
        // http://bugs.sun.com/view_bug.do?bug_id=6693490
        if (now>_monitorNext)
        {
            _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
            _pausing=_selects>__MAX_SELECTS;
            if (_pausing)
                _paused++;

            _selects=0;
            _jvmBug=0;
            _monitorStart=now;
            _monitorNext=now+__MONITOR_PERIOD;
        }

        if (now>_log)
        {
            if (_paused>0)
                Log.debug(this+" Busy selector - injecting delay "+_paused+" times");

            if (_jvmFix2>0)
                Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");

            if (_jvmFix1>0)
                Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
            else if(Log.isDebugEnabled() && _jvmFix0>0)
                Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");

            _paused=0;
            _jvmFix2=0;
            _jvmFix1=0;
            _jvmFix0=0;
            _log=now+60000;
        }

        // If we see signature of possible JVM bug, increment count.
        if (selected==0 && wait>10 && (now-before)<(wait/2))
        {
            // Increment bug count and try a work around
            _jvmBug++;
            if (_jvmBug>(__JVMBUG_THRESHHOLD))
            {
                try
                {
                    if (_jvmBug==__JVMBUG_THRESHHOLD+1)
                        _jvmFix2++;

                    Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
                }
                catch(InterruptedException e)
                {
                    Log.ignore(e);
                }
            }
            else if (_jvmBug==__JVMBUG_THRESHHOLD)
            {
                synchronized (this)
                {
                    // BLOODY SUN BUG !!!  Try refreshing the entire selector.
                    final Selector new_selector = Selector.open();
                    for (SelectionKey k: selector.keys())
                    {
                        if (!k.isValid() || k.interestOps()==0)
                            continue;

                        final SelectableChannel channel = k.channel();
                        final Object attachment = k.attachment();

                        if (attachment==null)
                            addChange(channel);
                        else
                            addChange(channel,attachment);
                    }
                    _selector.close();
                    _selector=new_selector;
                    return;
                }
            }
            else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
            {
                // Cancel keys with 0 interested ops
                int cancelled=0;
                for (SelectionKey k: selector.keys())
                {
                    if (k.isValid()&&k.interestOps()==0)
                    {
                        k.cancel();
                        cancelled++;
                    }
                }
                if (cancelled>0)
                    _jvmFix0++;

                return;
            }
        }
        else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
        {
            // Look for busy key
            SelectionKey busy = selector.selectedKeys().iterator().next();
            if (busy==_busyKey)
            {
                if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
                {
                    final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
                    Log.warn("Busy Key "+busy.channel()+" "+endpoint);
                    busy.cancel();
                    if (endpoint!=null)
                    {
                        dispatch(new Runnable()
                        {
                            public void run()
                            {
                                try
                                {
                                    endpoint.close();
                                }
                                catch (IOException e)
                                {
                                    Log.ignore(e);
                                }
                            }
                        });
                    }
                }
            }
            else
                _busyKeyCount=0;
            _busyKey=busy;
        }
    }

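   The trouble lies in checkJvmBugs. If a worker thread enters blockWritable while the main thread is blocked in selector.select(wait), the main thread is woken early and the select returns with 0 selected keys. When the selector manages a large number of connections, the main thread can be woken this way hundreds of times per second. checkJvmBugs then treats these wakeups as a JVM bug and forces the main thread to sleep for 50 ms, while a large number of worker threads are still registering blocking events and waiting for the main thread to wake them. As a result, writing a 12K payload to the client took 50 ms, and performance dropped.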
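   The misclassification can be reproduced with a few lines of standard `java.nio`. The sketch below copies the early-return condition from the quoted checkJvmBugs into a helper (`looksLikeJvmBug`, a hypothetical name) and feeds it a select that was cut short by `Selector.wakeup()`. The 400 ms wait is an assumed stand-in for the `wait` value, and the direct `wakeup()` call is assumed to stand in for what a worker's `updateKey()` triggers; neither detail is confirmed by the code shown above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical sketch: shows that a deliberate wakeup matches the
// "possible JVM bug" signature used in the quoted checkJvmBugs.
class WakeupMisclassification {
    static final long WAIT_MS = 400; // assumed select timeout for the demo

    // Same condition as the quoted code: nothing selected, a meaningful
    // wait was requested, and select returned in under half that time.
    static boolean looksLikeJvmBug(int selected, long wait, long elapsedMs) {
        return selected == 0 && wait > 10 && elapsedMs < wait / 2;
    }

    static boolean wakeupLooksLikeBug() {
        try (Selector selector = Selector.open()) {
            // A wakeup issued before select() makes the next select()
            // return immediately, just like one issued during it.
            selector.wakeup();
            long before = System.currentTimeMillis();
            int selected = selector.select(WAIT_MS);
            long now = System.currentTimeMillis();
            return looksLikeJvmBug(selected, WAIT_MS, now - before);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wakeup classified as JVM bug: " + wakeupLooksLikeBug());
    }
}
```

   The heuristic cannot tell an intentional wakeup from a spurious return: both look like "select came back early with nothing selected".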

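   The problem is now clear: Jetty's own block-and-wake mechanism is handled as if it were a JVM bug, which hurts data-transfer performance. The effect only shows up with a large number of connections and heavy data transfer.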

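   There are two possible fixes. The first is to comment out the checkJvmBugs call altogether and keep only the logic that pauses for a while once the selector exceeds 25,000 selects per second. The second is to shorten the 50 ms sleep so that the selector is not stalled for too long.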
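   As a rough illustration of what the first fix keeps, the sketch below isolates a busy-select throttle of the kind visible at the top of the quoted checkJvmBugs: count selects over a monitor period, normalise the count to the period length, and request a pause only when the rate exceeds a limit. The class `SelectRateMonitor` and its method names are hypothetical, and the values used in the usage note (25,000 selects, a 1,000 ms period) are assumptions for the example, not constants read from Jetty.

```java
// Hypothetical sketch of a busy-select throttle with no early-return
// heuristic: it only looks at how many selects happen per period.
class SelectRateMonitor {
    private final int maxSelectsPerPeriod;
    private final long periodMs;
    private long periodStart;
    private long selects;
    private boolean pausing;

    SelectRateMonitor(int maxSelectsPerPeriod, long periodMs, long now) {
        if (periodMs <= 0)
            throw new IllegalArgumentException("periodMs must be positive");
        this.maxSelectsPerPeriod = maxSelectsPerPeriod;
        this.periodMs = periodMs;
        this.periodStart = now;
    }

    // Record one select() call made at time 'now' (milliseconds).
    // Returns true while the loop should insert a pause between selects.
    boolean recordSelect(long now) {
        selects++;
        long elapsed = now - periodStart;
        if (elapsed >= periodMs) {
            // Normalise the count to one period, as the quoted code does.
            long normalised = selects * periodMs / elapsed;
            pausing = normalised > maxSelectsPerPeriod;
            selects = 0;
            periodStart = now;
        }
        return pausing;
    }
}
```

   With `new SelectRateMonitor(25000, 1000, 0)`, a few hundred wakeups per second never trigger a pause, whereas a genuine busy loop of tens of thousands of selects per second still does. Under these assumptions, the throttle protects against a spinning selector without penalising the block-and-wake traffic described above.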

   A look at the latest Jetty code (at the time of writing) shows that the checkJvmBugs call has already been removed, which is exactly the first fix.

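   Once identified, the problem looks simple, but finding it was a long and winding process. After the investigation had narrowed down to the code above, I was fortunate to take part in the final pinpointing and to work out the exact cause. The moment the problem was found brought real excitement and a sense of achievement.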
