Jetty线程“互锁”导致数据传输性能降低问题分析
以下分析针对jetty的特定版本:http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111
首先介绍一下Jetty的反映器模型,Jetty用的经典的NIO异步模型(Scalable IO in Java http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)。连接管理的示意图如下:
Jetty在使用这个模型的时候,做了一些改动,acceptor是独立出来的一个阻塞线程,用于阻塞地接受新的连接请求,而所有的连接建立之后,都会想selector线程注册网络事件和内部的事件(changes),selector需要同时处理网络事件和内部的changes。同时还要定期检查超时的链接。
当一个连接建立之后,除了分发网络事件之外,主线程还会与子线程有一些交互。当子线程发现网络拥塞,缓冲区的数据无法及时刷走时,会注册一个表明自己处于阻塞状态的内部事件,并且期望主线程在发现拥塞解除的时候能通知到自己。
具体的代码如下:
/* ------------------------------------------------------------ *//*
* Allows thread to block waiting for further events.
*/
@Override
public boolean blockWritable(long timeoutMs) throws IOException
{
synchronized (this)
{
long start=_selectSet.getNow();
try
{
_writeBlocked=true;
while (isOpen() && _writeBlocked)
{
try
{
updateKey();
this.wait(timeoutMs);
timeoutMs -= _selectSet.getNow()-start;
if (_writeBlocked && timeoutMs<=0)
return false;
}
catch (InterruptedException e)
{
Log.warn(e);
}
}
}
finally
{
_writeBlocked=false;
if (_idleTimestamp!=-1)
scheduleIdle();
}
}
return true;
}
而主线程的select主循环的代码如下:
/* ------------------------------------------------------------ *//**
* Select and dispatch tasks found from changes and the selector.
*
* @throws IOException
*/
public void doSelect() throws IOException
{
try
{
_selecting=Thread.currentThread();
final Selector selector=_selector;
// Make any key changes required
Object change;
int changes=_changes.size();
while (changes-->0 && (change=_changes.poll())!=null)
{
try
{
if (change instanceof EndPoint)
{
// Update the operations for a key.
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
endpoint.doUpdateKey();
}
else if (change instanceof ChannelAndAttachment)
{
// finish accepting/connecting this connection
final ChannelAndAttachment asc = (ChannelAndAttachment)change;
final SelectableChannel channel=asc._channel;
final Object att = asc._attachment;
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (change instanceof SocketChannel)
{
// Newly registered channel
final SocketChannel channel=(SocketChannel)change;
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (change instanceof Runnable)
{
dispatch((Runnable)change);
}
else
throw new IllegalArgumentException(change.toString());
}
catch (Exception e)
{
if (isRunning())
Log.warn(e);
else
Log.debug(e);
}
catch (Error e)
{
if (isRunning())
Log.warn(e);
else
Log.debug(e);
}
}
// Do and instant select to see if any connections can be handled.
int selected=selector.selectNow();
_selects++;
long now=System.currentTimeMillis();
// if no immediate things to do
if (selected==0)
{
// If we are in pausing mode
if (_pausing)
{
try
{
Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
}
catch(InterruptedException e)
{
Log.ignore(e);
}
now=System.currentTimeMillis();
}
// workout how long to wait in select
_timeout.setNow(now);
long to_next_timeout=_timeout.getTimeToNext();
long wait = _changes.size()==0?__IDLE_TICK:0L;
if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
wait = to_next_timeout;
// If we should wait with a select
if (wait>0)
{
long before=now;
selected=selector.select(wait);
_selects++;
now = System.currentTimeMillis();
_timeout.setNow(now);
checkJvmBugs(before, now, wait, selected);
}
}
// have we been destroyed while sleeping
if (_selector==null || !selector.isOpen())
return;
// Look for things to do
for (SelectionKey key: selector.selectedKeys())
{
try
{
if (!key.isValid())
{
key.cancel();
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
if (endpoint != null)
endpoint.doUpdateKey();
continue;
}
Object att = key.attachment();
if (att instanceof SelectChannelEndPoint)
{
((SelectChannelEndPoint)att).schedule();
}
else
{
// Wrap readable registered channel in an endpoint
SocketChannel channel = (SocketChannel)key.channel();
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
if (key.isReadable())
endpoint.schedule();
}
key = null;
}
catch (CancelledKeyException e)
{
Log.ignore(e);
}
catch (Exception e)
{
if (isRunning())
Log.warn(e);
else
Log.ignore(e);
if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
key.cancel();
}
}
// Everything always handled
selector.selectedKeys().clear();
now=System.currentTimeMillis();
_timeout.setNow(now);
Task task = _timeout.expired();
while (task!=null)
{
if (task instanceof Runnable)
dispatch((Runnable)task);
task = _timeout.expired();
}
// Idle tick
if (now-_idleTick>__IDLE_TICK)
{
_idleTick=now;
final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
:now;
dispatch(new Runnable()
{
public void run()
{
for (SelectChannelEndPoint endp:_endPoints.keySet())
{
endp.checkIdleTimestamp(idle_now);
}
}
});
}
}
catch (CancelledKeyException e)
{
Log.ignore(e);
}
finally
{
_selecting=null;
}
}
/* ------------------------------------------------------------ */
private void checkJvmBugs(long before, long now, long wait, int selected)
throws IOException
{
Selector selector = _selector;
if (selector==null)
return;
// Look for JVM bugs over a monitor period.
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
// http://bugs.sun.com/view_bug.do?bug_id=6693490
if (now>_monitorNext)
{
_selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
_pausing=_selects>__MAX_SELECTS;
if (_pausing)
_paused++;
_selects=0;
_jvmBug=0;
_monitorStart=now;
_monitorNext=now+__MONITOR_PERIOD;
}
if (now>_log)
{
if (_paused>0)
Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
if (_jvmFix2>0)
Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
if (_jvmFix1>0)
Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
else if(Log.isDebugEnabled() && _jvmFix0>0)
Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
_paused=0;
_jvmFix2=0;
_jvmFix1=0;
_jvmFix0=0;
_log=now+60000;
}
// If we see signature of possible JVM bug, increment count.
if (selected==0 && wait>10 && (now-before)<(wait/2))
{
// Increment bug count and try a work around
_jvmBug++;
if (_jvmBug>(__JVMBUG_THRESHHOLD))
{
try
{
if (_jvmBug==__JVMBUG_THRESHHOLD+1)
_jvmFix2++;
Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
}
catch(InterruptedException e)
{
Log.ignore(e);
}
}
else if (_jvmBug==__JVMBUG_THRESHHOLD)
{
synchronized (this)
{
// BLOODY SUN BUG !!! Try refreshing the entire selector.
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
return;
}
}
else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
{
// Cancel keys with 0 interested ops
int cancelled=0;
for (SelectionKey k: selector.keys())
{
if (k.isValid()&&k.interestOps()==0)
{
k.cancel();
cancelled++;
}
}
if (cancelled>0)
_jvmFix0++;
return;
}
}
else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
{
// Look for busy key
SelectionKey busy = selector.selectedKeys().iterator().next();
if (busy==_busyKey)
{
if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
{
final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
Log.warn("Busy Key "+busy.channel()+" "+endpoint);
busy.cancel();
if (endpoint!=null)
{
dispatch(new Runnable()
{
public void run()
{
try
{
endpoint.close();
}
catch (IOException e)
{
Log.ignore(e);
}
}
});
}
}
}
else
_busyKeyCount=0;
_busyKey=busy;
}
}
在checkjvmbugs的方法中,当主线程正在阻塞selector.select(wait)时,子线程出现blockwrite的情况,会导致主线程被惊醒,使得主线程select到的事件数为0,当selector管理大量的连接时,会出现1秒内上百次主线程被惊醒,最终在checkjvmbugs方法内被当作是jvmbug来处理了,强制让主线程休眠50毫秒,而此时还有大量的线程注册阻塞事件,并等待主线程的唤醒。最终一个12K的数据花费了50ms的时间来写往客户端,最终导致性能下降。
问题已经很清晰,jetty自身的阻塞-唤醒机制被当作jvmbugs来处理,导致数据传输性能受影响,这样的情况也只有在有大量连接和数据传输的时候才会体现出来。
修改方案,可以将checkjvmbugs调用直接注释掉,保留每秒select 25000次后休眠一段时间的实现即可。也可以将50毫秒的休眠时间调短来避免阻塞时间过长的问题。
分析最新的jetty代码,发现这个checkjvmbugs的调用已经去掉了,正是按照方案1来实现的。
问题明确了之后,看似比较简单,但实际上发现问题的过程是很漫长和曲折的,在问题已经锁定在上面之后,有幸参与最终的定位并且分析出了具体的问题点,问题找到的那一刻,有一种兴奋感与成就感。
建议继续学习:
- Netty和Jetty的Java NIO 网络框架模型分析 (阅读:4362)
- Apache + Jetty 架设 CAS 单点登录 (阅读:4161)
- 关于 Jetty Continuation (阅读:2436)
- Jetty 8长连接上的又一个坑 (阅读:1269)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:童燕群 来源: 忘我的追寻
- 标签: Jetty 互锁
- 发布时间:2013-09-23 23:06:25
- [67] Go Reflect 性能
- [67] Oracle MTS模式下 进程地址与会话信
- [67] 如何拿下简短的域名
- [61] IOS安全–浅谈关于IOS加固的几种方法
- [60] 图书馆的世界纪录
- [59] android 开发入门
- [59] 【社会化设计】自我(self)部分――欢迎区
- [56] 视觉调整-设计师 vs. 逻辑
- [49] 给自己的字体课(一)——英文字体基础
- [47] 界面设计速成