# 网络编程

多台计算机连接到一起组成网络,网络形成后各计算机之间相互连接进行通信,需要一系列的通信协议,在通信过程中按模式基本分为两种模式:

  • C/S(Client/Server),需要开发服务器端和客户端,服务器端等待客户端连接,连接完成后进行通信。常见的协议有TCP和UDP。
  • B/S(Browser/Server),服务器端开发一套程序,客户端使用浏览器访问,可看成C/S的延伸,一般都是很多浏览器来访问一个服务器。此类程序使用公共端口,公共协议,安全性比较差。常见的协议有HTTP,HTTPS,它们都是基于TCP协议上做出来的。

Java只提供核心网络支持(Socket层),但是有第三方框架Netty,可轻松支持TCP,UDP,HTTP,WebSocket协议。

# TCP实例

下面用TCP来实现ECHO模型,server端用了BIO模型,当有一个客户端连接时就新建一个线程来运行,没有客户端连接就阻塞等待的模式就是BIO模型。注意该程序中,并没有设置并发线程上线,可能会导致服务器性能问题。

正常的TCP模型如下:

tcpModel

BIO模型如下:

BIOModel

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;

class ServerTcpModel {
    private int port = 996;
    ServerSocket tcpServer;
    Socket clientSocket;
    Thread mainThread;

    public class ClientSocket implements Runnable {
        Socket clientSocket;
        Scanner scan;
        PrintStream outPrintStream;

        public ClientSocket(Socket clientSocket) {
            // TODO Auto-generated constructor stub
            try {
                scan = new Scanner(clientSocket.getInputStream());
                scan.useDelimiter("\n");
                outPrintStream = new PrintStream(clientSocket.getOutputStream());
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            boolean continueOper = true;
            System.out.println(Thread.currentThread().getName()+" 启动");
            while (continueOper) {
                if (scan.hasNext()) {
                    String readStr = scan.next().trim();
                    if ("exit".equalsIgnoreCase(readStr)) {
                        continueOper = false;
                        outPrintStream.println("server is exiting!");
                    } else {
                        outPrintStream.println("server response:" + readStr);
                        System.out.println("server receive:" + readStr);
                    }
                }
            }
            System.out.println(Thread.currentThread().getName()+" 结束");
        }

        @Override
        protected void finalize() throws Throwable {
            // TODO Auto-generated method stub
            super.finalize();
            scan.close();
            outPrintStream.close();
        }

    }

    public ServerTcpModel(int port) {
        this.port = port;
        try {
            this.tcpServer = new ServerSocket(this.port);
            System.out.println("等待客户端连接中。。。");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void MultiThreadServerStart() {
        mainThread = new Thread(() -> {
            while (true) {
                try {
                    this.clientSocket = tcpServer.accept();
                    System.out.println("客户端连接进来了。。。");
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                new Thread(new ClientSocket(this.clientSocket)).start();
            }
        });
        //mainThread.setDaemon(true);
        mainThread.start();
    }

    public void SingleServerStart() {
        mainThread = new Thread(() -> {
            try {
                this.clientSocket = tcpServer.accept();
                System.out.println("客户端连接进来了。。。");
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            ClientSocket clientSocket = new ClientSocket(this.clientSocket);
            clientSocket.run();
        });
        //mainThread.setDaemon(true);
        mainThread.start();
    }

    @Override
    protected void finalize() throws Throwable {
        // TODO Auto-generated method stub
        super.finalize();
        clientSocket.close();
        tcpServer.close();

    }

}

class ClientTcpModel {
    private int port = 996;
    Socket clientSocket;
    Scanner scan;
    PrintStream outPrintStream;

    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));

    public ClientTcpModel(String targetIp, int port) {
        this.port = port;
        try {
            this.clientSocket = new Socket(targetIp, port);
            this.scan = new Scanner(clientSocket.getInputStream());
            scan.useDelimiter("\n");
            outPrintStream = new PrintStream(clientSocket.getOutputStream());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public String getKeyboardInput(String prompt) {
        System.out.println(prompt);
        String str = null;
        try {
            str = bufferedReader.readLine();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return str;
    }

    public void ConnectToServerOper() {
        boolean continueOper = true;
        while (continueOper) {
            String inputString = getKeyboardInput("请输入:").trim();
            outPrintStream.println(inputString);
            if ("exit".equalsIgnoreCase(inputString)) {
                continueOper = false;
                outPrintStream.println("client is exiting!");
            }
            if (scan.hasNext()) {
                String readStr = scan.next().trim();
                System.out.println("client receive:" + readStr);
            }
        }
    }

    @Override
    protected void finalize() throws Throwable {
        // TODO Auto-generated method stub
        super.finalize();
        scan.close();
        outPrintStream.close();
        clientSocket.close();
    }

}

public class Main {
    public static void main(String[] args) throws IOException {
        ServerTcpModel sTcpModel = new ServerTcpModel(9999);
        // sTcpModel.SingleServerStart();
        sTcpModel.MultiThreadServerStart();
        ClientTcpModel clientTcpModel = new ClientTcpModel("localhost", 9999);
        clientTcpModel.ConnectToServerOper();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185

# UDP实例

TCP的所有操作必须建立可靠的链接才能通信,这在低频率通信的模型中会浪费资源,为了适应低频率通信,网络中又提到了一种传输协议:UDP,它是利用数据报的形式进行发送,它更像是邮局邮寄信件,信从一个邮局到另一个邮局,最后到收件人手里。由于不像TCP在通信之前新建了一个可靠通道,它的接收端可能处在关闭状态。

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;

public class Main {
    public static void main(String[] args) throws IOException {
        // 服务器端接收消息
        new Thread(() -> {
            DatagramSocket serverUDP = null;
            try {
                serverUDP = new DatagramSocket(9999);
                byte data[] = new byte[1024];
                DatagramPacket packet = new DatagramPacket(data, data.length);
                System.out.println("等待接收消息");
                serverUDP.receive(packet);
                System.out.println("接收消息的内容为:" + new String(data, 0, packet.getLength()));

                serverUDP.close();
            } catch (SocketException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }).start();

        // 客户端发送消息
        DatagramSocket clientDatagramSocket = null;
        clientDatagramSocket = new DatagramSocket(10000);

        String sendInfo = "hello world";
        DatagramPacket packet = new DatagramPacket(sendInfo.getBytes(), 0, sendInfo.length(),
                InetAddress.getByName("localhost"), 9999);
        clientDatagramSocket.send(packet);
        clientDatagramSocket.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# NIO编程

在前面的TCP使用的是BIO模型,NIO(Non-Blocking I/O,非阻塞IO,被称为NewIO)是在JDK1.4之后提供的特性,它使得Java底层通信的性能大幅度提升,NIO提供了一个全新的底层IO模型,它采用非阻塞模式结合通道(Channel)与缓冲区(Buffer)实现IO通信操作。与之前的面向流(Stream Oriented)概念不同,NIO是面向缓冲区的(BUffer Oriented),在所有数据被读取到缓冲区后,可使用指针对缓冲区进行读取控制,可保证不覆盖原始数据的前提下进行。

传统的JavaIO采用阻塞模式,这在读写时会一直处于阻塞状态,在读写完成前不能进行其他操作,这在网络通信过程中会因为线程阻塞而影响程序性能,Java发展初期,JVM优化性能不高,Java程序运行缓慢,对IO没有过多要求,随着JVM不断优化,JVM中字节码执行接近本地,此时IO问题就显得严重,这个背景下推出了java.nio包,采用非阻塞设计模型,在有数据读写的情况下也不会产生阻塞,同时可以完成其他操作,一个线程可通过选择器管理多个输入输出通道。

# BIO,NIO和AIO区别

  • BIO:同步阻塞IO,服务器为每一个连接的客户端分配一个线程,可通过线程池提高线程的管理机制。
  • NIO:同步非阻塞IO,服务器为每一次请求分配一个线程,所有的请求都会注册到多路复用器中,多路复用器通过轮询的形式针对每次请求创建处理线程。
  • AIO:异步非阻塞IO,客户端的IO请求都是由系统先处理,当处理完成后再通知服务器启动线程进行处理。

# Buffer

Buffer(缓冲区)是一个线性的,有序的数据集,一个缓冲区只能容纳一种数据类型,它是一个抽象类,提供了一个缓存操作的标准,要保存不同的数据类型应该使用Buffer的不同子类,它的子类有:ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer。它们的继承关系如下:

缓冲区有3个状态变量:

  • position:表示缓冲区下一次读取或写入时的指针位置。
  • limit:还有多少数据需要存储或读取。
  • capacity:表示缓冲区的最大容量,此值在分配缓冲区时被设置,一般不会更改。
import java.nio.CharBuffer;
public class Main {
    public static void main(String[] args)  {
        CharBuffer buffer = CharBuffer.allocate(20);
        String str = "hello world";
        System.out.println("capacity = "+buffer.capacity()+",limit = "+buffer.limit()+",position = "+buffer.position());
        //向缓存区放入数据
        buffer.put(str);
        System.out.println("capacity = "+buffer.capacity()+",limit = "+buffer.limit()+",position = "+buffer.position());
        //从缓存区中读出数据
        buffer.flip();
        System.out.println("capacity = "+buffer.capacity()+",limit = "+buffer.limit()+",position = "+buffer.position());
        //一个一个读出缓存区的数据
        while(buffer.hasRemaining()) {
            System.out.println(buffer.get()+",");
            System.out.println("capacity = "+buffer.capacity()+",limit = "+buffer.limit()+",position = "+buffer.position());
        }
        //清空缓存区
        buffer.clear();
        System.out.println("capacity = "+buffer.capacity()+",limit = "+buffer.limit()+",position = "+buffer.position());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# Channel

Channel(通道)可用来读取和写入数据,通道类似于之前的输入和输出流,但是程序不会直接操作通道,所有内容都是先读取或写入缓冲区中,再通过缓冲区取得或写入。

Channel本身是个接口,它与传统的流不同,传统的流分为输入流和输出流,通道本身是双向操作的,既可以完成输入也可以完成输出。

它和Buffer的关系如下图:

ChannelBuffer

# FileChannel

它可进行文件通道的读写。

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;

public class Main {
    public static void main(String[] args) throws IOException  {
        File file = new File("test.txt");
        try {
            //文件输出流
            FileInputStream fileInputStream = new FileInputStream(file);
            //从文件输出流获取通道
            FileChannel fileChannel = fileInputStream.getChannel();
            //开辟缓存区
            ByteBuffer buffer = ByteBuffer.allocate(20);
            //开辟字节输出流
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            int count=0;
            while((count=fileChannel.read(buffer))!=-1) {
                buffer.flip();
                while(buffer.hasRemaining()) {
                    byteArrayOutputStream.write(buffer.get());
                }
                buffer.clear();
            }
            System.out.println(new String(byteArrayOutputStream.toByteArray()));
            fileChannel.close();
            fileInputStream.close();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# Pipe

Channel针对线程管道的IO也有专门的通道:Pipe.SinkChannel(管道数据输出),Pipe.SourceChannel(管道数据输入)。

import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.ByteBuffer;

public class Main {
    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        new Thread(() -> {
            Pipe.SourceChannel sourceChannel = pipe.source();
            ByteBuffer readBuffer = ByteBuffer.allocate(50);
            try {
                int count = sourceChannel.read(readBuffer);
                readBuffer.flip();
                System.out.println("Receive Thread:" + new String(readBuffer.array(), 0, count));
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }, "Receive Thread :").start();
        new Thread(() -> {
            Pipe.SinkChannel sinkChannel = pipe.sink();
            ByteBuffer sendBuffer = ByteBuffer.allocate(50);
            sendBuffer.put("hello world!".getBytes());
            sendBuffer.flip();

            while (sendBuffer.hasRemaining()) {
                try {
                    sinkChannel.write(sendBuffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }, "Send Thread :").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 文件锁

下面中测试lock和tryLock,文档中说lock会阻塞,实际上也没阻塞,后面需要研究一下。

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.ByteBuffer;

class writeFileWithChannel {
    File file;
    FileOutputStream fileOutputStream;
    FileChannel fileChannel;
    ByteBuffer buffer = ByteBuffer.allocate(20);

    public writeFileWithChannel() {
        try {
            file = new File("test.txt");
            fileOutputStream = new FileOutputStream(file);
            fileChannel = fileOutputStream.getChannel();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
    
    //测试其是否阻塞
    public void TestLock() {
        System.out.println("blocking channel。。。");
        try {
            FileLock fileLock;
            //fileLock = fileChannel.lock();
            //System.out.println("passed lock!");
            fileLock=new RandomAccessFile(file, "rw").getChannel().lock();
            System.out.println("passed RandomAccessFile channel lock!");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


    }

    public void writeFile(String content) {
        FileLock fileLock = null;
        boolean tryToGetLock = true;
        while (tryToGetLock) {
            try {
                fileLock = fileChannel.lock();
            } catch (OverlappingFileLockException e) {
                System.out.println(Thread.currentThread().getName()+"---------处理异常中-------------");
                // TODO Auto-generated catch block
                //e.printStackTrace();
            } catch (IOException e) {
                // TODO: handle exception
                System.out.println("exception 异常");
            }

            if (fileLock != null) {
                System.out.println(Thread.currentThread().getName() + "----------执行任务开始--------!");
                try {
                    String msg = Thread.currentThread().getName() + ":" + content;
                    buffer.put(msg.getBytes());
                    buffer.flip();
                    fileChannel.write(buffer);
                    buffer.clear();
                    Thread.sleep(300);
                    fileLock.release();
                } catch (Exception e) {
                    // TODO: handle exception
                    System.out.println("---------1");
                }
                tryToGetLock = false;
                System.out.println(Thread.currentThread().getName() + "----------执行任务结束--------!");
            }else {
                System.out.println(Thread.currentThread().getName()+":file lock is null,not get lock");
            }
        }

    }

    @Override
    protected void finalize() throws Throwable {
        // TODO Auto-generated method stub
        super.finalize();
        fileChannel.close();
        fileOutputStream.close();
    }
}

public class Main {
    public static void main(String[] args) {
        writeFileWithChannel wf = new writeFileWithChannel();

        for (int i = 0; i < 10; i++) {
            final int tempI = i;
            new Thread(() -> {
                wf.writeFile("hehe:" + tempI + "\n");
            }, "线程" + i).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

# 字符集

Java语言中所有的信息都是以Unicode进行编码的,但是计算机中并不只有这一种编码,在IO通信中如果编码处理不恰当,可能产生乱码,Java提供了Charset类来负责编码问题,该类创建了CharsetEncoder和CharsetDecoder。

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class Main {
    public static void main(String[] args) throws CharacterCodingException {
        Charset charset = Charset.forName("UTF-8");
        CharsetEncoder encoder = charset.newEncoder();
        CharsetDecoder decoder = charset.newDecoder();
        String sourceString="测试结果,测试这句话!";
        CharBuffer charBuffer = CharBuffer.allocate(20);
        charBuffer.put(sourceString);
        charBuffer.flip();
        ByteBuffer buffer = encoder.encode(charBuffer);
        System.out.println(decoder.decode(buffer));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# NIO通信模型

同步非阻塞IO通信模型主要目的是解决IO的性能问题,传统IO最大的问题是属于同步阻塞IO模型,即在一个线程进行操作的时候,其他线程都无法处理,这在网络应用中会难以接受。NIO一般应用在高效的的网络传输处理程序中,网络通信是一个最基本的通道连接,NIO中新提供了两个类,ServerSocketChannel和SocketChannel,为了方便对所有通道进行管理,NIO提供了一个Selector通道管理类,所有的通道向Selector进行注册,采用统一的模式进行读写操作,这样的设计模式被称为Reactor模式。它的结构如下:

NIOClass

具体实现是使用SelectableChannel向Select类注册,它提供了注册Selector的方法和阻塞模式,ServerSocketChannel描述服务器通道。使用register方法时需要指定一个Selector对象以及Select域,Selector对象可通过Selector的open方法获得,Selector域在SelectionKey中定义。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class NIOServerDealClientThread implements Runnable {

    private SocketChannel clientSocketChannel;
    private boolean flag = true;

    public NIOServerDealClientThread(SocketChannel channel) {
        try {
            clientSocketChannel = channel;
            System.out.println("客户端连接成功,其地址为:" + clientSocketChannel.getRemoteAddress());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        ByteBuffer buffer = ByteBuffer.allocate(50);
        while (this.flag) {
            buffer.clear();
            try {
                int readCount = this.clientSocketChannel.read(buffer);
                String readMessage = new String(buffer.array(), 0, readCount).trim();
                System.out.println("服务器接收消息:" + readMessage);
                String writeMessageString = "server send:" + readMessage + "\n";
                if ("exit".equals(readMessage)) {
                    writeMessageString = "服务器端退出!";
                    this.flag = false;
                }
                buffer.clear();
                buffer.put(writeMessageString.getBytes());
                buffer.flip();
                this.clientSocketChannel.write(buffer);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        try {
            this.clientSocketChannel.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

class NIOEchoServer {
    public static final int PORT = 9999;

    public void StartServer() throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
        // ExecutorService executorService = new ThreadPoolExecutor(1, 2, 6,TimeUnit.SECONDS, queue,Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(PORT));

        // 打开注册器
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器端启动,在监听端口:" + PORT);
        int keySelect = 0;
        while ((keySelect = selector.select()) > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectIterator = selectionKeys.iterator();
            while (selectIterator.hasNext()) {
                SelectionKey selectionKey = selectIterator.next();
                if (selectionKey.isAcceptable()) {
                    System.out.println("等待客户端连接!");
                    SocketChannel clientChannel = serverSocketChannel.accept();
                    if (clientChannel != null) {
                        executorService.submit(new NIOServerDealClientThread(clientChannel));
                    }
                }
                selectIterator.remove();
            }
        }
        executorService.shutdown();
        serverSocketChannel.close();
        System.out.println("------服务器端退出!------");
    }
}

class NIOEchoClient {
    public static final String HOST_IP = "localhost";
    public static final int HOST_PORT = 9999;

    public String GetInputStr() {
        BufferedReader keyboardInput = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        String result = null;
        while (flag) {
            System.out.println("请输入信息!");
            try {
                result = keyboardInput.readLine();
                if ("exit".equals(result)) {
                    flag = false;
                }
                return result;
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return result;
    }

    public void StartClient() {
        try {
            SocketChannel clientChannel = SocketChannel.open();
            clientChannel.connect(new InetSocketAddress(HOST_IP, HOST_PORT));
            ByteBuffer buffer = ByteBuffer.allocate(30);
            boolean flag = true;
            while (flag) {
                buffer.clear();
                String msgString = GetInputStr();
                buffer.put(msgString.getBytes());
                buffer.flip();
                clientChannel.write(buffer);
                buffer.clear();
                int readCount = clientChannel.read(buffer);
                buffer.flip();
                System.out.println("client receive:" + new String(buffer.array(), 0, readCount));
                if ("exit".equals(msgString)) {
                    flag = false;
                    System.out.println("客户端退出!");
                }
            }
            clientChannel.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

public class Main {
    public static void main(String[] args) {
        new Thread(() -> {
            NIOEchoServer server = new NIOEchoServer();
            try {
                server.StartServer();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }).start();
        System.out.println("启动客户端!");
        NIOEchoClient client = new NIOEchoClient();
        client.StartClient();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

# AIO通信模型

NIO是基于事件驱动模式的通信操作,主要解决BIO并发问题,但是如果IO执行性能较差也会影响到性能,JDK 1.7之后开始提供AIO,由操作系统来实现IO操作,应用只是使用回调函数进行操作。这个过程是异步的,通过CompletionHandler获取异步执行结果。它的类结构如下图:

AIOClass

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

class AIOServerThread implements Runnable {
    private static final int PORT = 9999;
    private static final int bufferSize = 50;
    private CountDownLatch latch = null;
    private AsynchronousServerSocketChannel serverChannel = null;

    class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServerThread> {

        class ServerLogicHandler implements CompletionHandler<Integer, ByteBuffer> {
            private AsynchronousSocketChannel clientChannel;
            private boolean exitFlag = false;

            public ServerLogicHandler(AsynchronousSocketChannel clientChannel) {
                this.clientChannel = clientChannel;
            }

            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                // TODO Auto-generated method stub
                buffer.flip();
                String readMessageString = new String(buffer.array(), 0, buffer.remaining()).trim();
                System.out.println("server received:" + readMessageString);
                if ("exit".equalsIgnoreCase(readMessageString)) {
                    readMessageString = "服务器退出!";
                    this.exitFlag = true;
                }
                ServerResponse("server response:" + readMessageString + "\n");

            }

            @Override
            public void failed(Throwable exc, ByteBuffer buffer) {
                // TODO Auto-generated method stub
                closeClient();
            }

            private void closeClient() {
                System.out.println("中断此客户端!");
                try {
                    this.clientChannel.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            public void ServerResponse(String result) {
                ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
                buffer.put(result.getBytes());
                buffer.flip();
                this.clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {

                    @Override
                    public void completed(Integer result, ByteBuffer buffer) {
                        // TODO Auto-generated method stub
                        if (buffer.hasRemaining()) {
                            clientChannel.write(buffer, buffer, this);
                        } else {
                            if (exitFlag == false) {
                                ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
                                clientChannel.read(readBuffer, readBuffer, new ServerLogicHandler(clientChannel));
                            }
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer buffer) {
                        // TODO Auto-generated method stub
                        closeClient();
                    }
                });
            }
        }

        @Override
        public void completed(AsynchronousSocketChannel channel, AIOServerThread aioServerThread) {
            // TODO Auto-generated method stub
            aioServerThread.serverChannel.accept(aioServerThread, this);
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            channel.read(buffer, buffer, new ServerLogicHandler(channel));

        }

        @Override
        public void failed(Throwable exc, AIOServerThread aioServerThread) {
            // TODO Auto-generated method stub
            System.out.println("服务器连接失败...");
            aioServerThread.latch.countDown();
        }

    }

    public AIOServerThread() {
        this.latch = new CountDownLatch(1);
        try {
            this.serverChannel = AsynchronousServerSocketChannel.open();
            this.serverChannel.bind(new InetSocketAddress(PORT));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("服务器在端口:" + PORT + " 上监听!");
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        this.serverChannel.accept(this, new AcceptHandler());
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("服务器连接失败!");

    }

}

class AIOClientThread implements Runnable {
    private static final String HOST = "localhost";
    private static final int PORT = 9999;
    private static final int bufferSize = 50;
    private CountDownLatch latch;
    private AsynchronousSocketChannel clientChannel = null;

    public AIOClientThread() {
        try {
            this.clientChannel = AsynchronousSocketChannel.open();
            this.clientChannel.connect(new InetSocketAddress(HOST, PORT));
            this.latch = new CountDownLatch(1);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            // TODO Auto-generated method stub
            buffer.flip();
            System.out.println(new String(buffer.array(), 0, buffer.remaining()));
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            // TODO Auto-generated method stub
            System.out.println("出现问题,客户端被关闭!");
            try {
                clientChannel.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            latch.countDown();
        }

    }

    class ClientWriterHandler implements CompletionHandler<Integer, ByteBuffer> {

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            // TODO Auto-generated method stub
            if (buffer.hasRemaining()) {
                clientChannel.write(buffer, buffer, this);
            } else {
                ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
                clientChannel.read(readBuffer, readBuffer, new ClientReadHandler());
            }

        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            // TODO Auto-generated method stub

        }
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            this.latch.await();
            this.clientChannel.close();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public boolean sendInfo(String msg) {
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        buffer.put(msg.getBytes());
        buffer.flip();
        this.clientChannel.write(buffer, buffer, new ClientWriterHandler());
        if ("exit".equalsIgnoreCase(msg)) {
            return false;
        }
        return true;
    }

    public String GetInputStr() {
        BufferedReader keyboardInput = new BufferedReader(new InputStreamReader(System.in));
        boolean flag = true;
        String result = null;
        while (flag) {
            System.out.println("请输入信息!");
            try {
                result = keyboardInput.readLine();
                if ("exit".equals(result)) {
                    flag = false;
                }
                return result;
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return result;
    }

    public void startSendMsg() {
        while (sendInfo(GetInputStr())) {
        }
        System.out.println("客户端退出!");
    }

}

public class Main {
    public static void main(String[] args) {
        new Thread(new AIOServerThread()).start();
        AIOClientThread clientThread = new AIOClientThread();
        new Thread(clientThread).start();
        clientThread.startSendMsg();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256