Java Socket NIO

基础

NIO:Non-Blocking IO 非阻塞 IO,主要用于网络连接中非阻塞的读写,提供多路非阻塞式的高伸缩性网络 I/O 。异步 I/O 的一个优势在于,可以同时根据大量的输入和输出执行 I/O。同步程序常常需要轮询,或者创建很多线程处理大量的连接。使用异步 I/O,可以监听任何数量的通道上的事件,不用轮询也不用额外的线程。

Selector

选择器:是 Java NIO 中能够检测一到多个 NIO 通道,是多路复用器,能够监听通道是否为读写事件做好准备。因此一个单独的线程可以管理多个通道,从而管理多个网络连接。
Selector 用来支持异步 I/O 操作(非阻塞I/O操作),Channel 必须处于非阻塞模式下(因此 FileChannel, Selector 不能一起使用)。Selector 是非阻塞 I/O 的核心,所有希望采用非阻塞 I/O 进行通信的通道,都应该注册到 Selector 对象。

SelectionKey

SelectionKey 包含监听的不同类型事件:

  • OP_READ
  • OP_WRITE
  • OP_CONNECT
  • OP_ACCEPT

常用 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    public abstract class SelectionKey {
// 获取 channel
public abstract SelectableChannel channel();
// 获取 Selector
public abstract Selector selector();
// 获取注册的监听事件集合
public abstract int interestOps();
// 更换监听事件
public abstract SelectionKey interestOps(int ops);
// 获取已经 ready 了的事件集合
public abstract int readyOps();
// 取消监听的事件
public abstract void cancel();
// 判断事件
public final boolean isReadable() {...}
public final boolean isWritable() {...}
public final boolean isConnectable() {...}
public final boolean isAcceptable() {...}
// 添加附加对象
public final Object attach(Object ob) {...}
// 获取附件对象
public final Object attachment() {...}
}

常用 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    public abstract class Selector implements Closeable {
// 创建 Selector
public static Selector open() throws IOException {...}
public abstract boolean isOpen();

// 返回所有的注册事件集
public abstract Set<SelectionKey> keys();
// 返回 select 已经就绪的注册事件集
public abstract Set<SelectionKey> selectedKeys();

// 非阻塞,如果监听事件没准备好立即返回 0
public abstract int selectNow() throws IOException;
// 阻塞等待直到注册事件准备好,返回监听的事件
public abstract int select(long timeout) throws IOException;
public abstract int select() throws IOException;

// 如果 select 阻塞了,直接唤醒 select 立马返回
public abstract Selector wakeup();
// 关闭
public abstract void close() throws IOException;
}

SelectableChannel 支持选择器的通道

表示可以支持多路复用的通道,支持阻塞和非阻塞模式。(默认情况下,所有的 Channel 都是阻塞的),需要设置为非阻塞模式,才能使用 NIO 特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class SelectableChannel
extends AbstractInterruptibleChannel
implements Channel {
// false 表示设置为非阻塞模式
public abstract SelectableChannel configureBlocking(boolean block)
throws IOException;
// 判断是否为阻塞模式
public abstract boolean isBlocking();
// 注册选择器
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException {...}
// 通道是否注册一个或多个选择器
public abstract boolean isRegistered();
// 返回通道和该选择器的注册关系
public abstract SelectionKey keyFor(Selector sel);
}

注册通道

通道必须处于非阻塞模式,可以监听多个事件。

1
2
3
4
5
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
// 监听多个事件
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey key = channel.register(selector, interestSet);

参考示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}

SocketChannel

对应于 java.net.Socket 类。

创建 SocketChannel

默认创建一个阻塞 SocketChannel,两种方法的差异:

  • open()
    后续还需要手动配置为非阻塞后,监听连接事件。
  • open(SocketAddress remote)
    创建并同步等待连接 remote,直到连接成功后返回。不需要再手动连接。

支持的非阻塞事件

  • OP_CONNECT
  • OP_READ
  • OP_WRITE

不支持 Accept 事件

常见 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel,
GatheringByteChannel, NetworkChannel {
// 创建 SocketChannel
public static SocketChannel open() throws IOException {...}
// 创建 SocketChannel ,同步等待连接 remote
public static SocketChannel open(SocketAddress remote)
throws IOException;

// 绑定
public abstract SocketChannel bind(SocketAddress local)
throws IOException;
// 连接,非阻塞模式会直接返回连接结果;阻塞模式会等待直到连接成功
public abstract boolean connect(SocketAddress remote) throws IOException;
// 完成连接,非阻塞模式直接返回连接是否完成;阻塞模式会等待直到连接完成
public abstract boolean finishConnect() throws IOException;
// 当前通道是否已经连接
public abstract boolean isConnected();
// 当前通道是否正在连接,一般是第一次连接后调用
// 用来判断是否正在连接,并接着调用 finishConnect
public abstract boolean isConnectionPending();
// 设置 socket 选项
public abstract <T> SocketChannel setOption(SocketOption<T> name, T value)
throws IOException;

// 获取当前 socket
public abstract Socket socket();
// 获取源地址和端口,目标地址和端口
public abstract SocketAddress getRemoteAddress() throws IOException;
public abstract SocketAddress getLocalAddress() throws IOException;

// 通道读写缓存
public abstract int read(ByteBuffer dst) throws IOException;
public abstract long read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
public final long read(ByteBuffer[] dsts) throws IOException {...}
public abstract int write(ByteBuffer src) throws IOException;
public abstract long write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
public final long write(ByteBuffer[] srcs) throws IOException {...}

// 返回支持的 SelectionKey 事件
public final int validOps() {...}

// 关闭通道的读功能
public abstract SocketChannel shutdownInput() throws IOException;
// 关闭通道的写功能
public abstract SocketChannel shutdownOutput() throws IOException;
}

ServerSocketChannel

对应于 java.net.ServerSocket 类。

创建 ServerSocketChannel

创建 ServerSocketChannel 也很简单:ServerSocketChannel server = ServerSocketChannel.open();

支持的非阻塞事件

仅支持 OP_ACCEPT 事件。

常见 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel {
// 创建 ServerSocketChannel
public static ServerSocketChannel open() throws IOException {...}
// 返回支持的事件
public final int validOps() {...}
// 绑定
public final ServerSocketChannel bind(SocketAddress local)
throws IOException{...}
public abstract ServerSocketChannel bind(SocketAddress local, int backlog)
throws IOException;
// 设置 socket 选项
public abstract <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
throws IOException;
// 获取当前 socket
public abstract ServerSocket socket();
// 监听连接,非阻塞模式下立即返回
public abstract SocketChannel accept() throws IOException;
// 获取源地址和端口
public abstract SocketAddress getLocalAddress() throws IOException;
}

客户端和服务端 TCP 通信流程

整个通信流程参考 TCP 通信流程,只是实现方式不一样。本例实现一个聊天室功能。

服务端初始化

  • 创建 Selector
    public static Selector open() throws IOException;,创建选择器。
  • 创建 ServerSocketChannel
    public static ServerSocketChannel open() throws IOException;,创建服务端通道,并配置为非阻塞。
  • 绑定 bind
    public final ServerSocketChannel bind(SocketAddress local){...},绑定指定地址和端口。
  • 注册 Accept 事件
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);,注册后服务端选择器开始监听客户端的连接。

服务端轮询监听并处理事件

selector.select(),阻塞等待。监听客户端的连接及输入,从这里可以看出 nio 并不完全是异步,还是会有阻塞。

客户端初始化

  • 创建 Selector
    public static Selector open() throws IOException;,创建选择器。
  • 创建 SocketChannel
    public static SocketChannel open() throws IOException;,创建客户端通道,并配置为非阻塞。
  • 注册 Connect 事件
    socketChannel.register(selector, SelectionKey.OP_CONNECT);,注册事件后准备连接。
  • 连接 connect
    public abstract boolean connect(SocketAddress remote) throws IOException;,连接服务器。

客户端轮询监听并处理事件

selector.select(),阻塞等待。监听服务端反馈和输入。

数据通信

  • 服务端接受连接后监听客户端写入
    服务端接受客户端连接后,拿到客户端 SocketChannel,并同步注册 Read 事件,监听客户端输入。通过该通道读写缓冲区实现数据通信。
  • 客户端监听服务端发送的消息
    客户端接受到服务端连接响应后,客户端 SocketChannel 注册 Read 事件,监听该通道来自服务端的输入。

服务端和客户端关闭

客户端和服务端分别关闭 Selector, ServerSocketChannel, SocketChannel

服务端存在的问题:客户端的 SocketChannel 关闭后,服务端此通道并没有断开连接,并且该通道注册到选择器的读事件,会被反复触发。也就是说服务端 select 一直都会返回 OP_READ,但是通道中读入缓冲区的数据实际总是为 -1,即并没有数据。为什么会反复触发?

1
2
3
4
5
6
7
8
9
10
11
12
// 服务端打印信息,服务端 10000,客户端 62676
client: java.nio.channels.SocketChannel[connected local=/127.0.0.1:10000
remote=/127.0.0.1:62676]
client.isOpen(): true
client.isConnected(): true
client.isConnectionPending(): false
client.isRegistered(): true
// 1 表示为 OP_READ 事件
client.keyFor(selector).interestOps(): 1
client.keyFor(selector).readyOps(): 1
// 通道读入缓冲区实际值为 -1,即没有数据
count: -1

TCP 通信示例

本例实现一个聊天室功能。

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
public class TestNIOTCPServer {

// private static final String HOST_NAME = "127.0.0.1";
private static final int PORT = 10000;
private static final int BUFF_CAP = 1024;

private Selector selector = null;
private ServerSocketChannel serverSocketChannel = null;
// 输入信息编解码器
private Charset charset = Charset.forName("UTF-8");
// 接收信息缓冲区
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFF_CAP);

/**
* 服务端通道和选择器初始化,绑定端口,配置为非阻塞,并开启监听连接
* @throws IOException
*/
private void init() throws IOException{
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress localAddress = new InetSocketAddress(PORT);
serverSocketChannel.bind(localAddress);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server start on port: " + PORT);
}

/**
* 服务器轮询监听通道,select 会阻塞直到监听到事件
*/
private void listen(){
System.out.println("listen...");
try {
while (selector.select() > 0){
for (SelectionKey selectionKey : selector.selectedKeys()){
handleSelectorKey(selectionKey);
}
// 清空已经处理的事件
selector.selectedKeys().clear();
}
} catch (IOException e){
e.printStackTrace();
} finally {
try {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
if (selector != null){
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 处理接收到的事件
* @param selectionKey: 接收到的事件
* @throws IOException
*/
private void handleSelectorKey (SelectionKey selectionKey) throws IOException {
System.out.println("handleSelectorKey, key = " + selectionKey.readyOps());
if (selectionKey.isAcceptable()){
// 选择器监听到客户端连接
SocketChannel client = serverSocketChannel.accept();
client.configureBlocking(false);
// 接收客户端连接后,客户端注册通道读事件
client.register(selector, SelectionKey.OP_READ);
System.out.println(client.getRemoteAddress() + " connected.");
}
if (selectionKey.isReadable()){
// 选择器监听到客户端通道写入
SocketChannel client = (SocketChannel) selectionKey.channel();
StringBuilder builder = new StringBuilder();
rBuffer.clear();
int count = 0;

// System.out.println("client: " + client.toString());
// System.out.println("opened: " + client.isOpen());
// System.out.println("connected: " + client.isConnected());
// System.out.println("connectionPending: " + client.isConnectionPending());
// System.out.println("isRegistered: " + client.isRegistered());
// System.out.println("interest keys: " + client.keyFor(selector).interestOps());
// System.out.println("ready keys: " + client.keyFor(selector).readyOps());

try {
// 通道读入缓冲区
while ((count = client.read(rBuffer)) > 0){
rBuffer.flip();
builder.append(charset.decode(rBuffer));
}
System.out.println(client.toString() + ":" + builder);
if (count < 0){
// 当客户端退出时,服务端的 socket 通道仍然打开并处于连接状态
// 服务端的 select 一直会被触发,但是读取的结果一直是 -1
// 所以可以认为此时客户端已经断开,读取结束后关闭通道,使 key 失效
System.out.println(client.toString() + " has been closed.");
selectionKey.cancel();
if (client != null){
client.close();
}
}
} catch (IOException e){
e.printStackTrace();
// 读取异常,可能是客户端已经断开连接,关闭该通道
System.out.println(client.toString() + " disconnected.");
selectionKey.cancel();
if (client != null){
client.close();
}
}
if (builder.length() > 0){
dispatchInfoToAllClient(client, builder.toString());
}
}
}

/**
* 当前客户端输入信息,广播到其他客户端
* @param client: 接收到输入信息的当前客户端
* @param info: 接收到的信息
* @throws IOException: 其他客户端写入异常
*/
private void dispatchInfoToAllClient(SocketChannel client, String info) throws IOException {
// System.out.println("dispatchInfoToAllClient, info = " + info);
String name = "[" + client.getRemoteAddress() + "-" + client.hashCode() + "]";
for (SelectionKey key : selector.keys()){
Channel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel){
SocketChannel dest = (SocketChannel) targetChannel;
if (!client.equals(dest)) {
// 通道写入缓冲区
dest.write(charset.encode(name + ":" + info));
}
}
}
}

public static void main(String[] args) throws IOException {
TestNIOTCPServer server = new TestNIOTCPServer();
server.init();
server.listen();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
public class TestNIOTCPClient {

private static final String HOST_NAME = "127.0.0.1";
// private static final String HOST_NAME = "10.20.6.25";
private static final int PORT = 10000;
private static final int BUFF_CAP = 1024;

private Selector selector = null;
private SocketChannel client = null;
// 通道读写信息编解码器
private Charset charset = Charset.forName("UTF-8");

// 缓冲区,读取服务端写入的信息
ByteBuffer rBuffer = ByteBuffer.allocate(BUFF_CAP);
private volatile boolean isQuit = false;

/**
* 客户端通道,选择器初始化,连接到指定服务器,并监听连接事件
* 开启后台线程读取屏幕输入信息,写入服务器
* @throws IOException
*/
private void init() throws IOException {
selector = Selector.open();
client = SocketChannel.open();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_CONNECT);
InetSocketAddress remote = new InetSocketAddress(HOST_NAME, PORT);
client.connect(remote);

// 开启线程读取客户端输入
ReadThread readThread = new ReadThread();
readThread.setDaemon(true);
readThread.start();
}

/**
* 客户端轮询监听通道,当事件到达后处理事件
* 如果输入 exit/quit ,客户端退出
*/
private void listen(){
try {
while (selector.select() > 0 && !isQuit){
for (SelectionKey selectionKey : selector.selectedKeys()){
handleSelectionKey(selectionKey);
}
selector.selectedKeys().clear();
}
System.out.println("listen: quit...");
} catch (IOException e){
e.printStackTrace();
} finally {
try {
if (selector != null){
System.out.println("close selector");
selector.close();
}
if (client != null){
System.out.println("close client");
client.close();
}
} catch (IOException e){
e.printStackTrace();
}
}
}

/**
* 处理监听到的事件
* @param selectionKey
* @throws IOException
*/
private void handleSelectionKey(SelectionKey selectionKey) throws IOException {
if (selectionKey.isConnectable()){
// SocketChannel client = (SocketChannel) selectionKey.channel();
System.out.println("connected: " + client.isConnected());
System.out.println("connectionPending: " + client.isConnectionPending());
if (client.isConnectionPending()){
client.finishConnect();
System.out.println("connected: " + client.getRemoteAddress());

// 客户端连接成功后,注册通道读事件,读取从服务端写入的数据
client.register(selector, SelectionKey.OP_READ);
}
}
if (selectionKey.isReadable()){
// 选择器监听到服务端写入事件,读取后显示在客户端
//SocketChannel client = (SocketChannel) selectionKey.channel();
StringBuilder builder = new StringBuilder();
rBuffer.clear();
while (client.read(rBuffer) > 0){
rBuffer.flip();
builder.append(charset.decode(rBuffer));
}
System.out.println(builder);
}
}

/**
* 读取屏幕输入的线程
*/
private class ReadThread extends Thread{
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
try {
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (isQuited(line)) {
scanner.close();
isQuit = true;
selector.wakeup();
break;
} else {
client.write(charset.encode(line));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 判断是否退出
* @param value
* @return
*/
private boolean isQuited(String value) {
value = value.trim();
return value.equalsIgnoreCase("quit")
|| value.equalsIgnoreCase("exit");
}

public static void main(String[] args) throws IOException {
TestNIOTCPClient client = new TestNIOTCPClient();
client.init();
client.listen();
}
}

DatagramChannel

[todo]
http://ifeve.com/datagram-channel/
http://blog.csdn.net/foart/article/details/47608475
http://www.365mini.com/page/java-nio-course-27.htm
网络编程第四版-425

参考文档

  1. Java NIO系列教程
  2. java nio SocketChannel 服务器端与多客户端信息交互(聊天功能)
  3. 疯狂 Java 讲义
0%