0%

三种IO模型

本文简述三种IO模型

客户端与服务端建立连接的过程

涉及到的内核调用

socket

功能:创建通信端点并返回描述符

函数

int socket(int domain, int type, int protocol)

参数

domain:通信域,选择用于通信的协议族,如ipv4,ipv6

type:socket的类型,包括:SOCK_STREAM,SOCK_DGRAM等

返回值

成功,新创建的socket的文件描述符;失败,返回-1

bind

功能:为Socket分配地址,这个地址指向一个地址结构

函数

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)

参数

sockfd:Socket的文件描述符

addr:服务端通信的地址和端口

addrlen:addr结构体的大小

返回值

成功,返回0,失败,返回-1

listen

功能:标记一个Socket的文件描述符为活跃状态,标记后,这个Socket可以用来接收连接请求

函数

int listen(int sockfd, int backlog);

参数

sockfd:Socket的文件描述符

backlog:定义了sockfd的待接收的阻塞队列最大长度,如果连接请求在队列中已满,客户端会收到错误提示,或者重试连接

返回

accept

功能:从阻塞连接队列中,拿出第一个连接请求,创建一个新的已连接状态的Socket

函数

int accept(int sockfd, struct sockaddr * addr, socklen_t *addrlen);

int accept4(int sockfd, struct sockaddr addr, socklen_t addrlen, int flags)

参数

sockfd:通过socket,bind,listen创建的socket文件描述符

sockaddr:指向sockaddr结构体,保存对端的地址

addrlen:addr结构体的大小

返回

成功,返回非负数的已接收的文件描述符;失败,返回-1

客户端代码

由于客户端在不同IO模型中都是通用的,所以先列举下客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.io.*;
import java.net.Socket;

public class SocketClient {

private static final String SERVER_IP = "192.168.174.101";

private static final Integer PORT = 9090;

public static void start() throws IOException {
Socket client = new Socket(SERVER_IP,PORT);
OutputStream out = client.getOutputStream();

InputStream in = System.in;
BufferedReader reader = new BufferedReader(new InputStreamReader(in));

while(true){
String line = reader.readLine();
if(line != null ){
byte[] bb = line.getBytes();
for (byte b : bb) {
out.write(b);
}
}
}
}

public static void main(String[] args) throws IOException {
start();
}
}

BIO

Server 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class BIOServer {

private static final Integer port = 9090;

private static final Integer backlog = 20;

public void start() throws IOException {
// socket bind listen
ServerSocket serverSocket = new ServerSocket(port, backlog);
System.out.println("new socket " + port);
while (true) {
// 监听 server accept,创建一个新的用来连接客户端Socket
Socket client = serverSocket.accept();
System.out.println("accept " + getClientInfo(client));
// 创建线程从socket中一直读数据
new Thread(() -> {
InputStream inputStream = null;
try {
inputStream = client.getInputStream();

BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
char[] data = new char[1024];
while (true) {
int num = reader.read(data);
if (num > 0) {
System.out.println("read some data is :" + num + " val :" + new String(data, 0, num));
} else if (num == 0) {
System.out.println("read nothing");
} else {
System.out.println("read " + num);
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(getClientInfo(client) + " 端口连接");
}).start();
}
}

private String getClientInfo(Socket client) {
return "client: " + client.getInetAddress().getHostAddress() + client.getPort();
}

public static void main(String[] args) throws IOException {
BIOServer bioServer = new BIOServer();
bioServer.start();
}

}

存在问题

  1. 每个连接都要创建一个线程,创建线程是要发起一个软中断,从用户态切换到内核态,调用内核指令(创建线程是一个很耗时的操作)
  2. 当连接的客户端数量多时,线程数量增多,对内存的消耗大

NIO

Server 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;

public class NIOServer {

LinkedList<SocketChannel> clients = new LinkedList<>();

public void start() throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9090));
ssc.configureBlocking(false);
while (true) {
SocketChannel client = ssc.accept();
if (client != null) {
client.configureBlocking(false);
System.out.println("accept client port: " + client.socket().getPort());
clients.add(client);
}
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
for (SocketChannel c : clients) {
int num = c.read(byteBuffer);
if (num > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
String rec = new String(bytes);
System.out.println(c.socket().getPort() + ": " + rec);
byteBuffer.clear();
}
}
}

}

public static void main(String[] args) throws IOException {
NIOServer nioServer = new NIOServer();
nioServer.start();
}

}

相比BIO的改进

  1. 不需要创建多个线程,节省了创建线程和内存开销

存在问题

  1. 由客户端对所有连接执行系统调用read(软中断,用户态切换到内核态),其中很多连接中没有数据,导致很多read调用是无意义的,造成资源的浪费
  2. 随着连接的客户端增多,每次循环都要O(n)复杂度的read系统调用

select & poll 的多路复用

Server 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class MultiplexServer {

private ServerSocketChannel server = null;

private Selector selector = null;

private void initServer() throws IOException {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(9090));
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
}

public void start() throws IOException {
initServer();
while (true) {
Set<SelectionKey> keySet = selector.keys();
while (selector.select() > 0) {
// 返回有状态的fd集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历有状态的fds,判断是 listen 还是通信
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
} else if (key.isWritable()) {
writeHandler(key);
}
}
}
}

}

private void acceptHandler(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel channel = ssc.accept();
channel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
channel.register(selector, SelectionKey.OP_READ, byteBuffer);
System.out.println("有新客户端连接:" + channel.getRemoteAddress());
}

private void readHandler(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read = 0;
while (true) {
read = channel.read(buffer);
if (read > 0) {
System.out.println("read client: " + new String(buffer.array(), 0, read));
channel.register(key.selector(), SelectionKey.OP_WRITE, buffer);
} else if (read == 0) {
break;
} else {
channel.close();
System.out.println("client: " + channel.getRemoteAddress() + " closed");
break;
}
}
}

private void writeHandler(SelectionKey key) throws IOException {
System.out.println("write handler");
SocketChannel ssc = (SocketChannel) key.channel();
String sendText = "response message ------";
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.put(sendText.getBytes(StandardCharsets.UTF_8));
buffer.flip();
while (buffer.hasRemaining()) {
ssc.write(buffer);
}
ssc.register(selector, SelectionKey.OP_READ, buffer);
}

public static void main(String[] args) throws IOException {
MultiplexServer server = new MultiplexServer();
server.start();
}

}

相比NIO的改进

select&poll的多路复用和NIO,都要遍历所有的连接询问状态,区别是,NIO的遍历由用户发起,涉及用户态到内核态的切换的系统调用;多路复用器只有一次系统调用,内核根据用户传过来的fds,遍历,返回所有有状态的fds,再由用户read

存在问题

  1. 每次select或poll都要传递所有的fds,其中,select要将整个fds列表传递给内核,有大量的内存拷贝
  2. 内核需要遍历全量fds,O(N)的时间复杂度,不是所有的fd都有数据要读,所以有一定的浪费

epoll 的多路复用

epoll 的原理

完成epoll操作一共有三个步骤,即三个函数互相配合:epoll_createepoll_ctlepoll_wait

先用epoll_create创建一个epoll对象epfd,再通过epoll_ctl将需要监视的socket添加到epfd中,最后调用epoll_wait等待数据。 当执行epoll_create时 ,系统会在内核中创建一个红黑树和就绪链表。 当执行epoll_ctl放入socket时 ,epoll会检测上面的红黑树是否存在这个socket,存在的话就立即返回,不存在就添加。然后给内核中断处理程序注册一个回调函数,告诉内核,如果这个socket句柄的中断到了,就把它放到准备就绪list链表里。如果网卡有数据到达,向cpu发出中断信号,cpu响应中断,中断程序就会执行前面的回调函数。红黑树是自平衡的二叉排序树,适合频繁插入和删除的场景。增删查一般时间复杂度是 O(logn)。

epoll_wait就只检查就绪链表,如果链表不为空,就返回就绪链表中就绪的socket,否则就等待。只将有事件发生的 Socket 集合传递给应用程序,不需要像 select/poll 那样轮询扫描整个集合(包含有和无事件的 Socket ),大大提高了检测的效率

image-20221008035527210

epoll 的实现流程

epoll_create 创建 epoll 对象

epoll_create是内核提供的一个创建epoll对象的系统调用,当用户进程调用 epoll_create 时,内核会创建一个 struct eventpoll

1
2
3
4
5
6
7
8
struct eventpoll {
// 等待队列,存放阻塞在epoll上的进程
wait_queue_head_t wq;
// 就绪队列,存放IO就绪的socket连接
struct list_head rdllist;
// 红黑树,用来管理所有监听的socket连接
struct rb_root rbr;
}

epoll_ctl 向 epoll 对象中添加要监听的Socket

  1. epoll中用struct epitem表示一个Socket连接,epoll 使用红黑树管理大量的socket连接,struct epitem 就是红黑树上的一个个节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    struct epitem
    {
    //指向所属epoll对象
    struct eventpoll *ep;
    //注册的感兴趣的事件,也就是用户空间的epoll_event
    struct epoll_event event;
    //指向epoll对象中的就绪队列
    struct list_head rdllink;
    //指向epoll中对应的红黑树节点
    struct rb_node rbn;
    //指向epitem所表示的socket->file结构以及对应的fd
    struct epoll_filefd ffd;
    }
  2. 在内核中创建完struct epitem后,在Socket中的等待队列上创建等待项wait_queue_t并注册epoll的回调函数ep_poll_callback

  3. ep_poll_callback中可以根据Socket等待队列中的等待项wait,通过eppoll_entry 关联 epitem后,将epitem插入到epoll中的红黑树struct rb_root rbr

epoll_wait同步阻塞获取IO就绪的Socket

  1. 用户程序调用epoll_wait后,内核查找epoll中的就绪队列event_poll->rdllist是否有IO就绪的epitem,如果有,将就绪的socket信息封装到epoll_event返回

相比于 select&poll 的多路复用的改进

  1. 不需要用户程序传递fds,由内核通过红黑树维护所有的fd,减少内存拷贝
  2. 不需要内核遍历全量fds,而是通过callback将IO就绪的Socket放到就绪队列中,提高了性能

总结

本文简述了三种IO模型,包括BIO、NIO、多路复用器,分别描述了三者的原理,比较了优缺点

Todo1:本文没有对三者进行实验,未能产生数据上的比较分析,在实现细节上有很多地方还没有搞清楚,待后续学习并补充

Todo2:本文未能从系统调用的过程进行分析,待补充

参考文献