JAVA NIO多线程服务器1.2版 (一)

2014-11-24 02:29:14 · 作者: · 浏览: 5

Reactor 模式的 JAVA NIO 多线程服务器

/**
 * Selector thread of a Reactor-style NIO server.
 *
 * <p>The thread owns one {@link Selector} and one non-blocking
 * {@link ServerSocketChannel}. Each accepted connection is registered for
 * {@code OP_READ} with a fresh {@code Reader} attachment; readable or
 * writable keys are dispatched to the {@link Reactor} attached to the key.
 *
 * <p>The selector and the listening channel are closed when {@link #run()}
 * returns, so the port is released once the loop ends.
 */
public class MiniServer extends Thread
{
    private static final Log log = LogFactory.getLog(MiniServer.class);

    private final Selector s;
    private final ServerSocketChannel ssc;
    private final ExecutorService executor;

    /**
     * Opens the selector, binds the listening socket and registers it for
     * accept events. The selector loop itself starts with {@link #start()}.
     *
     * @param portnumber TCP port to listen on
     * @param executor   executor handed to every {@code Reader} created for an
     *                   accepted connection
     * @throws IOException if the selector or channel cannot be opened, bound
     *                     or registered; anything already opened is closed
     *                     before the exception propagates
     */
    public MiniServer(int portnumber,ExecutorService executor) throws IOException
    {
        this.executor=executor;
        Selector selector = Selector.open();
        ServerSocketChannel channel = null;
        boolean ready = false;
        try
        {
            channel = ServerSocketChannel.open();
            channel.socket().bind(new InetSocketAddress(portnumber));
            channel.configureBlocking(false);
            channel.register(selector,SelectionKey.OP_ACCEPT);
            ready = true;
        }
        finally
        {
            // Do not leak the selector / socket when setup fails half-way
            // (e.g. the port is already in use).
            if(!ready)
            {
                release(selector, channel);
            }
        }
        s = selector;
        ssc = channel;
    }

    /**
     * Runs the selector loop until the selector is closed or a selector-level
     * {@link IOException} occurs, then releases the selector and the
     * listening socket.
     */
    @Override
    public void run()
    {
        try
        {
            while(s.isOpen())
            {
                if(s.select()<=0)
                {
                    continue;
                }
                Iterator<SelectionKey> it = s.selectedKeys().iterator();
                while (it.hasNext())
                {
                    SelectionKey key = it.next();
                    // Selected keys must be removed by hand, otherwise they
                    // are reported again on the next select().
                    it.remove();
                    if (!key.isValid() || !key.channel().isOpen())
                    {
                        continue;
                    }
                    if(key.isAcceptable())
                    {
                        acceptConnection();
                    }
                    else if(key.isReadable()||key.isWritable())
                    {
                        Reactor reactor = (Reactor) key.attachment();
                        reactor.execute(key);
                    }
                }
            }
        }
        catch(IOException e)
        {
            log.error("MiniServer selector loop terminated", e);
        }
        finally
        {
            release(s, ssc);
        }
    }

    /**
     * Accepts one pending connection and registers it for reading.
     * A failure while preparing the accepted channel only drops that
     * connection; a failure of {@code accept()} itself is propagated.
     */
    private void acceptConnection() throws IOException
    {
        SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // Non-blocking accept: nothing was actually pending.
            return;
        }
        try
        {
            sc.configureBlocking(false);
            sc.register(s, SelectionKey.OP_READ, new Reader(executor));
        }
        catch(IOException e)
        {
            log.warn("Dropping connection that could not be registered", e);
            try
            {
                sc.close();
            }
            catch(IOException ignored)
            {
                // Nothing more can be done for this connection.
            }
        }
    }

    /** Closes the listening channel (if any) and the selector, logging failures. */
    private static void release(Selector selector, ServerSocketChannel channel)
    {
        if(channel != null)
        {
            try
            {
                channel.close();
            }
            catch(IOException e)
            {
                log.warn("Failed to close server socket channel", e);
            }
        }
        try
        {
            selector.close();
        }
        catch(IOException e)
        {
            log.warn("Failed to close selector", e);
        }
    }
}


/**
 * Handler attached to a {@link SelectionKey}.
 *
 * <p>{@code MiniServer.run()} casts the attachment of a selected key to this
 * type and calls {@link #execute(SelectionKey)} when the key is readable or
 * writable.
 */
public interface Reactor
{
/**
 * Handles the ready I/O event of the given key.
 *
 * @param key the selected key whose attachment is this handler
 */
void execute(SelectionKey key);
}


public class Reader implements Reactor
{
private static final Log log = LogFactory.getLog(Reader.class);

private byte[] bytes=new byte[0];
private ExecutorService executor;

public Reader(ExecutorService executor)
{
this.executor=executor;
}

@Override
public void execute(SelectionKey key)
{
SocketChannel sc = (SocketChannel) key.channel();
try
{
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=-1;
while(sc.isConnected() && (len=sc.read(buffer))>0)
{
buffer.flip();
byte [] content = new byte[buffer.limit()];
buffer.get(content);
bytes=NutUtil.ArrayCoalition(bytes,content);
buffer.clear();
}
if(len==0)
{
key.interestOps(SelectionKey.OP_READ);
key.selector().wakeup();
}
else if(len==-1)
{
Callable call=new ProcessCallable(bytes);
Future task=executor.submit(call);
ByteBuffer output=ByteBuffer.wrap(task.get());
sc.register(key.selector(), SelectionKey.OP_WRITE, new Writer(output));
}
}
catch(