一、核心编排组件:ChannelPipeLine

ChannelPipeLine是Netty的核心编排组件,负责调度各类ChannelHandler,实际的加工处理由ChannelHandler完成。

ChannelPipeLine可以看做是ChannelHandler的容器,包含一组ChannelHandler实例,内部通过双向链表将ChannelHandler链接在一起。当有I/O读写事件时,依次调用ChannelHandler列表对Channel的数据进行拦截和处理。

每个Channel绑定一个ChannelPipeLine,每个ChannelPipeLine包含多个ChannelHandlerContext,所有ChannelHandlerCoontext组成双向链表。每个ChannelHandler对应一个ChannelHandlerContext,ChannelHandlerContext可以保存ChannelHandler上下文,同时包含ChannelHandler生命周期的所有事件(如 connect、bind、read、flush、write、close 等)。基于ChannelHandlerContext的封装,可以提取事件传递的前置和后置通用逻辑,降低耦合性。

ChannelPipeLine中包含两大类处理器:InboudHandler入站处理器和OutboundHandler出站处理器,内部的双向链表维护了HeadContext和TailContext的头尾节点,自定义的ChannelHandler会插入两个节点之间。

HeadContext既是Inbound处理器,又是Outbound处理器,分别实现了ChannelInboudHandler和ChannelOutboudHandler。网络数据的写入操作入口就是由HeadContext完成。HeadContext作为头结点负责读取数据并传递InBound事件,当数据处理完后,数据反方向经过Outbound处理器,最终又传到HeadContext,所有HeadContext又是处理OutBound事件的最后一站。此外,HeadContext在传递时间之前还会执行一些前置操作。

TailContext只实现了ChannelOutboudHandler,在ChannelInboundHandler调用链路的最后一步执行,用于终止InBound事件的传播。作为OutBound事件传播的第一站,仅仅是将OutBound事件传递给下一个节点。

Netty支持由Channel直接触发事件,这样调用链路将会贯穿整个ChannelPipeLine。同时,也可以在某一个ChannelHandlerContext触发事件传播,这样只会从当前ChannelHandler开始事件传播,不会从头贯穿到尾。

二、事件处理器:ChannelHandler

ChannelHandler是围绕I/O事件的生命周期(建立连接、读数据、写数据、连接销毁)设计的,包含两个重要子接口:ChannelIInboudHandler和ChannelOutboundHandler,分别拦截入站和出站的各种I/O事件。

ChannelInboudHandler的事件回调方法

事件方法
新的客户端连接事件handlerAdded
通道注册事件channelRegistered
通道处于活动状态事件channelActive
通道数据可读取事件channelRead0
通道数据读取完毕事件channelReadComplete
通道进入非活动状态事件channelInactive
通道移除事件channelUnregistered
处理器移除事件(断开连接)handlerRemoved
异常发生事件exceptionCaught

ChannelOutboundHandler的事件回调方法

三、事件传播机制

ChannelPipeLine中的处理器分为InboundHandler和OutboundHandler两种处理器。InBound事件的传播方向为:Head –> Tail,而OutBound事件的传播方向为:Tail –> Head。
通过以下的代码演示ChannelPipeLine的时间传播机制:

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false)).addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false)).addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));ch.pipeline().addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA")).addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB")).addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));}}public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InBoundHandler: " + name);if (flush) {ctx.channel().writeAndFlush(msg);} else {super.channelRead(ctx, msg);}}}public class SampleOutBoundHandler extends ChannelOutboundHandlerAdapter {private final String name;public SampleOutBoundHandler(String name) {this.name = name;}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler: " + name);super.write(ctx, msg, promise);}}

最终的控制台输出为:

四、异常传播机制

ChannelPipeLine中的事件传播采用了经典的责任链模式,调用链路环环相扣。但是,如果有一个节点处理逻辑出现异常会怎么样?当用户在自定义的ChannelHandler中对异常没有进行拦截,最终会由TailContext进行拦截。

通过以下代码演示,第一个A节点会抛出RunTimeException。同时重写ChannelInboundHandlerAdapter的exceptionCaught方法,直在开头加上控制台输出

public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("InBoundHandler: " + name);if (flush) {ctx.channel().writeAndFlush(msg);} else {throw new RuntimeException("InBoundHandler: " + name);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.out.println("InBoundHandlerException: " + name);ctx.fireExceptionCaught(cause);}}

最终的控制台输出结果为:

实际在用Netty进行开发时,推荐对异常进行同一拦截,然后根据实际业务场景进行更加完善的异常处理机制,参考如下方式:

具体的代码如下:

public class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof RuntimeException) {System.out.println("Handle Business Exception Success.");}}}

最终的控制台输出结果为: