实在想不出来名字叫啥,就写Netty吧,反正我也不准备分篇,就全写在这篇里好了。

1. 介绍

Netty的官网:netty.io

Netty的github页:netty/netty · GitHub

文档:http://netty.io/wiki/

官方的3.xTutorial:http://netty.io/3.6/guide/,这篇文档里零零碎碎讲了很多,不过没啥系统,能看就看

官方的4.0.6.Final版本上个礼拜刚出来,4.x版本也慢慢开始有稳定版本了。3.x的版本和4.x的版本互相不兼容,我暂时没涉足4.x的版本。我玩的版本是3.6.5.Final,3.x的最后一个版本是3.6.6.Final。

官方介绍:

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

关于同作者参与制作的两款NIO框架,netty和mina之间的性能测试:NIO系列6:流行 NIO Framework netty 和 mina 性能测评与分析

不得不吐槽,netty的文档实在是,太少了,而且还很渣,受不了。。。

很棒的学习netty的资料:

关于如何在netty中使用protobuf来进行消息通讯:在Netty中使用Protobuf

我的开源项目,我做了一个基于Java和Netty的服务端框架,并同步做了一个Sample项目,两个都开源在github上:

2. 架构

接下来我们简单过下netty的设计架构。

2.1 核心思想

核心思想就是围绕着两个概念,channel和nio。

  • channel就是netty中抽象的连接的概念,netty中你不需要关心channel是什么连接(TCP?UDP?),channel就是建立的连接,能从里面读数据,也能向里面写数据(ServerBootstrap这个Helper类用来处理TCP连接,而ConnectionlessBoostrap则是用来处理UDP连接的)
  • 当你有了channel(获得到连接,可以开始读取、写入数据)之后,你就能使用异步事件机制来处理后续的IO(NIO),这就是netty最重要的设计思想,尽量使用异步处理来提高吞吐效率

2.2 消息流转

当然,channel还仅仅只是一个连接而已,整个消息的流转过程还是有点小复杂的,我们来看下:

  • 服务器启动,生成Boss线程(一个服务器端口对应一个)
    • --- Boss线程接收到客户端连接 --- 生成Channel --- 交给Worker线程池(多个Worker线程)来分配处理
      • Worker线程 --- 读完已接收的消息数据到ChannelBuffer --- 触发ChannelPipeline中的ChannelHandler链来处理消息数据
        • 假设你的应用有heavy business,响应速度比较慢的话,你还需要在业务handler上添加一层业务逻辑的worker线程池,使用MemoryAwareThreadPoolExecutor

换个程序点的流程:

  • Bootstrap启动服务器,与ChannelFactory一并初始化,ChannelFactory里带有两个线程池
    • boss线程池中的boss线程得到请求(一个端口一个线程),创建Channel对象(连接),并交予worker线程池的线程(随机)进行处理
      • worker线程将Channel内的消息读出装填到ChannelBuffer中,然后交予ChannelPiplineFactory创建出来的ChannelPipline处理
        • ChannelPipline按UpStream和DownStream的方向,将Channel按ChannelPipline里注册的ChannelHandler顺序,一个个过
          • 各个ChannelHandler按内部实现的ChannelEvent处理器,进行各个事件的处理
            • 如果ChannelPipline内还有heavy business的话,使用MemoryAwareThreadPoolExecutor线程池来队列响应缓慢的ChannelHandler

2.3 核心组件

下面列一下Netty里的核心组件类,请结合上面的流转过程,将它们一个个放进对应的步骤中:

  • ServerBootstrap、ClientBootstrap:服务器、客户端的启动Helper类
  • ChannelFactory:创建Channel实例的工厂类
  • Executors:创建线程池的Helper类
  • Channel:你可以理解为netty里的连接
  • ChannelGroup:Channel的组,可以用来进行连接管理和批量操作
  • ChannelPipelineFactory:创建ChannelPipline的工厂类
  • ChannelEvent:ChannelPipline中的事件类
  • ChannelHandler:注册在ChannelPipline中,用来处理ChannelEvent的处理器类
  • ChannelPipeline:处理Channel中的消息的事件管道类
  • ChannelBuffer:Channel中消息读取和写入的缓冲类
  • Codec Framework:消息的序列化和反序列化类系列
  • MemoryAwareThreadPoolExecutor:消息处理的heavy logic线程池类
  • OrderedMemoryAwareThreadPoolExecutor:同上,执行顺序严格限定的线程池

3. 核心组件

3.1 Bootstrap

Bootstrap类是服务器、客户端的启动Helper类。查看源代码你会发现,netty的Bootstrap类一共只有4个,在org.jboss.netty.bootstrap包下。分别是:

  • Bootstrap:基类
  • ClientBootstrap:客户端Helper,结合NioClientSocketChannelFactory使用
  • ConnectionlessBootstrap:UDP的服务端Helper,结合NioDatagramChannelFactory使用
  • ServerBootstrap:服务端Helper,结合NioServerSocketChannelFactory使用

参考NioServerSocketChannelFactory,正确的关闭一个服务器的方法是:

To shut down a service gracefully, you should do the following:

  1. unbind all channels created by the factory,
  2. close all child channels accepted by the unbound channels, and (these two steps so far is usually done using ChannelGroup.close())
  3. call releaseExternalResources().

Please make sure not to shut down the executor until all channels are closed. Otherwise, you will end up with a RejectedExecutionException and the related resources might not be released properly.

3.1.1 Options

Bootstrap在实例化之后,可以使用setOption来调整其参数,这里简单收集下参数列表。因为官方文档上很零碎,都需要自己整理。

3.1.2 ChannelConfig
Name Associated setter method
"bufferFactory" setBufferFactory(ChannelBufferFactory)
"connectTimeoutMillis" setConnectTimeoutMillis(int)
"pipelineFactory" setPipelineFactory(ChannelPipelineFactory)
3.1.3 SocketChannelConfig
Name Associated setter method
"keepAlive" setKeepAlive(boolean)
"reuseAddress" setReuseAddress(boolean)
"soLinger" setSoLinger(int)
"tcpNoDelay" setTcpNoDelay(boolean)
"receiveBufferSize" setReceiveBufferSize(int)
"sendBufferSize" setSendBufferSize(int)
"trafficClass" setTrafficClass(int)
3.1.4 NioSocketChannelConfig
Name Associated setter method
"writeBufferHighWaterMark" NioChannelConfig.setWriteBufferHighWaterMark(int)
"writeBufferLowWaterMark" NioChannelConfig.setWriteBufferLowWaterMark(int)
"writeSpinCount" NioChannelConfig.setWriteSpinCount(int)
"receiveBufferSizePredictor" setReceiveBufferSizePredictor(ReceiveBufferSizePredictor)
"receiveBufferSizePredictorFactory" setReceiveBufferSizePredictorFactory(ReceiveBufferSizePredictorFactory)

3.2. ChannelFactory

ChannelFactory是创建Channel实例的工厂类。根据服务器和客户端类型的不同,使用的工厂类也各不相同。

ChannelFactory分为NIO和OIO两种,一般来说,都使用NIO。

经常使用的一般是下面三种:

  • NioServerSocketChannelFactory:初始化需要两个线程池,一个boss(一个线程绑定一个端口,根据服务器绑定的端口,可能需要多个boss线程),一个worker
  • NioClientSocketChannelFactory:基本上和服务端的工厂类一致,也需要两个线程池
  • NioDatagramChannelFactory:初始化只需要一个线程池就够了,给worker使用,因为不需要boss来分配连接

3.3. Executors

这东西就是 java.util.concurrent.Executors ,用来辅助创建线程池。

3.4. Channel

Channel是一个非常重要的对象,就是netty里的连接。

A channel provides a user:

  • the current state of the channel (e.g. is it open? is it connected?),
  • the configuration parameters of the channel (e.g. receive buffer size),
  • the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
  • the ChannelPipeline which handles all I/O events and requests associated with the channel.

3.5. ChannelGroup

如其名,主要用来维护Channel组,并可以执行批量操作。

A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don't need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

3.6. ChannelPipelineFactory

为新创建的Channel创建对应的ChannelPipline的工厂类,在Bootstrap实例化的时候就被分配给Bootstrap,负责为服务器或者客户端提供Pipline。

When a server-side channel accepts a new incoming connection, a new child channel is created for each newly accepted connection. A new child channel uses a new ChannelPipeline, which is created by the ChannelPipelineFactory specified in the server-side channel's "pipelineFactory" option.

Also, when a ClientBootstrap or ConnectionlessBootstrap creates a new channel, it uses the "pipelineFactory" property to create a new ChannelPipeline for each new channel.

3.7. ChannelEvent

流转在ChannelPipline中的事件,由注册在ChannelPipline中的ChannelHandler来负责处理。事件也分为UpStream和DownStream两种,我们会在讲Pipline的时候进行详细分解。

详细的事件分类和事件类型,请参考官方文档。

3.8. ChannelHandler

注册在ChannelPipline中的处理器,用来处理或者截取ChannelEvent进行处理,返回值永远是一个ChannelEvent,交由Pipline中注册的下一个处理器进行处理。

Handles or intercepts a ChannelEvent, and sends a ChannelEvent to the next handler in a ChannelPipeline.

这里的Handler可以做的事情应该说是五花八门,什么都有。包含了解决粘包,解码,获取消息,进行处理,加码,进行发送等等。凡是收到消息进行响应的过程(UpStream)和发送消息进行通知的过程(DownStream)都由Handler处理。

几个附属于ChannelHandler的概念,需要了解的,请阅读官方文档:

  • Sub-types
  • Context
  • State management
  • Using an attachment
  • Using a ChannelLocal
  • The @Sharable annotation

3.9. ChannelPipeline

ChannelPipline是注册了一系列用来处理Channel内消息事件的管线。这根管线内根据方向,分为UpStream和DownStream两种。

  • UpStream:向上的数据流,也就是客户端向服务器发送的数据流,会由pipline中注册的handler的第一个执行到最后一个,当然会被执行的都是上行处理handler或者双向handler
  • DownStream:向下的数据流,也就是服务器向客户端发送的数据流,会由pipline中注册的handler的最后一个执行到第一个,会被执行的都是下行处理handler或者双向handler

我们来看一张Pipline的flow图:

PIPLINE_FLOW

 

[codesyntax lang="java5"]

return new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();

        pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
        pipeline.addLast("protobufDecoder", new ProtobufDecoder(Communication.XjfMessage.getDefaultInstance()));

        pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast("protobufEncoder", new ProtobufEncoder());

        pipeline.addLast("handler", new XjfNettyServerHandler());

        return pipeline;
    }
};

[/codesyntax]

上面这段代码中,UpStream会经过"frameDecoder"然后是"protobufDecoder"然后到"handler",结束。而DownStream则会经过"handler"然后是"protobufEncoder"然后是"frameEncoder",结束。

3.10. ChannelBuffer

ChannelBuffer用来存储并操作读写的网络数据。

  • HeapChannelBuffer:是Netty读网络数据时默认使用的ChannelBuffer,这里的Heap就是Java堆的意思,因为 读SocketChannel的数据是要经过ByteBuffer的,而ByteBuffer实际操作的就是个byte数组,所以 ChannelBuffer的内部就包含了一个byte数组,使得ByteBuffer和ChannelBuffer之间的转换是零拷贝方式。HeapChannelBuffer是个大小固定的buffer,为了不至于分配的Buffer的 大小不太合适,Netty在分配Buffer时会参考上次请求需要的大小。
  • DynamicChannelBuffer:相比于HeapChannelBuffer,DynamicChannelBuffer可动态自适 应大 小。对于在DecodeHandler中的写数据操作,在数据大小未知的情况下,通常使用DynamicChannelBuffer。

3.11. Codec Framework

从业务逻辑代码中分离协议处理部分是一个很不错的想法。然而如果一切从零开始便会遭遇到实现上的复杂性。你不得不处理分段的消息。一些协议是多层的(例如构建在其他低层协议之上的协议)。一些协议过于复杂以致难以在一台主机(single state machine)上实现。

因此,一个好的网络应用框架应该提供一种可扩展,可重用,可单元测试并且是多层的codec框架,为用户提供易维护的codec代码。Netty提供了一组构建在其核心模块之上的codec实现,这些简单的或者高级的codec实现帮你解决了大部分在你进行协议处理开发过程会遇到的问题,无论这些协议是简单的还是复杂的,二进制的或是简单文本的。

参考:FrameDecoderFrameEncoder

3.12. MemoryAwareThreadPoolExecutor

这个线程池在使用的时候是放在ChannelPipline中,用来队列heavy business logic的ChannelHandler的,因为netty内部的异步事件都基于高响应的基础来进行设计的,如果某一个ChannelHandler在响应上花了很长时间的话,某一个线程就会被卡死在那个点上,无法进行后续请求的响应,那么系统的吞吐量就会明显下降。这个时候,我们就需要使用线程池来队列执行响应缓慢的heavy logic ChannelHandler。

这个线程池是基于内存消耗来判断是否队列需要阻塞。可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限。

官方文档:

ThreadPoolExecutor which blocks the task submission when there's too many tasks in the queue. Both per-Channel and per-Executor limitation can be applied.

When a task (i.e. Runnable) is submitted, MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object) to get the estimated size of the task in bytes to calculate the amount of memory occupied by the unprocessed tasks.

If the total size of the unprocessed tasks exceeds either per-Channel or per-Executor threshold, any further execute(Runnable) call will block until the tasks in the queue are processed so that the total size goes under the threshold.

这个线程池的执行顺序是没有保证的:

Please note that this executor does not maintain the order of the ChannelEvents for the same Channel. For example, you can even receive a "channelClosed" event before a "messageReceived" event, as depicted by the following diagram. For example, the events can be processed as depicted below:

NOT_ORDERED

3.13.  OrderedMemoryAwareThreadPoolExecutor

基本同上,可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一Channel中的事件都在一个线程中执行(通常也没必要)。

ORDERED

问题

  • NIO和OIO在netty中究竟差别体现在什么地方?
  • 在使用Netty制作一个保持大量TCP连接的服务器的时候,到底应该怎么管理这么多的连接,并保持其live?

资料