博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MINA2 源代码学习--源代码结构梳理
阅读量:6716 次
发布时间:2019-06-25

本文共 22776 字,大约阅读时间需要 75 分钟。

一、mina总体框架与案例:

1.总体结构图:

简述:以上是一张来自网上比較经典的图,总体上揭示了mina的结构,当中IoService包括clientIoConnector和服务端IoAcceptor两部分。即不管是client还是服务端都是这个结构。IoService封装了网络传输层(TCP和UDP),而IoFilterChain中mina自带的filter做了一些主要的操作之外,支持扩展。经过FilterChain之后终于调用IoHandler,IoHandler是详细实现业务逻辑的处理接口,详细的业务实现可扩展。

2.一个可执行的案例(案例来自网上,转载后试验):
Client.java:

import java.net.InetSocketAddress;import java.nio.charset.Charset;import java.util.Random;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.future.IoFutureListener;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.textline.TextLineCodecFactory;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class Client extends IoHandlerAdapter {    private Random random = new Random(System.currentTimeMillis());    public Client() {        IoConnector connector = new NioSocketConnector();        connector.getFilterChain().addLast(                "text",                new ProtocolCodecFilter(new TextLineCodecFactory(Charset                        .forName(Server.ENCODE))));        connector.setHandler(this);        ConnectFuture future = connector.connect(new InetSocketAddress(                "127.0.0.1", Server.PORT));        future.awaitUninterruptibly();        future.addListener(new IoFutureListener
() { @Override public void operationComplete(ConnectFuture future) { IoSession session = future.getSession(); while (!session.isClosing()) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } String message = "你好。我roll了" + random.nextInt(100) + "点."; session.write(message); } } }); connector.dispose(); } @Override public void messageReceived(IoSession session, Object message) throws Exception { System.out.println("批复:" + message.toString()); } @Override public void messageSent(IoSession session, Object message) throws Exception { System.out.println("报告:" + message.toString()); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { cause.printStackTrace(); session.close(true); } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Client(); } }}

ServerHandler.java:

import java.net.InetSocketAddress;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;public class ServerHandler extends IoHandlerAdapter {    @Override    public void exceptionCaught(IoSession session, Throwable cause)            throws Exception {        cause.printStackTrace();        session.close(false);    }    public void messageReceived(IoSession session, Object message)            throws Exception {        String s = message.toString();        System.out.println("收到请求:" + s);        if (s != null) {            int i = getPoint(s);            if (session.isConnected()) {                if (i >= 95) {                    session.write("运气不错,你能够出去了.");                    session.close(false);                    return;                }                Integer count = (Integer) session.getAttribute(Server.KEY);                count++;                session.setAttribute(Server.KEY, count);                session.write("抱歉。你运气太差了,第" + count + "次请求未被通过。继续在小黑屋呆着吧.");            } else {                session.close(true);            }        }    }    @Override    public void messageSent(IoSession session, Object message) throws Exception {        System.out.println("发给client:" + message.toString());    }    @Override    public void sessionClosed(IoSession session) throws Exception {        long l = session.getCreationTime();        System.out.println("来自" + getInfo(session) + "的会话已经关闭,它已经存活了"                + (System.currentTimeMillis() - 1) + "毫秒");    }    @Override    public void sessionCreated(IoSession session) throws Exception {        System.out.println("给" + getInfo(session) + "创建了一个会话");    }    @Override    public void sessionIdle(IoSession session, IdleStatus status)            throws Exception {        System.out.println("来自" + getInfo(session) + "的会话闲置,状态为"                + status.toString());    }    public void sessionOpened(IoSession session) throws Exception {        session.setAttribute(Server.KEY, 0);        System.out.println("和" + getInfo(session) + "的会话已经打开.");    }    public String getInfo(IoSession session) {        if (session == null) {            return null;        }        InetSocketAddress address = (InetSocketAddress) session                .getRemoteAddress();        int port = address.getPort();        String ip = address.getAddress().getHostAddress();        return ip + ":" + port;    }    public int getPoint(String s) {        if (s == null) {            return -1;        }        Pattern p = Pattern.compile("^[\u0041-\uFFFF,]*(\\d+).*$");        Matcher m = p.matcher(s);        if (m.matches()) {            return Integer.valueOf(m.group(1));        }        return 0;    }}

Server.java:

import java.io.IOException;import java.net.InetSocketAddress;import java.nio.charset.Charset;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.textline.TextLineCodecFactory;import org.apache.mina.transport.socket.SocketAcceptor;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class Server {    public static final int PORT = 2534;    public static String ENCODE = "UTF-8";    public static final String KEY = "roll";    public static void main(String[] args){         SocketAcceptor acceptor = new NioSocketAcceptor();        acceptor.getFilterChain().addLast(                "text",                new ProtocolCodecFilter(new TextLineCodecFactory(Charset                        .forName(ENCODE))));        acceptor.setHandler(new ServerHandler());        try {            acceptor.bind(new InetSocketAddress(PORT));            System.out.println("游戏開始,你想出去吗,来,碰碰运气吧!");        } catch (IOException e) {            e.printStackTrace();            acceptor.dispose();        }    }}

本案例依赖的jar例如以下图:

简述:以上是依赖mina实现的一个可执行的案例,就不多说了,结合总体的结构图和案例实现能够看出mina框架还是非常轻量级的。以下分析一下mina的源代码结构和一些时序流程。

二、mina 核心源代码分析:

1.mina的启动时序(结合上面的案例):

简述:SocketAcceptor作为服务端对外启动接口类,在bind网络地址的时候,会触发服务端一系列服务的启动,从调用链能够清晰找到相应的源代码阅读。

当中AbstractPollingIoAcceptor是一个核心类,它会调用自身的startupAcceptor方法,来启动一个存放Acceptor的线程池用来处理client传输过来的请求。

AbstractPollingIoAcceptor 类的 startupAcceptor 方法例如以下:

/** * This method is called by the doBind() and doUnbind() * methods.  If the acceptor is null, the acceptor object will * be created and kicked off by the executor.  If the acceptor * object is null, probably already created and this class * is now working, then nothing will happen and the method * will just return. */private void startupAcceptor() throws InterruptedException {    // If the acceptor is not ready, clear the queues    // TODO : they should already be clean : do we have to do that ?

if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); //这里仅仅会启动一个worker if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }

上面调用到 AbstractIoService 的 executeWorker方法例如以下:

protected final void executeWorker(Runnable worker) {    executeWorker(worker, null);}protected final void executeWorker(Runnable worker, String suffix) {    String actualThreadName = threadName;    if (suffix != null) {        actualThreadName = actualThreadName + '-' + suffix;    }    executor.execute(new NamePreservingRunnable(worker, actualThreadName));}

简述:有类AbstractPollingIoAcceptor 的 startupAcceptor方法(上文)能够看到,一个SocketAcceptor仅仅启动了一个Worker线程(即代码中的Acceptor对象)而且把他加到线程池中。反过来讲,也能够看出AbstractIoService维护了Worker的线程池。(ps:这个Worker就是服务端处理请求的线程)。

2.Mina处理client链接的过程(启动后):

概述:从1中的启动时序能够看到,启动过程通过创建SocketAcceptor将有类AbstractPollingIoAcceptor的内部类Acceptor放到了 AbstractIoService的线程池里面,而这个Acceptor就是处理client网络请求的worker。而以下这个时序就是线程池中每一个worker处理client网络请求的时序流程。

处理请求时序: 

简述:worker线程Acceptor的run方法中会调用NioSocketAcceptor或者AprSocketAccetpor的select方法。

ps:APR(Apache Protable Runtime Library,Apache可移植执行库)是能够提供非常好的可拓展性、性能以及对底层操作系统一致性操作的技术,说白了就是apache实现的一套标准的通讯接口。

AprSocketAcceptor先不做深入了解,主要了解下NioSocketAcceptor,NioSocketAcceptor顾名思义,它调用了java NIO的API实现了NIO的网络连接处理过程。

AbstractPolling$Acceptor 的run方法的核心代码例如以下:

private class Acceptor implements Runnable {    public void run() {        assert (acceptorRef.get() == this);        int nHandles = 0;        // Release the lock        lock.release();        while (selectable) {            try {                // Detect if we have some keys ready to be processed                // The select() will be woke up if some new connection                // have occurred, or if the selector has been explicitly                // woke up                //调用了NioSocketAcceptor的select方法,获取了selectKey                int selected = select();                // this actually sets the selector to OP_ACCEPT,                // and binds to the port on which this class will                // listen on                nHandles += registerHandles();                // Now, if the number of registred handles is 0, we can                // quit the loop: we don't have any socket listening                // for incoming connection.                if (nHandles == 0) {                    acceptorRef.set(null);                    if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {                        assert (acceptorRef.get() != this);                        break;                    }                    if (!acceptorRef.compareAndSet(null, this)) {                        assert (acceptorRef.get() != this);                        break;                    }                    assert (acceptorRef.get() == this);                }                if (selected > 0) {                    // We have some connection request, let's process                    // them here.                    processHandles(selectedHandles());                }                // check to see if any cancellation request has been made.                nHandles -= unregisterHandles();            } catch (ClosedSelectorException cse) {                // If the selector has been closed, we can exit the loop                break;            } catch (Throwable e) {                ExceptionMonitor.getInstance().exceptionCaught(e);                try {                    Thread.sleep(1000);                } catch (InterruptedException e1) {                    ExceptionMonitor.getInstance().exceptionCaught(e1);                }            }        }        // Cleanup all the processors, and shutdown the acceptor.        if (selectable && isDisposing()) {            selectable = false;            try {                if (createdProcessor) {                    processor.dispose();                }            } finally {                try {                    synchronized (disposalLock) {                        if (isDisposing()) {                            destroy();                        }                    }                } catch (Exception e) {                    ExceptionMonitor.getInstance().exceptionCaught(e);                } finally {                    disposalFuture.setDone();                }            }        }    }

简述:从上面的代码中能够看出一个典型的网络请求处理的程序,在循环中拿到处理的请求后就调用AbstractPollingIoAcceptor的processHandles()对网络请求做处理。

代码例如以下:

/**     * This method will process new sessions for the Worker class.  All     * keys that have had their status updates as per the Selector.selectedKeys()     * method will be processed here.  Only keys that are ready to accept     * connections are handled here.     * 

* Session objects are created by making new instances of SocketSessionImpl * and passing the session object to the SocketIoProcessor class. */ @SuppressWarnings("unchecked") private void processHandles(Iterator
handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session //这里调用了NioSocketAcceptor的accept方法 S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); // add the session to the SocketIoProcessor // 这步处理add操作,会触发对client请求的异步处理。

session.getProcessor().add(session); } }

NioSocketAcceptor的accept方法new了一个包装Process处理线程的session实例:而且在调用session.getProcessor().add(session)的操作的时候触发了对client请求的异步处理。

/** * {@inheritDoc} */@Overrideprotected NioSession accept(IoProcessor
processor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch);}

再看上面时序图:有一步是AbstractPollingIoProcessor调用了startupProcessor方法。代码例如以下:

/** * Starts the inner Processor, asking the executor to pick a thread in its * pool. The Runnable will be renamed */private void startupProcessor() {    Processor processor = processorRef.get();    if (processor == null) {        processor = new Processor();        if (processorRef.compareAndSet(null, processor)) {            executor.execute(new NamePreservingRunnable(processor, threadName));        }    }    // Just stop the select() and start it again, so that the processor    // can be activated immediately.    wakeup();}

简述:这个startupProcessor方法在调用 session里包装的processor的add方法是,触发了将处理client请求的processor放入异步处理的线程池中。兴许详细Processor怎么处理client请求的流程,涉及到FilterChain的过滤。以及Adapter的调用。用来处理业务逻辑。详细的异步处理时序看以下的时序图:

简述:这个时序就是将待处理的client链接,通过NIO的形式接受请求,并将请求包装成Processor的形式放到处理的线程池中异步的处理。

在异步的处理过程中则调用了Processor的run方法,详细的filterchain的调用和业务Adapter的调用也是在这一步得到处理。

值得注意的是。Handler的调用是封装在DefaultFilterchain的内部类诶TairFilter中触发调用的。Processor的run方法代码例如以下:

private class Processor implements Runnable {    public void run() {        assert (processorRef.get() == this);        int nSessions = 0;        lastIdleCheckTime = System.currentTimeMillis();        for (;;) {            try {                // This select has a timeout so that we can manage                // idle session when we get out of the select every                // second. (note : this is a hack to avoid creating                // a dedicated thread).                long t0 = System.currentTimeMillis();                //调用了NioProcessor                int selected = select(SELECT_TIMEOUT);                long t1 = System.currentTimeMillis();                long delta = (t1 - t0);                if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {                    // Last chance : the select() may have been                    // interrupted because we have had an closed channel.                    if (isBrokenConnection()) {                        LOG.warn("Broken connection");                        // we can reselect immediately                        // set back the flag to false                        wakeupCalled.getAndSet(false);                        continue;                    } else {                        LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));                        // Ok, we are hit by the nasty(讨厌的) epoll                        // spinning.                        // Basically, there is a race condition                        // which causes a closing file descriptor not to be                        // considered as available as a selected channel, but                        // it stopped the select. The next time we will                        // call select(), it will exit immediately for the same                        // reason, and do so forever, consuming 100%                        // CPU.                        // We have to destroy the selector, and                        // register all the socket on a new one.                        registerNewSelector();                    }                    // Set back the flag to false                    wakeupCalled.getAndSet(false);                    // and continue the loop                    continue;                }                // Manage newly created session first                nSessions += handleNewSessions();                updateTrafficMask();                // Now, if we have had some incoming or outgoing events,                // deal with them                if (selected > 0) {                    //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...                    //触发了详细的调用逻辑                    process();                }                // Write the pending requests                long currentTime = System.currentTimeMillis();                flush(currentTime);                // And manage removed sessions                nSessions -= removeSessions();                // Last, not least, send Idle events to the idle sessions                notifyIdleSessions(currentTime);                // Get a chance to exit the infinite loop if there are no                // more sessions on this Processor                if (nSessions == 0) {                    processorRef.set(null);                    if (newSessions.isEmpty() && isSelectorEmpty()) {                        // newSessions.add() precedes startupProcessor                        assert (processorRef.get() != this);                        break;                    }                    assert (processorRef.get() != this);                    if (!processorRef.compareAndSet(null, this)) {                        // startupProcessor won race, so must exit processor                        assert (processorRef.get() != this);                        break;                    }                    assert (processorRef.get() == this);                }                // Disconnect all sessions immediately if disposal has been                // requested so that we exit this loop eventually.                if (isDisposing()) {                    for (Iterator i = allSessions(); i.hasNext();) {                        scheduleRemove(i.next());                    }                    wakeup();                }            } catch (ClosedSelectorException cse) {                // If the selector has been closed, we can exit the loop                break;            } catch (Throwable t) {                ExceptionMonitor.getInstance().exceptionCaught(t);                try {                    Thread.sleep(1000);                } catch (InterruptedException e1) {                    ExceptionMonitor.getInstance().exceptionCaught(e1);                }            }        }        try {            synchronized (disposalLock) {                if (disposing) {                    doDispose();                }            }        } catch (Throwable t) {            ExceptionMonitor.getInstance().exceptionCaught(t);        } finally {            disposalFuture.setValue(true);        }    }}

简述:这么一坨代码能够看出,这个处理器也调用了java的Nio API是一个NIO模型。当中select和process方法各自是从session拿到要处理的请求,并进行处理。而详细的Processor实例是NioProcessor。从加入凝视的代码中有一步调用了自身的process方法,这步调用触发了详细业务逻辑的调用。能够结合代码和时序图看下。在Process方法中会调用reader(session)或wirte(session)方法,然后调用fireMessageReceived方法,这种方法又调用了callNextMessageReceived方法致使触发了整个FilterChain和Adapter的调用。read方法的核心代码例如以下:

private void read(S session) {    IoSessionConfig config = session.getConfig();    int bufferSize = config.getReadBufferSize();    IoBuffer buf = IoBuffer.allocate(bufferSize);    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();    try {        int readBytes = 0;        int ret;        try {            if (hasFragmentation) {                while ((ret = read(session, buf)) > 0) {                    readBytes += ret;                    if (!buf.hasRemaining()) {                        break;                    }                }            } else {                ret = read(session, buf);                if (ret > 0) {                    readBytes = ret;                }            }        } finally {            buf.flip();        }        if (readBytes > 0) {            IoFilterChain filterChain = session.getFilterChain();            filterChain.fireMessageReceived(buf);            buf = null;            if (hasFragmentation) {                if (readBytes << 1 < config.getReadBufferSize()) {                    session.decreaseReadBufferSize();                } else if (readBytes == config.getReadBufferSize()) {                    session.increaseReadBufferSize();                }            }        }        if (ret < 0) {            scheduleRemove(session);        }    } catch (Throwable e) {        if (e instanceof IOException) {            if (!(e instanceof PortUnreachableException)                    || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())                    || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {                scheduleRemove(session);            }        }        IoFilterChain filterChain = session.getFilterChain();        filterChain.fireExceptionCaught(e);    }}

从这段代码并结合上面的时序图能够看出来触发整个FilterChain的调用以及IoHandler的调用。

三、类结构分析

參考第一部分的总体结构图,画一下每一个部分大致的类结构图:

简述: 从类继承结构图来看,能够看到在IOService体系下,存在IoConnector和IoAcceptor两个大的分支体系。IoConnector是做为client的时候使用,IoAcceptor是作为服务端的时候使用。实际上在Mina中,有三种worker线程各自是:Acceptor、Connector 和 I/O processor。

(1) Acceptor Thread 作为server端的链接线程,实现了IoService接口。线程的数量就是创建SocketAcceptor的数量。

(2) Connector Thread 作为client请求建立的链接线程,实现了IoService接口,维持了一个和服务端Acceptor的一个链接,线程的数量就是创建SocketConnector的数量。

(3) I/O processorThread 作为I/O真正处理的线程,存在于server端和client。线程的数量是能够配置的,默认是CPU个数+1。

上面那个图仅仅是表述了IoService类体系,而I/O Processor的类体系并不在当中,见下图:

简述:IOProcessor主要分为两种。各自是AprIOProcessor和NioProcessor,Apr的解释见上文:ps:APR(Apache Protable Runtime Library,Apache可移植执行库)。

NioProcessor也是Nio的一种实现,用来处理client连接过来的请求。在Processor中会调用到 FilterChain 和 Handler,见上文代码。先看下FilterChain的类结构图例如以下:

Filter 和 Handler的类结构例如以下:

Handler的类结构例如以下:

Mina的session类结构图例如以下:

Mina的Buffer的类结构图例如以下:

版权声明:本文博主原创文章,博客,未经同意不得转载。

你可能感兴趣的文章
关于11月比特币现金将添加CTOR事件
查看>>
SIGIR2018大会最佳短论文:利用对抗学习的跨域正则化
查看>>
美国黄金公司Schiff Gold:BCH避险潜力远大于BCE
查看>>
Tomcat运行web程序过程及server.xml配置
查看>>
可读可写流简明实现指北【多图,附demo源码】
查看>>
翻译连载 | 第 10 章:异步的函数式(上)-《JavaScript轻量级函数式编程》 |《你不知道的JS》姊妹篇...
查看>>
Android webview 与 js(Vue) 交互
查看>>
UML统一建模语言
查看>>
给迷茫的JAVA员一些中肯建议, 你还在虚度光阴吗?
查看>>
计算机程序的思维逻辑 (40) - 剖析HashMap
查看>>
【腾讯 TMQ】从 0 到 1:打造移动端 H5 性能测试平台
查看>>
我是HDRoot!
查看>>
Postgres On Docker-窥探容器服务
查看>>
性能优化工具知识梳理(2) Systrace
查看>>
JS中的洋葱模型
查看>>
js call、apply、bind的实现
查看>>
《程序员的职业素养之代码整洁之道》成为专业人士必读
查看>>
使用IntelliJ Idea新建SpringBoot项目
查看>>
聊聊flink的Table API及SQL Programs
查看>>
Android M 封装过的运行时权限处理
查看>>