创建阻塞的服务器

ServerSocketChannelSockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程

public class EchoServer {    private int port = 8000;    private ServerSocketChannel serverSocketChannel = null;    private ExecutorService executorService; //线程池    private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目        public EchoServer() throws IOException {        //创建一个线程池        executorService = Executors.newFixedThreadPool(            Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);        //创建一个ServerSocketChannel对象        serverSocketChannel = ServerSocketChannel.open();        //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口        serverSocketChannel.socket().setReuseAddress(true);        //把服务器进程与一个本地端口绑定        serverSocketChannel.socket().bind(new InetSocketAddress(port));        System.out.println("服务器启动");    }        public void service() {        while (true) {            SocketChannel socketChannel = null;            try {                socketChannel = serverSocketChannel.accept();                //处理客户连接                executorService.execute(new Handler(socketChannel));            } catch(IOException e) {                e.printStackTrace();            }        }    }        public static void main(String args[])throws IOException {        new EchoServer().service();    }        //处理客户连按    class Handler implements Runnable {        private SocketChannel socketChannel;        public Handler(SocketChannel socketChannel) {            this.socketChannel = socketChannel;        }                public void run() {            handle(socketChannel);        }                public void handle(SocketChannel socketChannel) {            try {                //获得与socketChannel关联的Socket对象                Socket socket = socketChannel.socket();                System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());                                BufferedReader br = getReader(socket);                PrintWriter pw = getWriter(socket);                                String msg = null;                while ((msg = br.readLine()) != null) {                    System.out.println(msg);                    pw.println(echo(msg));                    if (msg.equals("bye")) {                        break;                    }                }            } catch (IOException e) {                e.printStackTrace();            } finally {                try {                    if(socketChannel != null) {                        socketChannel.close();                    } catch (IOException e) {                        e.printStackTrace();                    }                }            }        }    }         private PrintWriter getWriter(Socket socket) throws IOException {        OutputStream socketOut = socket.getOutputStream();        return new PrintWriter(socketOut,true);    }        private BufferedReader getReader(Socket socket) throws IOException {        InputStream socketIn = socket.getInputStream();        return new BufferedReader(new InputStreamReader(socketIn));    }        public String echo(String msg) {        return "echo:" + msg;    }}

创建非阻塞的服务器

在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:

  • 接收客户的连接
  • 接收客户发送的数据
  • 向客户发回响应数据

EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件

// 创建一个Selector对象selector = Selector.open();//创建一个ServerSocketChannel对象serverSocketChannel = ServerSocketChannel.open();//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时//可以顺利绑定到相同的端口serverSocketChannel.socket().setReuseAddress(true);//使ServerSocketChannel工作于非阻塞模式serverSocketChannel.configureBlocking(false)://把服务器进程与一个本地端口绑定serverSocketChannelsocket().bind(new InetSocketAddress(port));

EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:

public void service() throws IOException {    serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);    //第1层while循环    while(selector.select() > 0) {        //获得Selector的selected-keys集合        Set readyKeys = selector.selectedKeys();        Iterator it = readyKeys.iterator();        //第2层while循环        while (it.hasNext()) {            SelectionKey key = null;            //处理SelectionKey            try {                //取出一个SelectionKey                key = (SelectionKey) it.next();                //把 SelectionKey从Selector 的selected-key 集合中删除                it.remove();                1f (key.isAcceptable()) { 处理接收连接就绪事件; }                if (key.isReadable()) { 处理读就绪水件; }                if (key.isWritable()) { 处理写就绪事件; }            } catch(IOException e) {                e.printStackTrace();                try {                    if(key != null) {                        //使这个SelectionKey失效                        key.cancel();                        //关闭与这个SelectionKey关联的SocketChannel                        key.channel().close();                    }                } catch(Exception ex) {                     e.printStackTrace();                }            }        }    }}
  • 首先由 ServerSocketChannelSelector 注册接收连接就绪事件,如果 Selector 监控到该事件发生,就会把相应的 SelectionKey 对象加入 selected-keys 集合
  • 第一层 while 循环,不断询问 Selector 已经发生的事件,select() 方法返回当前相关事件已经发生的 SelectionKey 的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。SelectorselectedKeys() 方法返回 selected-keys 集合,它存放了相关事件已经发生的 SelectionKey 对象
  • 第二层 while 循环,从 selected-keys 集合中依次取出每个 SelectionKey 对象并从集合中删除,,然后调用 isAcceptable()isReadable()isWritable() 方法判断到底是哪种事件发生了,从而做出相应的处理

1. 处理接收连接就绪事件

if (key.isAcceptable()) {    //获得与SelectionKey关联的ServerSocketChannel    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();    //获得与客户连接的SocketChannel    SocketChannel socketChannel = (SocketChannel) ssc.accept();    //把Socketchannel设置为非阻塞模式    socketChannel.configureBlocking(false);    //创建一个用于存放用户发送来的数据的级冲区    ByteBuffer buffer = ByteBuffer.allocate(1024);    //Socketchannel向Selector注册读就绪事件和写就绪事件    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);}

2. 处理读就绪事件

public void receive(SelectionKey key) throws IOException {    //获得与SelectionKey关联的附件    ByteBuffer buffer = (ByteBuffer) key.attachment();    //获得与SelectionKey关联的Socketchannel    SocketChannel socketChannel = (SocketChannel)key.channel();    //创建一个ByteBuffer用于存放读到的数据    ByteBuffer readBuff = ByteBuffer.allocate(32);    socketChannel.read(readBuff);    readBuff.flip();    //把buffer的极限设为容量    buffer.limit(buffer.capacity());    //把readBuff中的内容拷贝到buffer    buffer.put(readBuff);}

3. 处理写就绪事件

public void send(SelectionKey key) throws IOException {    //获得与SelectionKey关联的ByteBuffer    ByteBuffer buffer = (ByteBuffer) key.attachment();    //获得与SelectionKey关联的SocketChannel    SocketChannel socketChannel = (SocketChannel) key.channel();    buffer.flip();    //按照GBK编码把buffer中的字节转换为字符串    String data = decode(buffer);    //如果还没有读到一行数据就返回    if(data.indexOf("\r\n") == -1)        return;    //截取一行数据    String outputData = data.substring(0, data.indexOf("\n") + 1);    //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中    ByteBuffer outputBuffer = encode("echo:" + outputData);    //输出outputBuffer的所有字节    while(outputBuffer,hasRemaining())        socketChannel.write(outputBuffer);    //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer    ByteBuffer temp = encode(outputData);    //把buffer的位置设为temp的极限    buffer.position(temp.limit()):    //删除buffer已经处理的数据    buffer.compact();    //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel    if(outputData.equals("bye\r\n")) {        key.cancel();        socketChannel.close();    }}

完整代码如下:

public class EchoServer {    private int port = 8000;    private ServerSocketChannel serverSocketChannel = null;    private Selector selector;    private Charset charset = Charset.forName("GBK");public EchoServer() throws IOException {        // 创建一个Selector对象        selector = Selector.open();        //创建一个ServerSocketChannel对象        serverSocketChannel = ServerSocketChannel.open();        //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时        //可以顺利绑定到相同的端口        serverSocketChannel.socket().setReuseAddress(true);        //使ServerSocketChannel工作于非阻塞模式        serverSocketChannel.configureBlocking(false):        //把服务器进程与一个本地端口绑定        serverSocketChannelsocket().bind(new InetSocketAddress(port));    }        public void service() throws IOException {        serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);        //第1层while循环        while(selector.select() > 0) {            //获得Selector的selected-keys集合            Set readyKeys = selector.selectedKeys();            Iterator it = readyKeys.iterator();            //第2层while循环            while (it.hasNext()) {                SelectionKey key = null;                //处理SelectionKey                try {                    //取出一个SelectionKey                    key = (SelectionKey) it.next();                    //把 SelectionKey从Selector 的selected-key 集合中删除                    it.remove();                    1f (key.isAcceptable()) {                         //获得与SelectionKey关联的ServerSocketChannel                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();                        //获得与客户连接的SocketChannel                        SocketChannel socketChannel = (SocketChannel) ssc.accept();                        //把Socketchannel设置为非阻塞模式                        socketChannel.configureBlocking(false);                        //创建一个用于存放用户发送来的数据的级冲区                        ByteBuffer buffer = ByteBuffer.allocate(1024);                        //Socketchannel向Selector注册读就绪事件和写就绪事件                        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);                    }                    if (key.isReadable()) { receive(key); }                    if (key.isWritable()) { send(key); }                } catch(IOException e) {                    e.printStackTrace();                    try {                        if(key != null) {                            //使这个SelectionKey失效                            key.cancel();                            //关闭与这个SelectionKey关联的SocketChannel                            key.channel().close();                        }                    } catch(Exception ex) {                         e.printStackTrace();                    }                }            }        }    }        public void receive(SelectionKey key) throws IOException {        //获得与SelectionKey关联的附件        ByteBuffer buffer = (ByteBuffer) key.attachment();        //获得与SelectionKey关联的Socketchannel        SocketChannel socketChannel = (SocketChannel)key.channel();        //创建一个ByteBuffer用于存放读到的数据        ByteBuffer readBuff = ByteBuffer.allocate(32);        socketChannel.read(readBuff);        readBuff.flip();        //把buffer的极限设为容量        buffer.limit(buffer.capacity());        //把readBuff中的内容拷贝到buffer        buffer.put(readBuff);    }        public void send(SelectionKey key) throws IOException {        //获得与SelectionKey关联的ByteBuffer        ByteBuffer buffer = (ByteBuffer) key.attachment();        //获得与SelectionKey关联的SocketChannel        SocketChannel socketChannel = (SocketChannel) key.channel();        buffer.flip();        //按照GBK编码把buffer中的字节转换为字符串        String data = decode(buffer);        //如果还没有读到一行数据就返回        if(data.indexOf("\r\n") == -1)            return;        //截取一行数据        String outputData = data.substring(0, data.indexOf("\n") + 1);        //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中        ByteBuffer outputBuffer = encode("echo:" + outputData);        //输出outputBuffer的所有字节        while(outputBuffer,hasRemaining())            socketChannel.write(outputBuffer);        //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer        ByteBuffer temp = encode(outputData);        //把buffer的位置设为temp的极限        buffer.position(temp.limit()):        //删除buffer已经处理的数据        buffer.compact();        //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel        if(outputData.equals("bye\r\n")) {            key.cancel();            socketChannel.close();        }    }        //解码    public String decode(ByteBuffer buffer) {        CharBuffer charBuffer = charset.decode(buffer);        return charBuffer.toStrinq();    }        //编码    public ByteBuffer encode(String str) {        return charset.encode(str);    }        public static void main(String args[])throws Exception {        EchoServer server = new EchoServer();        server.service();    }}

阻塞模式与非阻塞模式混合使用

使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作

假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能

负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作

public class EchoServer {    private int port = 8000;    private ServerSocketChannel serverSocketChannel = null;    private Selector selector = null;    private Charset charset = Charset.forName("GBK");public EchoServer() throws IOException {        selector = Selector.open();        serverSocketChannel = ServerSocketChannel.open();        serverSocketChannel.socket().setReuseAddress(true);        serverSocketChannelsocket().bind(new InetSocketAddress(port));    }        public void accept() {        while(true) {            try {                SocketChannel socketChannel = serverSocketChannel.accept();                socketChannel.configureBlocking(false);                                ByteBuffer buffer = ByteBuffer.allocate(1024);                synchronized(gate) {                    selector.wakeup();                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);                }            } catch(IOException e) {                e.printStackTrace();            }        }    }        private Object gate=new Object();        public void service() throws IOException {        while(true) {            synchronized(gate){}            int n = selector.select();            if(n == 0) continue;            Set readyKeys = selector.selectedKeys();            Iterator it = readyKeys.iterator();            while (it.hasNext()) {                SelectionKey key = null;                try {    it.remove();                    if (key.isReadable()) {                        receive(key);                    }                    if (key.isWritable()) {                        send(key);                    }                } catch(IOException e) {                    e.printStackTrace();                    try {                        if(key != null) {                            key.cancel();                            key.channel().close();                        }                    } catch(Exception ex) { e.printStackTrace(); }                }            }        }    }        public void receive(SelectionKey key) throws IOException {        ...    }        public void send(SelectionKey key) throws IOException {        ...    }        public String decode(ByteBuffer buffer) {        ...    }        public ByteBuffer encode(String str) {        ...    }        public static void main(String args[])throws Exception {        final EchoServer server = new EchoServer();        Thread accept = new Thread() {            public void run() {                server.accept();            }        };        accept.start();server.service();    }}

注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁

导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去

为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然