Netty入门简介

netty是一个异步、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。

Netty优势

Netty解决了TCP传输问题,如黏包、半包问题,解决了epoll空轮询导致CPU100%的问题。并且Netty对API进行增强,使之更易用。如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer等NIO API进行增强。

在测试Netty时不能使用单元测试

  1. 线程问题:在单元测试中,当线程启动后,系统会立即执行下一行代码,直到所有代码执行完毕。然后,main函数会调用system.exit(0)来结束整个程序。这可能会导致你的服务器在还没有接收到任何数据的情况下就被关闭了。
  2. 输入/输出流:单元测试通常不会与标准输入/输出流进行交互,这可能会影响到你的服务器的运行

Netty实例

服务器
public static void main(String[] args) {System.out.println("server starter connection......");// 服务器端的启动器 负责组装Netty组件 启动服务器new ServerBootstrap()// 循环处理事件,通过group EventLoopGroup等事件,// 可能会有BossEventLoopGroup WorkerEventLoopGroup(selector, thread).group(new NioEventLoopGroup())// 选择channel的实现方式,可以是OIO BIO等channel实现方式.channel(NioServerSocketChannel.class)// 将事件的处理进行分工,类似Boss负责连接 Worker负责读写事件等操作// ChannelInitializer和客户端连接后,对数据读写的通道.childHandler(new ChannelInitializer<NioSocketChannel>() {// 添加具体的handler@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 将接收到的字符串进行解码 将ByteBuf转为字符串nioSocketChannel.pipeline().addLast(new StringDecoder());// 自定义handlernioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}}).bind(8080);System.out.println("server connection ok...");}
客户端
public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class)// 添加处理器,会在连接建立后进行调用.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 将发送的数据进行编码nioSocketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080)).sync().channel()// 向服务器发送数据.writeAndFlush("hello world");System.out.println("client connection.....");}

客户端中的sync方法会阻塞,等待连接建立,当连接建立好后会获取到客户端和服务端建立连接的SocketChannel对象,通过这个对象向服务器进行读写数据。客户端收发数据都会调用handler中的方法,将channel的数据通过StringEncoder进行编码来发送。然后服务器通过EventLoopGroup来接收读事件。将接收到的读事件调用服务器的childHandler来进行处理,通过StringDecoder来进行解码,然后将解码的数据交给ChannelInboundHandlerAdapter自定义处理来进行处理。

总结

1、channel可以理解为数据通道

2、把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeline的加工,会变成其他类型对象,最后输出又变成ByteBuf

3、将handler理解为数据的加工工序

3.1、工序有多道,合在一起就是pipeline,pipeline负责发布事件(读,读取完成)传播到每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)

3.2、handler分为Inbound(入站)和Outbound(出站)两类

4、eventLoop理解为处理数据的工人

4.1、工人可以管理多个channel的io操作,并且一旦工人负责某个channel,就要负责到底(绑定)

4.2、工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列中可以堆放多个channel的待处理任务,任务分为普通任务、定时任务。

4.3、工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人

Netty组件

EventLoop

EventLoop本质是一个单线程执行器(同时维护一个selector,执行器意味着可以向该对象提交一些任务包括定时任务),里面有run方法处理channel上源源不断的IO事件

EventLoop继承事件如下:

1、继承自j.u.c.ScheduledExecutorService因此包含了线程池的所有方法

2、继承自netty自己的OrderedEventExecutor

2.1、提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop

2.2、提供了parent方法来看看自己属于哪个EventLoopGroup

3、EventLoopGroup是一组EventLoop,channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)

3.1、EventLoopGroup继承自EventExecutorGroup,实现了iterable接口提供遍历EventLoop的能力,同时有next方法获取集合下一个EventLoop

NioEventLoopGroup与DefaultEventLoopGroup

NioEventLoopGroup

处理io事件,既能提交普通任务也能提交定时任务,默认使用电脑的cpu核心数*2

DefaultEventLoopGroup

DefaultEventLoopGroup() 只能处理普通任务和定时任务,不能处理io事件

public static void main(String[] args) {// 处理io事件,既能提交普通任务也能提交定时任务,默认使用电脑的cpu核心数*2// 有多少个线程就有多少个EventLoopEventLoopGroup group = new NioEventLoopGroup(2);// 提交执行普通任务group.next().submit(() -> {log.debug("event loop task submit....");});// 定时任务 参数2:初始延迟事件 参数3:间隔事件,参数4:事件单位group.next().scheduleAtFixedRate(() -> {log.debug("scheduled tasks ....");}, 1L, 1L, TimeUnit.SECONDS);log.debug("main thread exe....");}
分工细化
public static void main(String[] args) {// 只处理普通任务以及定时任务EventLoopGroup group = new DefaultEventLoop();new ServerBootstrap()// 细分:boss 只负责ServerSocketChannel的accept事件, worker只负责SocketChannel上的读写事件.group(new NioEventLoopGroup(), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter() {@Override// Object msg其实就是ByteBufpublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;log.debug(buf.toString(Charset.defaultCharset()));// 将消息传递给下一个handlerctx.fireChannelRead(msg);}// 绑定DefaultEventLoop}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {@Override// Object msg其实就是ByteBufpublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;log.debug(buf.toString(Charset.defaultCharset()));}});}}).bind(8080);}

当第一个NioEventLoopGroup请求连接建立成功后,会由第二个NioEventLoopGroup去处理(执行childHandler中的逻辑)

Hanlder切换线程

关键代码io.netty.channel.AbstractChannelHandlerContextinvokeChannelRead方法

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 获取下一个handler的EventLoop(EventLoop继承EventExecutor)EventExecutor executor = next.executor();// 如果下一个handler和当前的handler属于同一个线程// 就直接调用if (executor.inEventLoop()) {// 使用当前handler的线程来调用invokeChannelRead// 寻找下一个handlernext.invokeChannelRead(m);} else {// 如果不是同一个线程就将执行任务代码交给// 下一个handler线程处理executor.execute(new Runnable() {public void run() {next.invokeChannelRead(m);}});}}

如果一个channel来建立连接成功后,会从head的headler调用invokeChannelRead去寻找下一个handler的read事件

Channel

close可以用来关闭channel(异步操作)

closeFuture()用来处理channel的关闭

sync方法作用是同步等待channel关闭

addListener方法是异步等待channel关闭

pipeline方法添加处理器

write方法将数据写入(将数据不会立马发送到服务器或者客户端,而且缓存起来,当调用flush或者数据填满缓冲区就发送出去)

writeAndFlush方法将数据写入并刷出

new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class)// 添加处理器,会在连接建立后进行调用.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 将发送的数据进行编码nioSocketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080)).sync().channel().writeAndFlush("hello world");

connect方法是异步非阻塞的,main线程发起调用,这种执行连接的是NioEventLoopGroup线程。sync阻塞当前线程直到NioEventLoopGroup建立连接完毕才往下执行。

使用addListener异步处理
ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class)// 添加处理器,会在连接建立后进行调用.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 将发送的数据进行编码nioSocketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080));// 使用addListener异步处理结果channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Channel channel = future.channel();log.debug("{}", channel);channel.writeAndFlush("hello, world");}});

operationComplete方法在NioEventLoopGroup建立好连接后会调用,并在NioEventLoopGroup线程中执行,不会阻塞主线程。

CloseFuture
Channel channel = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class)// 添加处理器,会在连接建立后进行调用.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 将发送的数据进行编码nioSocketChannel.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8080)).sync().channel();String str = null;Scanner input = null;input = new Scanner(System.in);do {str = input.nextLine();System.out.println("input value is:\t" + str);channel.writeAndFlush(str);} while (!input.hasNext("exit"));channel.close();ChannelFuture closeFuture = channel.closeFuture();// 同步关闭结果closeFuture.sync();System.out.println("channel 关闭后操作");

异步处理Close关闭问题

channel.close();ChannelFuture closeFuture = channel.closeFuture();// 异步关闭结果closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {System.out.println("channel 关闭后操作");// 在channel关闭后关闭NioSocketChannel线程group.shutdownGracefully();}});
Future和Promise

在异步处理时,经常使用到两个接口

首先说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行扩展

  • jdk Future只能同步等待任务结束(或成功,或失败)才能得到结果
  • netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称jdk Futurenetty Future
cancel取消任务
isCaneled任务是否取消
isDone任务是否完成,不能区分任务成功失败
get获取任务结果,阻塞等待
getNow获取任务结果,非阻塞,还未产生结果时返回null
await等待任务结束,如果任务失败,不会抛异常,而且通过isSuccess判断
sync等待任务结束,如果任务失败,抛出异常
isSuccess判断任务是否成功
cause获取失败信息,非阻塞,如果没有失败,返回null
addLinstener添加回调,异步接收结果
JDK Future
// 创建线程池ExecutorService threadPool = Executors.newFixedThreadPool(2);Future<Integer> submit = threadPool.submit(() -> {log.debug("执行计算");Thread.sleep(1000);return 50;});log.debug("等待结果:");// 主线程通过Future获取结果Integer integer = submit.get();log.debug("获取结果: {}",integer);
Netty Future
NioEventLoopGroup group = new NioEventLoopGroup();// 获取一个EventGroupEventLoop eventLoop = group.next();Future<Integer> submit = eventLoop.submit(() -> {log.debug("进行计算");Thread.sleep(1000);return 50;});log.debug("等待结果:");Integer result = submit.get();log.debug("获取结果: {}", result);
promise
DefaultPromise<Integer> promise = new DefaultPromise<>(new NioEventLoopGroup().next());new Thread(() -> {try {log.debug("开始计算");Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}promise.setSuccess(50);}).start();log.debug("获取结果: {}", promise.get());
Handler和Pipeline

ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被在一起就是Pipeline

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工

每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各个工序,而ByteBuf就是原材料。经过很多工序的加工最终成为产品。

EmbeddedChannel

EmbeddedChannel用来模拟测试入站,出站执行顺序

ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h4");super.write(ctx, msg, promise);}};EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2, h3, h4);// 模拟入站操作embeddedChannel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));// 模拟出站操作embeddedChannel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
ByteBuf调试工具
public static void log(ByteBuf buf) {int length = buf.readableBytes();int rows=length / 16 + (length % 15 == 0 " />0 : 1) + 4;StringBuilder sb = new StringBuilder(rows * 80 * 20).append("read index:").append(buf.readerIndex()).append(" write index:").append(buf.writerIndex()).append(" capacity:").append(buf.capacity()).append(NEWLINE);appendPrettyHexDump(sb, buf);System.out.println(sb.toString());}
ByteBuf
直接内存与堆内存

可以使用如下代码来创建池化基于堆内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以通过如下代码来创建池化基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁代价昂贵,但读写性能比较高(少一次内存复制),适合配合池化功能一起使用
  • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放内存
池化与非池化

池化的最大意义在于可以重用ByteBuf,优点如下:

  • 没有池化,则每次都创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
  • 有了池化,则可以重用池中的ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能开启,可以通过如下系统变量设置

-Dio.netty.allocator.type={unpooled|pooled}

Netty 4.1以后,非Android平台默认启用池化实现,Android平台默认使用非池化实现。在4.1之前,池化功能还不成熟,默认使用的是非池化实现

ByteBuf组成

ByteBuf组成由capacity(ByteBuf容量,默认256)、max capacity(最大容量,值为整数的最大值),读写指针组成。

slice

零拷贝体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read、write指针。

public static void main(String[] args) {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j' });log(buf);// 切片ByteBuf buf1 = buf.slice(0, 5);ByteBuf buf2 = buf.slice(5, 5);log(buf1);log(buf2);}public static void log(ByteBuf buf) {int length = buf.readableBytes();int rows=length / 16 + (length % 15 == 0 " />0 : 1) + 4;StringBuilder sb = new StringBuilder(rows * 80 * 20).append("read index:").append(buf.readerIndex()).append(" write index:").append(buf.writerIndex()).append(" capacity:").append(buf.capacity()).append(NEWLINE);appendPrettyHexDump(sb, buf);System.out.println(sb.toString());}

修改切片的数据,原数据也会改变。同时不允许切片后的切片进行添加数据,原有的ByteBuf进行release后其他的切片不能再使用。若其他切片需要使用可以使用retain将引用计数加1,就不会因为release导致切片不可以,但需要自己手动释放切片的内存(调用release)

结合切片CompositeByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();buf1.writeBytes(new byte[] {'a', 'b', 'c', 'd'});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();buf2.writeBytes(new byte[] {'e', 'f', 'g', 'h'});CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();// 使用带boolean参数的方法来改变读写指针buf3.addComponents(true, buf1, buf2);TestSlice.log(buf3);

CompositeByteBuf避免了内存的复制

retain与release

由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收

  • UnpooledHeapByteBuf使用JVM内存,只需等待GC回收内存即可
  • UnpooledDirectByteBuf使用就是直接内存,需要通过特殊的方式来回收内存
  • PooledByteBuf和他的子类使用了池化机制,需要更复杂的规则来回收内存

Netty采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

  • 每个ByteBuf对象初始计数为1
  • 调用release方法数减1,如果计数为0,ByteBuf内存被回收
  • 调用retain方法计数加1,表示调用者没用完之前,其他handler即使调用了release也不会造成回收
  • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。
工具类Unpooled

Unpooled是一个工具类,提供非池化的ByteBuf的创建、组合、复制等操作。

与零拷贝相关的方法有wrappedBuffer方法可以用来包装ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();buf1.writeBytes(new byte[] {'a', 'b', 'c', 'd'});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();buf2.writeBytes(new byte[] {'e', 'f', 'g', 'h'});ByteBuf buf = Unpooled.wrappedBuffer(buf1, buf2);TestSlice.log(buf);