简述:以上是一张来自网上比較经典的图,总体上揭示了mina的结构,当中IoService包括clientIoConnector和服务端IoAcceptor两部分。即不管是client还是服务端都是这个结构。IoService封装了网络传输层(TCP和UDP),而IoFilterChain中mina自带的filter做了一些主要的操作之外,支持扩展。经过FilterChain之后终于调用IoHandler,IoHandler是详细实现业务逻辑的处理接口,详细的业务实现可扩展。 2.一个可执行的案例(案例来自网上,转载后试验): Client.java:import java.net.InetSocketAddress;import java.nio.charset.Charset;import java.util.Random;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.future.IoFutureListener;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.textline.TextLineCodecFactory;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class Client extends IoHandlerAdapter { private Random random = new Random(System.currentTimeMillis()); public Client() { IoConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast( "text", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName(Server.ENCODE)))); connector.setHandler(this); ConnectFuture future = connector.connect(new InetSocketAddress( "", Server.PORT)); future.awaitUninterruptibly(); future.addListener(new IoFutureListener() { @Override public void operationComplete(ConnectFuture future) { IoSession session = future.getSession(); while (!session.isClosing()) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } String message = "你好。我roll了" + random.nextInt(100) + "点."; session.write(message); } } }); connector.dispose(); } @Override public void messageReceived(IoSession session, Object message) throws Exception { System.out.println("批复:" + message.toString()); } @Override public void messageSent(IoSession session, Object message) throws Exception { System.out.println("报告:" + message.toString()); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { cause.printStackTrace(); session.close(true); } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Client(); } }}
import java.net.InetSocketAddress;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;public class ServerHandler extends IoHandlerAdapter { @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { cause.printStackTrace(); session.close(false); } public void messageReceived(IoSession session, Object message) throws Exception { String s = message.toString(); System.out.println("收到请求:" + s); if (s != null) { int i = getPoint(s); if (session.isConnected()) { if (i >= 95) { session.write("运气不错,你能够出去了."); session.close(false); return; } Integer count = (Integer) session.getAttribute(Server.KEY); count++; session.setAttribute(Server.KEY, count); session.write("抱歉。你运气太差了,第" + count + "次请求未被通过。继续在小黑屋呆着吧."); } else { session.close(true); } } } @Override public void messageSent(IoSession session, Object message) throws Exception { System.out.println("发给client:" + message.toString()); } @Override public void sessionClosed(IoSession session) throws Exception { long l = session.getCreationTime(); System.out.println("来自" + getInfo(session) + "的会话已经关闭,它已经存活了" + (System.currentTimeMillis() - 1) + "毫秒"); } @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("给" + getInfo(session) + "创建了一个会话"); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { System.out.println("来自" + getInfo(session) + "的会话闲置,状态为" + status.toString()); } public void sessionOpened(IoSession session) throws Exception { session.setAttribute(Server.KEY, 0); System.out.println("和" + getInfo(session) + "的会话已经打开."); } public String getInfo(IoSession session) { if (session == null) { return null; } InetSocketAddress address = (InetSocketAddress) session .getRemoteAddress(); int port = address.getPort(); String ip = address.getAddress().getHostAddress(); return ip + ":" + port; } public int getPoint(String s) { if (s == null) { return -1; } Pattern p = Pattern.compile("^[\u0041-\uFFFF,]*(\\d+).*$"); Matcher m = p.matcher(s); if (m.matches()) { return Integer.valueOf(m.group(1)); } return 0; }}
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.charset.Charset;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.textline.TextLineCodecFactory;import org.apache.mina.transport.socket.SocketAcceptor;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class Server { public static final int PORT = 2534; public static String ENCODE = "UTF-8"; public static final String KEY = "roll"; public static void main(String[] args){ SocketAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "text", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName(ENCODE)))); acceptor.setHandler(new ServerHandler()); try { acceptor.bind(new InetSocketAddress(PORT)); System.out.println("游戏開始,你想出去吗,来,碰碰运气吧!"); } catch (IOException e) { e.printStackTrace(); acceptor.dispose(); } }}
二、mina 核心源代码分析:
AbstractPollingIoAcceptor 类的 startupAcceptor 方法例如以下:/** * This method is called by the doBind() and doUnbind() * methods. If the acceptor is null, the acceptor object will * be created and kicked off by the executor. If the acceptor * object is null, probably already created and this class * is now working, then nothing will happen and the method * will just return. */private void startupAcceptor() throws InterruptedException { // If the acceptor is not ready, clear the queues // TODO : they should already be clean : do we have to do that ?
if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); //这里仅仅会启动一个worker if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }
上面调用到 AbstractIoService 的 executeWorker方法例如以下:
protected final void executeWorker(Runnable worker) { executeWorker(worker, null);}protected final void executeWorker(Runnable worker, String suffix) { String actualThreadName = threadName; if (suffix != null) { actualThreadName = actualThreadName + '-' + suffix; } executor.execute(new NamePreservingRunnable(worker, actualThreadName));}
简述:有类AbstractPollingIoAcceptor 的 startupAcceptor方法(上文)能够看到,一个SocketAcceptor仅仅启动了一个Worker线程(即代码中的Acceptor对象)而且把他加到线程池中。反过来讲,也能够看出AbstractIoService维护了Worker的线程池。(ps:这个Worker就是服务端处理请求的线程)。
概述:从1中的启动时序能够看到,启动过程通过创建SocketAcceptor将有类AbstractPollingIoAcceptor的内部类Acceptor放到了 AbstractIoService的线程池里面,而这个Acceptor就是处理client网络请求的worker。而以下这个时序就是线程池中每一个worker处理client网络请求的时序流程。
ps:APR(Apache Protable Runtime Library,Apache可移植执行库)是能够提供非常好的可拓展性、性能以及对底层操作系统一致性操作的技术,说白了就是apache实现的一套标准的通讯接口。AprSocketAcceptor先不做深入了解,主要了解下NioSocketAcceptor,NioSocketAcceptor顾名思义,它调用了java NIO的API实现了NIO的网络连接处理过程。
AbstractPolling$Acceptor 的run方法的核心代码例如以下:
private class Acceptor implements Runnable { public void run() { assert (acceptorRef.get() == this); int nHandles = 0; // Release the lock lock.release(); while (selectable) { try { // Detect if we have some keys ready to be processed // The select() will be woke up if some new connection // have occurred, or if the selector has been explicitly // woke up //调用了NioSocketAcceptor的select方法,获取了selectKey int selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port on which this class will // listen on nHandles += registerHandles(); // Now, if the number of registred handles is 0, we can // quit the loop: we don't have any socket listening // for incoming connection. if (nHandles == 0) { acceptorRef.set(null); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { assert (acceptorRef.get() != this); break; } if (!acceptorRef.compareAndSet(null, this)) { assert (acceptorRef.get() != this); break; } assert (acceptorRef.get() == this); } if (selected > 0) { // We have some connection request, let's process // them here. processHandles(selectedHandles()); } // check to see if any cancellation request has been made. nHandles -= unregisterHandles(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop break; } catch (Throwable e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } // Cleanup all the processors, and shutdown the acceptor. if (selectable && isDisposing()) { selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } }
/** * This method will process new sessions for the Worker class. All * keys that have had their status updates as per the Selector.selectedKeys() * method will be processed here. Only keys that are ready to accept * connections are handled here. * * Session objects are created by making new instances of SocketSessionImpl * and passing the session object to the SocketIoProcessor class. */ @SuppressWarnings("unchecked") private void processHandles(Iteratorhandles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session //这里调用了NioSocketAcceptor的accept方法 S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); // add the session to the SocketIoProcessor // 这步处理add操作,会触发对client请求的异步处理。
session.getProcessor().add(session); } }
/** * {@inheritDoc} */@Overrideprotected NioSession accept(IoProcessorprocessor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch);}
/** * Starts the inner Processor, asking the executor to pick a thread in its * pool. The Runnable will be renamed */private void startupProcessor() { Processor processor = processorRef.get(); if (processor == null) { processor = new Processor(); if (processorRef.compareAndSet(null, processor)) { executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. wakeup();}
简述:这个startupProcessor方法在调用 session里包装的processor的add方法是,触发了将处理client请求的processor放入异步处理的线程池中。兴许详细Processor怎么处理client请求的流程,涉及到FilterChain的过滤。以及Adapter的调用。用来处理业务逻辑。详细的异步处理时序看以下的时序图:
private class Processor implements Runnable { public void run() { assert (processorRef.get() == this); int nSessions = 0; lastIdleCheckTime = System.currentTimeMillis(); for (;;) { try { // This select has a timeout so that we can manage // idle session when we get out of the select every // second. (note : this is a hack to avoid creating // a dedicated thread). long t0 = System.currentTimeMillis(); //调用了NioProcessor int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = (t1 - t0); if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) { // Last chance : the select() may have been // interrupted because we have had an closed channel. if (isBrokenConnection()) { LOG.warn("Broken connection"); // we can reselect immediately // set back the flag to false wakeupCalled.getAndSet(false); continue; } else { LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0)); // Ok, we are hit by the nasty(讨厌的) epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, but // it stopped the select. The next time we will // call select(), it will exit immediately for the same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. registerNewSelector(); } // Set back the flag to false wakeupCalled.getAndSet(false); // and continue the loop continue; } // Manage newly created session first nSessions += handleNewSessions(); updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test... //触发了详细的调用逻辑 process(); } // Write the pending requests long currentTime = System.currentTimeMillis(); flush(currentTime); // And manage removed sessions nSessions -= removeSessions(); // Last, not least, send Idle events to the idle sessions notifyIdleSessions(currentTime); // Get a chance to exit the infinite loop if there are no // more sessions on this Processor if (nSessions == 0) { processorRef.set(null); if (newSessions.isEmpty() && isSelectorEmpty()) { // newSessions.add() precedes startupProcessor assert (processorRef.get() != this); break; } assert (processorRef.get() != this); if (!processorRef.compareAndSet(null, this)) { // startupProcessor won race, so must exit processor assert (processorRef.get() != this); break; } assert (processorRef.get() == this); } // Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. if (isDisposing()) { for (Iteratori = allSessions(); i.hasNext();) { scheduleRemove(i.next()); } wakeup(); } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop break; } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } try { synchronized (disposalLock) { if (disposing) { doDispose(); } } } catch (Throwable t) { ExceptionMonitor.getInstance().exceptionCaught(t); } finally { disposalFuture.setValue(true); } }}
简述:这么一坨代码能够看出,这个处理器也调用了java的Nio API是一个NIO模型。当中select和process方法各自是从session拿到要处理的请求,并进行处理。而详细的Processor实例是NioProcessor。从加入凝视的代码中有一步调用了自身的process方法,这步调用触发了详细业务逻辑的调用。能够结合代码和时序图看下。在Process方法中会调用reader(session)或wirte(session)方法,然后调用fireMessageReceived方法,这种方法又调用了callNextMessageReceived方法致使触发了整个FilterChain和Adapter的调用。read方法的核心代码例如以下:
private void read(S session) { IoSessionConfig config = session.getConfig(); int bufferSize = config.getReadBufferSize(); IoBuffer buf = IoBuffer.allocate(bufferSize); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) { while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { if (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) { scheduleRemove(session); } } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); }}
简述: 从类继承结构图来看,能够看到在IOService体系下,存在IoConnector和IoAcceptor两个大的分支体系。IoConnector是做为client的时候使用,IoAcceptor是作为服务端的时候使用。实际上在Mina中,有三种worker线程各自是:Acceptor、Connector 和 I/O processor。
(1) Acceptor Thread 作为server端的链接线程,实现了IoService接口。线程的数量就是创建SocketAcceptor的数量。 (2) Connector Thread 作为client请求建立的链接线程,实现了IoService接口,维持了一个和服务端Acceptor的一个链接,线程的数量就是创建SocketConnector的数量。 (3) I/O processorThread 作为I/O真正处理的线程,存在于server端和client。线程的数量是能够配置的,默认是CPU个数+1。上面那个图仅仅是表述了IoService类体系,而I/O Processor的类体系并不在当中,见下图:
简述:IOProcessor主要分为两种。各自是AprIOProcessor和NioProcessor,Apr的解释见上文:ps:APR(Apache Protable Runtime Library,Apache可移植执行库)。
NioProcessor也是Nio的一种实现,用来处理client连接过来的请求。在Processor中会调用到 FilterChain 和 Handler,见上文代码。先看下FilterChain的类结构图例如以下:
Filter 和 Handler的类结构例如以下: