技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 源码分析 --> Jetty线程“互锁”导致数据传输性能降低问题分析

Jetty线程“互锁”导致数据传输性能降低问题分析

浏览:2991次  出处信息

   以下分析针对jetty的特定版本:http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111

   首先介绍一下Jetty的反映器模型,Jetty用的经典的NIO异步模型(Scalable IO in Java http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)。连接管理的示意图如下:

   pool

   Jetty在使用这个模型的时候,做了一些改动,acceptor是独立出来的一个阻塞线程,用于阻塞地接受新的连接请求,而所有的连接建立之后,都会想selector线程注册网络事件和内部的事件(changes),selector需要同时处理网络事件和内部的changes。同时还要定期检查超时的链接。

   当一个连接建立之后,除了分发网络事件之外,主线程还会与子线程有一些交互。当子线程发现网络拥塞,缓冲区的数据无法及时刷走时,会注册一个表明自己处于阻塞状态的内部事件,并且期望主线程在发现拥塞解除的时候能通知到自己。

   具体的代码如下:

/* ------------------------------------------------------------ */

   /*

    * Allows thread to block waiting for further events.

    */

   @Override

   public boolean blockWritable(long timeoutMs) throws IOException

   {

       synchronized (this)

       {

           long start=_selectSet.getNow();

           try

           {  

               _writeBlocked=true;

               while (isOpen() && _writeBlocked)

               {

                   try

                   {

                       updateKey();

                       this.wait(timeoutMs);

                       timeoutMs -= _selectSet.getNow()-start;

                       if (_writeBlocked && timeoutMs<=0)

                           return false;

                   }

                   catch (InterruptedException e)

                   {

                       Log.warn(e);

                   }

               }

           }

           finally

           {

               _writeBlocked=false;

               if (_idleTimestamp!=-1)

                   scheduleIdle();

           }

       }

       return true;

   }

   

   而主线程的select主循环的代码如下:

/* ------------------------------------------------------------ */

   /**

    * Select and dispatch tasks found from changes and the selector.

    *

    * @throws IOException

    */

   public void doSelect() throws IOException

   {

       try

       {

           _selecting=Thread.currentThread();

           final Selector selector=_selector;

           // Make any key changes required

           Object change;

           int changes=_changes.size();

           while (changes-->0 && (change=_changes.poll())!=null)

           {

               try

               {

                   if (change instanceof EndPoint)

                   {

                       // Update the operations for a key.

                       SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;

                       endpoint.doUpdateKey();

                   }

                   else if (change instanceof ChannelAndAttachment)

                   {

                       // finish accepting/connecting this connection

                       final ChannelAndAttachment asc = (ChannelAndAttachment)change;

                       final SelectableChannel channel=asc._channel;

                       final Object att = asc._attachment;

                       SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);

                       SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);

                       key.attach(endpoint);

                       endpoint.schedule();

                   }

                   else if (change instanceof SocketChannel)

                   {

                       // Newly registered channel

                       final SocketChannel channel=(SocketChannel)change;

                       SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);

                       SelectChannelEndPoint endpoint = createEndPoint(channel,key);

                       key.attach(endpoint);

                       endpoint.schedule();

                   }

                   else if (change instanceof Runnable)

                   {

                       dispatch((Runnable)change);

                   }

                   else

                       throw new IllegalArgumentException(change.toString());

               }

               catch (Exception e)

               {

                   if (isRunning())

                       Log.warn(e);

                   else

                       Log.debug(e);

               }

               catch (Error e)

               {

                   if (isRunning())

                       Log.warn(e);

                   else

                       Log.debug(e);

               }

           }

           // Do and instant select to see if any connections can be handled.

           int selected=selector.selectNow();

           _selects++;

           long now=System.currentTimeMillis();

           

           // if no immediate things to do

           if (selected==0)

           {

               // If we are in pausing mode

               if (_pausing)

               {

                   try

                   {

                       Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop

                   }

                   catch(InterruptedException e)

                   {

                       Log.ignore(e);

                   }

                   now=System.currentTimeMillis();

               }

               // workout how long to wait in select

               _timeout.setNow(now);

               long to_next_timeout=_timeout.getTimeToNext();

               long wait = _changes.size()==0?__IDLE_TICK:0L;  

               if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)

                   wait = to_next_timeout;

               // If we should wait with a select

               if (wait>0)

               {

                   long before=now;

                   selected=selector.select(wait);

                   _selects++;

                   now = System.currentTimeMillis();

                   _timeout.setNow(now);

                   checkJvmBugs(before, now, wait, selected);

               }

           }

           

           // have we been destroyed while sleeping

           if (_selector==null || !selector.isOpen())

               return;

           // Look for things to do

           for (SelectionKey key: selector.selectedKeys())

           {  

               try

               {

                   if (!key.isValid())

                   {

                       key.cancel();

                       SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();

                       if (endpoint != null)

                           endpoint.doUpdateKey();

                       continue;

                   }

                   Object att = key.attachment();

                   if (att instanceof SelectChannelEndPoint)

                   {

                       ((SelectChannelEndPoint)att).schedule();

                   }

                   else

                   {

                       // Wrap readable registered channel in an endpoint

                       SocketChannel channel = (SocketChannel)key.channel();

                       SelectChannelEndPoint endpoint = createEndPoint(channel,key);

                       key.attach(endpoint);

                       if (key.isReadable())

                           endpoint.schedule();                          

                   }

                   key = null;

               }

               catch (CancelledKeyException e)

               {

                   Log.ignore(e);

               }

               catch (Exception e)

               {

                   if (isRunning())

                       Log.warn(e);

                   else

                       Log.ignore(e);

                   if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())

                       key.cancel();

               }

           }

           

           // Everything always handled

           selector.selectedKeys().clear();

           

           now=System.currentTimeMillis();

           _timeout.setNow(now);

           Task task = _timeout.expired();

           while (task!=null)

           {

               if (task instanceof Runnable)

                   dispatch((Runnable)task);

               task = _timeout.expired();

           }

           // Idle tick

           if (now-_idleTick>__IDLE_TICK)

           {

               _idleTick=now;

               

               final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))

                   ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)

                   :now;

                   

               dispatch(new Runnable()

               {

                   public void run()

                   {

                       for (SelectChannelEndPoint endp:_endPoints.keySet())

                       {

                           endp.checkIdleTimestamp(idle_now);

                       }

                   }

               });

           }

       }

       catch (CancelledKeyException e)

       {

           Log.ignore(e);

       }

       finally

       {

           _selecting=null;

       }

   }

   /* ------------------------------------------------------------ */

   private void checkJvmBugs(long before, long now, long wait, int selected)

       throws IOException

   {

       Selector selector = _selector;

       if (selector==null)

           return;

           

       // Look for JVM bugs over a monitor period.

       // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933

       // http://bugs.sun.com/view_bug.do?bug_id=6693490

       if (now>_monitorNext)

       {

           _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));

           _pausing=_selects>__MAX_SELECTS;

           if (_pausing)

               _paused++;

           _selects=0;

           _jvmBug=0;

           _monitorStart=now;

           _monitorNext=now+__MONITOR_PERIOD;

       }

       if (now>_log)

       {

           if (_paused>0)  

               Log.debug(this+" Busy selector - injecting delay "+_paused+" times");

           if (_jvmFix2>0)

               Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");

           if (_jvmFix1>0)

               Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");

           else if(Log.isDebugEnabled() && _jvmFix0>0)

               Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");

           _paused=0;

           _jvmFix2=0;

           _jvmFix1=0;

           _jvmFix0=0;

           _log=now+60000;

       }

       // If we see signature of possible JVM bug, increment count.

       if (selected==0 && wait>10 && (now-before)<(wait/2))

       {

           // Increment bug count and try a work around

           _jvmBug++;

           if (_jvmBug>(__JVMBUG_THRESHHOLD))

           {

               try

               {

                   if (_jvmBug==__JVMBUG_THRESHHOLD+1)

                       _jvmFix2++;

                   Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop

               }

               catch(InterruptedException e)

               {

                   Log.ignore(e);

               }

           }

           else if (_jvmBug==__JVMBUG_THRESHHOLD)

           {

               synchronized (this)

               {

                   // BLOODY SUN BUG !!!  Try refreshing the entire selector.

                   final Selector new_selector = Selector.open();

                   for (SelectionKey k: selector.keys())

                   {

                       if (!k.isValid() || k.interestOps()==0)

                           continue;

                       final SelectableChannel channel = k.channel();

                       final Object attachment = k.attachment();

                       if (attachment==null)

                           addChange(channel);

                       else

                           addChange(channel,attachment);

                   }

                   _selector.close();

                   _selector=new_selector;

                   return;

               }

           }

           else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops

           {

               // Cancel keys with 0 interested ops

               int cancelled=0;

               for (SelectionKey k: selector.keys())

               {

                   if (k.isValid()&&k.interestOps()==0)

                   {

                       k.cancel();

                       cancelled++;

                   }

               }

               if (cancelled>0)

                   _jvmFix0++;

               return;

           }

       }

       else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)

       {

           // Look for busy key

           SelectionKey busy = selector.selectedKeys().iterator().next();

           if (busy==_busyKey)

           {

               if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))

               {

                   final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();

                   Log.warn("Busy Key "+busy.channel()+" "+endpoint);

                   busy.cancel();

                   if (endpoint!=null)

                   {

                       dispatch(new Runnable()

                       {

                           public void run()

                           {

                               try

                               {

                                   endpoint.close();

                               }

                               catch (IOException e)

                               {

                                   Log.ignore(e);

                               }

                           }

                       });

                   }

               }

           }

           else

               _busyKeyCount=0;

           _busyKey=busy;

       }

   }

   

   在checkjvmbugs的方法中,当主线程正在阻塞selector.select(wait)时,子线程出现blockwrite的情况,会导致主线程被惊醒,使得主线程select到的事件数为0,当selector管理大量的连接时,会出现1秒内上百次主线程被惊醒,最终在checkjvmbugs方法内被当作是jvmbug来处理了,强制让主线程休眠50毫秒,而此时还有大量的线程注册阻塞事件,并等待主线程的唤醒。最终一个12K的数据花费了50ms的时间来写往客户端,最终导致性能下降。

   问题已经很清晰,jetty自身的阻塞-唤醒机制被当作jvmbugs来处理,导致数据传输性能受影响,这样的情况也只有在有大量连接和数据传输的时候才会体现出来。

   修改方案,可以将checkjvmbugs调用直接注释掉,保留每秒select 25000次后休眠一段时间的实现即可。也可以将50毫秒的休眠时间调短来避免阻塞时间过长的问题。

   分析最新的jetty代码,发现这个checkjvmbugs的调用已经去掉了,正是按照方案1来实现的。

   问题明确了之后,看似比较简单,但实际上发现问题的过程是很漫长和曲折的,在问题已经锁定在上面之后,有幸参与最终的定位并且分析出了具体的问题点,问题找到的那一刻,有一种兴奋感与成就感。

建议继续学习:

  1. Netty和Jetty的Java NIO 网络框架模型分析    (阅读:4551)
  2. Apache + Jetty 架设 CAS 单点登录    (阅读:4260)
  3. 关于 Jetty Continuation    (阅读:2535)
  4. Jetty 8长连接上的又一个坑    (阅读:1498)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1