案例说明:一个简单的群聊实现,支持重复上下线。

NIO

服务端

public class NIOServer {public static void main(String[] args) throws IOException {ServerSocketChannel serverChannel = ServerSocketChannel.open();// 初始化服务器serverChannel.bind(new InetSocketAddress(9999));Selector selector = Selector.open();serverChannel.configureBlocking(false);serverChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 每过两秒中来看是否有请求过来if (selector.select(2000) != 0) {System.out.println("===================");Iterator<SelectionKey> it = selector.selectedKeys().iterator();try {String ipStr = "";while (it.hasNext()) {SelectionKey next = it.next();// 建立连接if (next.isAcceptable()) {ByteBuffer bu = ByteBuffer.allocate(1024);SocketChannel channel = serverChannel.accept();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_READ, bu);ipStr = channel.getRemoteAddress().toString().substring(1);System.out.println(ipStr + "上线 ...");}// 读取数据if (next.isReadable()) {SocketChannel channel = (SocketChannel) next.channel();// 如果这个时候通道已经关闭了if (!channel.isOpen()) {next.cancel();return;}try {channel.configureBlocking(false);ByteBuffer buffer = (ByteBuffer) next.attachment();channel.read(buffer);String msg = new String(buffer.array(), 0, buffer.position());System.out.println("receive : " + msg);// 广播消息broadCast(selector, channel, msg);buffer.clear();} catch (Exception e) {System.out.println("======================发生异常进行下线操作=========");next.cancel();it.remove(); continue;}}it.remove();}} catch (Exception e) {e.printStackTrace();}}}}public static void broadCast(Selector selector, SocketChannel channel, String msg) throws IOException {Set<SelectionKey> keys = selector.keys();Iterator<SelectionKey> iterator = keys.iterator();while (iterator.hasNext()) {SelectionKey next = iterator.next();SelectableChannel targetChannel = next.channel();// 如果被广播的对象连接还在if (targetChannel.isOpen()) {if (targetChannel instanceof SocketChannel && channel != targetChannel) {((SocketChannel) targetChannel).write(ByteBuffer.wrap(msg.getBytes()));}} else {// 表示通道不存在了 进行下线操作next.cancel();}}}}

客户端

public class NIOClient {private SocketChannel channel;private String userName;private String bindIP;private int bindPort;public NIOClient(String userName, String bindIP, int bindPort) throws IOException {channel = SocketChannel.open();channel.configureBlocking(false);this.bindIP = bindIP;this.bindPort = bindPort;channel.connect(new InetSocketAddress(bindIP, bindPort));this.userName = userName;while (!channel.finishConnect()) {// 等待连接成功}}public void sendMsg(String msg) throws IOException {if (msg == "end") {channel.close();return;}msg = "from " + this.userName + " : " + msg;channel.write(ByteBuffer.wrap(msg.getBytes()));}public void receive() throws IOException {ByteBuffer buffer = ByteBuffer.allocate(1024);int size = channel.read(buffer);if(size>0){String msg=new String(buffer.array());System.out.println(msg.trim());}}}// Main 函数public static void main(String[] args) throws IOException {new Thread(() -> {final NIOClient nioClient;try {nioClient = new NIOClient("one", "127.0.0.1", 9999);} catch (IOException e) {throw new RuntimeException(e);}Thread thread = new Thread(() -> {try {while (true) {nioClient.receive();Thread.sleep(3000);}} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {System.out.println("=============== 离线 ===================");}});thread.start();;System.out.println( "one pleas input : ");Scanner scanner = new Scanner(System.in);String msg = "";while (!(msg = scanner.nextLine()).equals("end")) {try {nioClient.sendMsg(msg);} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e);}}thread.interrupt();}).start();};

AIO

public class NettyServer {public static void main(String[] args) {NioEventLoopGroup boosGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new NettyChatServerHandler());}});ChannelFuture f = bootstrap.bind(9999).sync();f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {boosGroup.shutdownGracefully();workGroup.shutdownGracefully();System.out.println("关闭");}}}
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {public static List<Channel> channels = new ArrayList<>();@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channels.add(channel);System.out.println(channel.remoteAddress().toString().substring(1) + " online");}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {Channel channel = ctx.channel();for (Channel ch : channels) {if (ch != channel) {ch.writeAndFlush("["+ch.remoteAddress().toString().substring(1)+"]"+"said:"+s+"\n");}}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channels.remove(channel);System.out.println(channel.remoteAddress().toString().substring(1) + " off line");}}
public class ChatClient {private String host;private int port;public ChatClient(String host, int port) {this.host = host;this.port = port;}public void run() {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new NettyClientHandler());}});ChannelFuture sync = bootstrap.connect(new InetSocketAddress("127.0.0.1",9999)).sync();Channel channel = sync.channel();Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String msg = scanner.nextLine();channel.writeAndFlush(msg + "\\r\\n").sync();}sync.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(s.trim());}}