Reactor线程模型的解析及实现 有更新!

  ,
评论 • 167 浏览

五一在家研究了下nio,看到了Doug Lea写的nio-Scalable IO in java这篇文章,了解到了reactor线程模型。并且根据其理念使用nio实现了reactor线程模型。
这里附上Doug Lea大神的个人page

传统的设计

png

我们先来看看传统的设计。
(PS:这篇文章所有的图都来自Scalable IO in java这篇文章)
在传统的设计中,服务端每接收到一个请求都会开一个线程去处理。在并发小的情况下还能抗的住,请求量大了服务器基本就完蛋了。
线程的创建,上下文的切换带来的损耗无法估计。
这种方案已经被抛弃。
有人讲什么体量小,什么业务小这么写。但是我觉得代码写出来是不受控制的。除非你这个系统就是实验一下效果。否则尽量写好点。
现在估计也就外包公司的程序员这么干了。

nio中的多路复用

niopng

我写的上一篇文章介绍nio的时候就提到了这个多路复用。
在nio中通过Channel,Buffer,Selector,SelectionKey实现了靠单线程来处理所有的事件请求。
相对于传统的设计,资源消耗及稳定性上肯定是有所提升。
但是单个线程即要处理IO连接请求,也要处理IO读写请求,还需要处理业务。并发量一上来,肯定很多IO事件得不到及时的响应,从而造成超时。

Worker Thread Pools

nioworkerpng
对上面的模型加以改进。
实际上这种处理方式就是在nio多路复用的基础上把业务的操作交由一个线程池去处理。
这样reactor线程只需要负责监听连接,处理IO事件,进行分发调度即可。
相对于前一种模型,有效的将框架设计与业务处理进行解耦合。同时提高了系统处理的能力。
但是,如果并发量进一步提高。reactor线程即要处理连接也需要处理事件。仍然会有性能瓶颈。
需要怎么做呢?
可以将连接监听和事件处理分开。将事件处理交由一个线程池去做。

Multiple Reactors

reactorspng

这种设计相对于上一种,其实就是将accept和事件处理分开到一个mainReactor和subReactor中。
在mainReactor中,线程只负责监听客户端发来的accept请求,然后进行分发。交给subReactor去处理。
subReactor负责数据读写。subReactor的设计可以采用线程池去处理。
subReactor读到数据后交给woker线程池去处理具体业务。
大大提高性能。

javaReactorspng

Doug Lea还提出了相关建议。
可以使用多个selector绑定到不同的Reactor上来实现multiple reactors。
可以考虑使用堆外内存实现零拷贝来提高性能。
注意同步的问题。因为在nio的源码里面很多操作都需要同一把锁。
比如selector注册就会调用到SelectorImpl里面的register方法,里面需要对publicKeys进行加锁。源码如下:

protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {  
    if(!(var1 instanceof SelChImpl)) {  
        throw new IllegalSelectorException();  
     } else {  
    SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);  
    var4.attach(var3);  
    Set var5 = this.publicKeys;  
    //需要publicKeys锁
    synchronized(this.publicKeys) {  
            this.implRegister(var4);  
     }  
    var4.interestOps(var2);  
    return var4;  
  }  
}

而在selector的select()这个方法中。最终会调用到SelectorImpl的lockAndDoSelect()方法。这个方法中也会对publicKeys进行加锁。源码如下:

private int lockAndDoSelect(long var1) throws IOException {  
    synchronized(this) {  
        if(!this.isOpen()) {  
            throw new ClosedSelectorException();  
        } else {  
        Set var4 = this.publicKeys;  
        int var10000;  
        synchronized(this.publicKeys) {  
                Set var5 = this.publicSelectedKeys;  
                synchronized(this.publicSelectedKeys) {  
                    var10000 = this.doSelect(var1);  
                }  
        }  
        return var10000;  
      }  
   }  
}

我参考这个设计里面通过nio实现了简要版Reactor线程模型。
大体理念如下:

/**  
 * 处理业务操作的线程  
  */  
private static ExecutorService workPool = Executors.newCachedThreadPool();  
/**  
 * 主reactor负责监听连接,accept  
 */private ReactorThread[] mainReactor=new ReactorThread[1];  
/**  
 * 子reactor负责处理IO读写发送  
  */  
private ReactorThread[] subReactor;  
/**  
 * 只有一个serverSocketChannel来监听断开  
  */  
private ServerSocketChannel serverSocketChannel;  
/**  
 * 用来分发subReactor的序号  
  */  
private int subNext = 0;

mainReactor负责监听accept连接。
subReactor负责处理IO读写发送。
workPool 负责业务处理。

初始化的时候构造生成mainReactor线程和subReactor线程组。

public void eventLoopGroup(int subThreads) throws IOException {  
  subReactor = new ReactorThread[subThreads];  
  /**  
 * 初始化构造主reactor  
 */  
 initMainReactor();  
  /**  
 * 初始化构造子reactor  
 */  
 initSubReactor();  
}

指定一个mainReactor作为serverSocketChannel去监听连接。

public void initAndRegister(int port) throws IOException, ExecutionException, InterruptedException {  
  serverSocketChannel = ServerSocketChannel.open();  
  serverSocketChannel.configureBlocking(false);  
  //获取一个主线程  
  ReactorThread main = mainReactor[0];  
  main.doStart();  
  //主线程只负责accept事件  
  main.register(serverSocketChannel, SelectionKey.OP_ACCEPT);  
  serverSocketChannel.socket().bind(new InetSocketAddress(port));  
}

mainReactor监听到事件之后负责选择具体某一个subReactor去处理。

@Override  
void handler(SelectableChannel channel) throws IOException, ExecutionException, InterruptedException {  
  SocketChannel clientChannel = ((ServerSocketChannel) channel).accept();  
  ReactorThread sub = subReactor[subNext++ % subReactor.length];  
  System.out.println(" 主线程 :" + getName() + " 收到一个连接事件,交给子线程处理,子线程:" + sub.getName());  
  clientChannel.configureBlocking(false);  
  sub.doStart();  
  sub.register(clientChannel, SelectionKey.OP_READ);  
}

subReactor里面负责数据读和写,然后交由worker线程池去处理。

//读数据
while (clientChannel.isOpen() && clientChannel.read(byteBuffers) != -1) {  
    if (byteBuffers.position() > 0) {  
        break;  
  }  
}
...
//业务处理  
workPool.submit(() -> {  
    //TODO 业务处理  
});
...
//写数据
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());  
while (buffer.hasRemaining()) {  
    clientChannel.write(buffer);  
}

具体源码放在了我的github上。大家有兴趣可以看下。写的难免有很多漏洞,还请指正。

 
评论
validate