`
flyPig
  • 浏览: 137005 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Mina框架剖析--动态篇

阅读更多
一切从启动开始,MINA服务端启动代码:
    private void start(int port) throws IOException, InstantiationException,
            IllegalAccessException, ClassNotFoundException {
        //1
        NioSocketAcceptor acceptor = new NioSocketAcceptor(5);//5个NioProcessor
        Executor threadPool = Executors
                .newFixedThreadPool(100);// 固定100个的线程池
        
        //2
        acceptor.getFilterChain().addLast("exector",
                new ExecutorFilter(threadPool));
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TestEncoder(), new TestDecoder()));
        LoggingFilter filter = new LoggingFilter();
        filter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
        filter.setMessageReceivedLogLevel(LogLevel.DEBUG);
        filter.setMessageSentLogLevel(LogLevel.DEBUG);
        filter.setSessionClosedLogLevel(LogLevel.DEBUG);
        filter.setSessionCreatedLogLevel(LogLevel.DEBUG);
        filter.setSessionIdleLogLevel(LogLevel.DEBUG);
        filter.setSessionOpenedLogLevel(LogLevel.DEBUG);
        acceptor.getFilterChain().addLast("logger", filter);
        
        //3
        acceptor.setReuseAddress(true);//ServerSocket.setReuseAddress
        acceptor.setBacklog(128);
        acceptor.setDefaultLocalAddress(new InetSocketAddress(port));
        // 加入处理器(Handler)到Acceptor
        acceptor.setHandler(new TestHandler());

        //4
        acceptor.getSessionConfig().setReuseAddress(true);// 设置每一个非主监听连接的端口可以重用Socket.setReuseAddress
        acceptor.getSessionConfig().setReceiveBufferSize(1024);// 设置输入缓冲区的大小Socket.setReceiveBufferSize
        acceptor.getSessionConfig().setSendBufferSize(10240);// 设置输出缓冲区的大小
        // 设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出Socket.setSendBufferSize
        acceptor.getSessionConfig().setTcpNoDelay(true);
        // 设置主服务监听端口的监听队列的最大值为128,如果当前已经有128个连接,再新的连接来将被reject
        
        //5
        acceptor.bind();
    }

步骤1:初始化NioSocketAcceptor和Global的ThreadPoolExecutor.
步骤2:初始化3个filter,其中ExecutorFilter用Global的ThreadPool实现。TestDecoder/TestEncoder是实现ProtocolEncoder、ProtocolDecoder的类。
步骤3:设置acceptor的一些参数,其实最终是设置的ServerSocket类的参数。
步骤4:设置SessionConfig的参数,最终是设置到established的Socket类的参数。
步骤5:启动监听开始干活。

在bind里面具体做的事情就是:
bind(SocketAddress)-->bindInternal-->startupAcceptor:启动AbstractPollingIoAcceptor.Acceptor.run使用executor [Executor]的线程,注册OP_ACCEPT,然后wakeup selector。这个里面有一个异步处理就是AbstractPollingIoAcceptor里面有一个registerQueue和cancelQueue。bind就是往registerQueue里面put,unbind就是往cancelQueue里面put.AbstractPollingIoAcceptor.Acceptor每次的循环,会先检查registerQueue,调用
完processHandles(必须有Accept事件发生),然后检查cancelQueue.


一旦有连接进来(OP_Accept)就会
1.构建NioSocketSession,对应SocketChannal,NioSocketSession持有对IoService,Processor池,SocketChannel,SessionConfig,IoHandler的所有引用。
SocketChannel ch = handle.accept();        
        if (ch == null) {
            return null;
        }
        return new NioSocketSession(this, processor, ch);

public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel ch) {
        this.service = service;
        this.processor = processor;
        this.ch = ch;
        this.handler = service.getHandler();
        this.config.setAll(service.getSessionConfig());
    }

2.初始化Session里面的attributeMap,WriteRequestQueue。
((AbstractIoSession) session).setAttributeMap(session.getService()                   .getSessionDataStructureFactory().getAttributeMap(session));
((AbstractIoSession) session).setWriteRequestQueue(session
                    .getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));

3.session.getProcessor().add(session)将Session加入到了newSessions这个queue中,Processor会从里面poll数据执行。
session.getProcessor().add(session);

 public final void add(T session) {
        if (isDisposing()) {
            throw new IllegalStateException("Already disposed.");
        }

        // Adds the session to the newSession queue and starts the worker
        newSessions.add(session);
        startupProcessor();
    }

可以总结一下:
1. NioSocketAcceptor就是可以大致理解为ServerSocketChannel的异步模型包装,专门处理OP_ACCEPT事件。内部有一个AbstractPollingIoAcceptor.Acceptor线程对象,由Executors来执行。
2. 一个NioSocketAcceptor对应了多个NioProcessor。NioProcessor是专门处理OP_READ事件,一个NioProcessor对应一个SocketChannel。内部的AbstractPollingIoProcessor.Processor线程对象也是由Executors执行。

这个时候所有的处理就转入到了AbstractPollingIoProcessor.Processor.run 它的主要流程:
for (;;) {     
       ......
       int selected = selector(final SELECT_TIMEOUT = 1000L);
       nSessions += handleNewSessions();
       updateTrafficMask();
       .......
       if (selected > 0) {
          process();
       }
       ......
}:
可以看到,目前还没看到OP_READ注册,接下来就是做这个事情
   private int handleNewSessions() {
        int addedSessions = 0;
        for (;;) {
            T session = newSessions.poll();
            if (session == null) {
                // All new sessions have been handled
                break;
            }
            if (addNow(session)) {
                // A new session has been created 
                addedSessions++;
            }
        }
        return addedSessions;
    }  
    private boolean addNow(T session) {
        boolean registered = false;
        boolean notified = false;
        
        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            session.getService().getFilterChainBuilder().buildFilterChain(
                    session.getFilterChain());
            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            ((AbstractIoService) session.getService()).getListeners()
                    .fireSessionCreated(session);
            notified = true;
        } catch (Throwable e) {
           ....
        }
        return registered;
    }
    protected void init(NioSession session) throws Exception {
        SelectableChannel ch = (SelectableChannel) session.getChannel();
        ch.configureBlocking(false);
        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
   public void fireSessionCreated(IoSession session) {
      ....
      IoFilterChain filterChain = session.getFilterChain(); 
      filterChain.fireSessionCreated();
      filterChain.fireSessionOpened();
      ....
    }
    }

在init方法里面,终于看到了期待的OP_READ注册。然后是FilterChain的调用。在addNow方法里面调用了fireSessionCreated方法,这里会按照注册的顺序调用IoFilter.sessionCreated,最终是IoHandler.sessionCreated.
再调用fireSessionOpened方法,同样会按照注册IoFilter的顺序调用IoFilter.sessionOpened,最终是IoHandler.sessionOpened.
再额外说下FilterChain的机制,主要的属性:
private final EntryImpl head;
private final EntryImpl tail;
private final Map<String, Entry> name2entry = new HashMap<String, Entry>();

正向就是head--> all registered filter -->tail  [fireSessionCreated/fireSessionOpened/fireMessageReceived都是这个顺序]
反向就是tail--> all registered filter -->head
[fireFilterWrite/fireFilterClose就是这个顺序]

既然事件注册了,create/open的处理也完了,那下一轮循环的selected > 0就成立了,进入process的处理流程
process()-->Iterate all session-channal -->read(session) 这个read方法是AbstractPollingIoProcessor.private void read(T session)方法。
个人感觉,这里有很明显的性能问题。
首先看具体的process方法:
 private void process() throws Exception {
        for (Iterator<T> i = selectedSessions(); i.hasNext();) {
            T session = i.next();
            process(session);
            i.remove();
        }
    }

在这里,MINA是遍历所有的有OP_READ事件的session,然后read再fireMessageReceived到我们的IoHandler.messageReceived中,即使我们注册了ExecutorFilter,它也只是让fireMessageReceived方法能被极快的处理。真正的read过程并不是并发处理,这样一来很明显后来的请求要等前面的请求被处理完才能得到处理,这样会造成后来请求的延迟.即使把NioProcessor数目变多也于事无补,因为并发数量大于NioProcessor数目后,总有至少一个NioProcessor上的session总会有排队的现象。

然后还有第二个问题,仔细看read方法
   private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
        final boolean hasFragmentation = session.getTransportMetadata()
                .hasFragmentation();
        try {
            int readBytes = 0;
            int ret;
            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }
            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }           
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            ....
        }
    }

read(session)的主要执行流程是
read data to buffer --> if readBytes> 0 --> IoFilterChain.fireMessageReceived(buf)  IoHandler.messageReceived将在其中被调用。个人怀疑,如果client发送消息是采用消息分片方式发送,一般在client端资源不够或要报文太大,在这种情况下IoHandler.messageReceived,结果就是消息体被分割了若干份,等于我们在IoHandler.message会被调用多次,每次的message都是一个报文片,这个时候就悲剧了。

转到IoFilterChain.fireMessageReceived(buf),责任链模式无需多说,按照注册的顺序调用所有的IoFilter.messageReceived,最后会调用到IoHandler.messageReceived.需要注意的是,传入的message对象是个Object类型,具体的类型是最后一个Filter处理后的结果。但是一般上说来,只有ProtocolCodecFilter才会去更改message type.

在IoHandler.messageReceived里面就是我们具体的业务逻辑。在这里我们很可能需要做 写操作,一般会IoSession.write(Object message) 它返回的是WriteFuture。简单看看write会做什么事情。
 public WriteFuture write(Object message, SocketAddress remoteAddress) {
        .....
        // Now, we can write the message. First, create a future
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);
        .....
        return writeFuture;
    }

在这里面会调用filterChain.fireFilterWrite,它会按照IoFilter注册的反向顺序调用IoFilter.filterWrite,按照IoSession.write-->tail-->registered Filter-->head的顺序,最后会调用filterChain里面的headFilter.它的filterWrite实现大概如下:
.....
s.getWriteRequestQueue().offer(s, writeRequest);
            if (!s.isWriteSuspended()) {
                s.getProcessor().flush(s);
            }
            .....

还记得前面NioSocketAcceptor处理OP_ACCEPT的时候在IoSession里面初始化的那个WriteRequestQueue吗,就在这里offer进去,然后把session add到NioProcessor的flushingSessions里面去,然后wakeup Processor.
 public final void flush(T session) {
        boolean needsWakeup = flushingSessions.isEmpty();
        if (scheduleFlush(session) && needsWakeup) {
            wakeup();
        }
    }

    private boolean scheduleFlush(T session) {
        if (session.setScheduledForFlush(true)) {
            // add the session to the queue
            flushingSessions.add(session);
            return true;
        }
        return false;
    }


如果我们没有用到ExecutorFilter,其实所有从read开始的处理全部在单线程环境下包括IoSession.write,所以以Processor的线程环境看,就是process代码处理完。
如果用到了ExecutorFilter,也就是process内部从fireMessageReceived开始是异步处理的。
但是这都不影响整个流程。总之,我们的焦点又切回到了AbstractPollingIoProcessor.Processor.run 看看它在process后要做的事情。
long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= removeSessions();
                    notifyIdleSessions(currentTime);

private void flush(long currentTime) {
             .....
             for (;;) {
             T session = flushingSessions.poll();
             boolean flushedAll = flushNow(session, currentTime);
                        if (flushedAll
                                && !session.getWriteRequestQueue().isEmpty(session)
                                && !session.isScheduledForFlush()) {
                            scheduleFlush(session);
                        }
             }

 private boolean flushNow(T session, long currentTime) {
final WriteRequestQueue writeRequestQueue = session
                .getWriteRequestQueue();
.....
// Clear OP_WRITE
setInterestedInWrite(session, false);
req = writeRequestQueue.poll(session);
Object message = req.getMessage();
if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(session, req,
                            hasFragmentation, maxWrittenBytes - writtenBytes,
                            currentTime);
                    if (localWrittenBytes > 0
                            && ((IoBuffer) message).hasRemaining()) {
                        // the buffer isn't empty, we re-interest it in writing 
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                }
.....
}

获取到IoSession.writeRequestQueue --> poll--> write buffer to channel
在细看下writeBuffer这个方法。
private int writeBuffer(T session, WriteRequest req,
            boolean hasFragmentation, int maxLength, long currentTime)
            throws Exception {
        IoBuffer buf = (IoBuffer) req.getMessage();
        int localWrittenBytes = 0;
        if (buf.hasRemaining()) {
            int length;
            if (hasFragmentation) {
                length = Math.min(buf.remaining(), maxLength);
            } else {
                length = buf.remaining();
            }
            for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
                localWrittenBytes = write(session, buf, length);
                if (localWrittenBytes != 0) {
                    break;
                }
            }
        }
        session.increaseWrittenBytes(localWrittenBytes, currentTime);
        if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
            // Buffer has been sent, clear the current request.
            buf.reset();
            fireMessageSent(session, req);
        }
        return localWrittenBytes;
    }

如果顺利写入所有字节到channel,则reset buffer -->fireMessageSent-->head(Null messageSent) -->registered Filter-->tail-->IoHandler.messageSent
如果还有写入不完整则setInterestedInWrite --> register OP_WRITE-->process OP_WRITE-->add to flushingSessions

关闭连接的处理就简单的描述下
如果关闭连接,IoSession.close-->tail.filterClose-->registered Filter-->head.filterClose-->AbstractPollingIoProcessor.remove(Session) -->add to removingSessions -->wake up
然后NioProcessor在process处理完后,会调用
nSessions -= removeSessions();

AbstractPollingIoProcessor.Processor.run.removeSessions -->AbstractPollingIoProcessor.removeNow -->clearWriteRequestQueue-->destroy(Session) -->fireSessionDestroyed-->head(Null sessionClosed)-->registered Filter-->tail.sessionClosed-->IoHandler.sessionClosed

Note:如果用MINAC客户端Connector,关闭是一定要调用
connector.dispose(); 

该方法通过调用ExecutorService的shutdown()方法停止业务处理线程,并设置内部disposed标志位标识需要停止连接管理器。
分享到:
评论
1 楼 yinlei126 2013-09-03  
这么好的文章没有评论,谢谢

相关推荐

    Mina2.0框架源码剖析

    Mina2.0框架源码剖析

    基于Java的mina框架

    基于Java的米娜框架,报告对使用基于Java、websocket协议的网页聊天室的过程和技术做了详细的叙述首先,对现有网页进行了分析与评价。首先, 启动后台服务器,然后连接站点,客户端在pc端输入网站或者在手机端扫...

    Apache MINA框架相关资料

    包括中文参考手册,mina2的源码分析,api文档,MINA和spring结合等相关资料

    Mina2.0框架源码剖析.pdf

    Mina2.0框架源码剖析.pdf

    mina框架详解

    mina框架详解 想学的看看

    Mina2.0框架源码剖析.doc

    Mina2.0框架源码剖析.docMina2.0框架源码剖析.docMina2.0框架源码剖析.docMina2.0框架源码剖析.docMina2.0框架源码剖析.docMina2.0框架源码剖析.doc

    apache下的mina框架的源码

    可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式,是一个开发高性能和高可伸缩性网络应用程序的网络应用框架的源码.

    基于高性能NIO的MINA框架的应用

    基于高性能NIO的MINA框架的应用,剖析MINA框架,提出MINA的简单应用

    Mina2.0框架源码剖析(六).pdf

    Mina2.0框架源码剖析(六).pdf

    Apache mina源代码框架解析

    写这篇文档主要是想对刚接触Mina的人讲解一些Mina的基本知识,由浅入深,一步一步的学习Mina思想的精髓。这里只是简单的涉及一点,不会对其做很深入的探讨。但是Mina的服务器和客户端在很大的程度上都是一样,所以...

    Mina2源码分析.doc

    Mina2源码分析,mina2框架。mina nio框架。mina用于webgame游戏开发很多。

    微信小程序开发框架MINA分析

    MINA框架中有四种类型的文件: .js文件 基于JavaScript的逻辑层框架 .wxml 视图层文件,是MINA设计的一套标签语言 .wxss 样式文件,用于描述WXML的组件样式 .json 文件,配置文件,用于单个页面的配置和整个项目的...

    mina2源码分析

    mina框架 运用封装socket 行一系列的

    Mina2.0完全剖析,完全自学手册

    Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件驱动的异步API,使Java NIO在各种传输协议...

    资料_MINA(2、3、4).rar

    资源包括: MINA笔记.docx ...Mina2.0快速入门与源码剖析.pdf MINA网络框架和RMI的对比研究.pdf 基于3G网络的移动流媒体服务器的设计与实现.pdf 高性能通信框架及智能主站技术研究.nh MINA类图.doc 等

    轻奢品港代小程序设计与实现+ssm框架后台.zip

    本小程序采用微信小程序(MINA框架)、SSM框架(Spring+SpringMVC+Mybatis)、Maven工具、Shiro框架、Bootstrap前端框架、MySQL等技术。根据系统的功能需求分析与设计、系统测试等步骤进行设计与开发。本小程序系统...

Global site tag (gtag.js) - Google Analytics