本文简述三种IO模型
客户端与服务端建立连接的过程
涉及到的内核调用
socket
功能:创建通信端点并返回描述符
函数
int socket(int domain, int type, int protocol)
参数
domain:通信域,选择用于通信的协议族,如ipv4,ipv6
type:socket的类型,包括:SOCK_STREAM,SOCK_DGRAM等
返回值
成功,新创建的socket的文件描述符;失败,返回-1
bind
功能:为Socket分配地址,这个地址指向一个地址结构
函数
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
参数
sockfd:Socket的文件描述符
addr:服务端通信的地址和端口
addrlen:addr结构体的大小
返回值
成功,返回0,失败,返回-1
listen
功能:标记一个Socket的文件描述符为活跃状态,标记后,这个Socket可以用来接收连接请求
函数
int listen(int sockfd, int backlog);
参数
sockfd:Socket的文件描述符
backlog:定义了sockfd的待接收的阻塞队列最大长度,如果连接请求在队列中已满,客户端会收到错误提示,或者重试连接
返回
accept
功能:从阻塞连接队列中,拿出第一个连接请求,创建一个新的已连接状态的Socket
函数
int accept(int sockfd, struct sockaddr * addr, socklen_t
*addrlen);
int accept4(int sockfd, struct sockaddr addr, socklen_t
addrlen, int flags)
参数
sockfd:通过socket,bind,listen创建的socket文件描述符
sockaddr:指向sockaddr结构体,保存对端的地址
addrlen:addr结构体的大小
返回
成功,返回非负数的已接收的文件描述符;失败,返回-1
客户端代码
由于客户端在不同IO模型中都是通用的,所以先列举下客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.io.*; import java.net.Socket;
public class SocketClient {
private static final String SERVER_IP = "192.168.174.101";
private static final Integer PORT = 9090;
public static void start() throws IOException { Socket client = new Socket(SERVER_IP,PORT); OutputStream out = client.getOutputStream();
InputStream in = System.in; BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while(true){ String line = reader.readLine(); if(line != null ){ byte[] bb = line.getBytes(); for (byte b : bb) { out.write(b); } } } } public static void main(String[] args) throws IOException { start(); } }
|
BIO
Server 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket;
public class BIOServer {
private static final Integer port = 9090;
private static final Integer backlog = 20;
public void start() throws IOException { ServerSocket serverSocket = new ServerSocket(port, backlog); System.out.println("new socket " + port); while (true) { Socket client = serverSocket.accept(); System.out.println("accept " + getClientInfo(client)); new Thread(() -> { InputStream inputStream = null; try { inputStream = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); char[] data = new char[1024]; while (true) { int num = reader.read(data); if (num > 0) { System.out.println("read some data is :" + num + " val :" + new String(data, 0, num)); } else if (num == 0) { System.out.println("read nothing"); } else { System.out.println("read " + num); client.close(); break; } } } catch (IOException e) { e.printStackTrace(); } System.out.println(getClientInfo(client) + " 端口连接"); }).start(); } }
private String getClientInfo(Socket client) { return "client: " + client.getInetAddress().getHostAddress() + client.getPort(); }
public static void main(String[] args) throws IOException { BIOServer bioServer = new BIOServer(); bioServer.start(); }
}
|
存在问题
- 每个连接都要创建一个线程,创建线程是要发起一个软中断,从用户态切换到内核态,调用内核指令(创建线程是一个很耗时的操作)
- 当连接的客户端数量多时,线程数量增多,对内存的消耗大
NIO
Server 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.LinkedList;
public class NIOServer {
LinkedList<SocketChannel> clients = new LinkedList<>();
public void start() throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.bind(new InetSocketAddress(9090)); ssc.configureBlocking(false); while (true) { SocketChannel client = ssc.accept(); if (client != null) { client.configureBlocking(false); System.out.println("accept client port: " + client.socket().getPort()); clients.add(client); } ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096); for (SocketChannel c : clients) { int num = c.read(byteBuffer); if (num > 0) { byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.limit()]; byteBuffer.get(bytes); String rec = new String(bytes); System.out.println(c.socket().getPort() + ": " + rec); byteBuffer.clear(); } } }
}
public static void main(String[] args) throws IOException { NIOServer nioServer = new NIOServer(); nioServer.start(); }
}
|
相比BIO的改进
- 不需要创建多个线程,节省了创建线程和内存开销
存在问题
- 由客户端对所有连接执行系统调用read(软中断,用户态切换到内核态),其中很多连接中没有数据,导致很多read调用是无意义的,造成资源的浪费
- 随着连接的客户端增多,每次循环都要O(n)复杂度的read系统调用
select & poll 的多路复用
Server 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Set;
public class MultiplexServer {
private ServerSocketChannel server = null;
private Selector selector = null;
private void initServer() throws IOException { server = ServerSocketChannel.open(); server.configureBlocking(false); server.bind(new InetSocketAddress(9090)); selector = Selector.open(); server.register(selector, SelectionKey.OP_ACCEPT); }
public void start() throws IOException { initServer(); while (true) { Set<SelectionKey> keySet = selector.keys(); while (selector.select() > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { acceptHandler(key); } else if (key.isReadable()) { readHandler(key); } else if (key.isWritable()) { writeHandler(key); } } } }
}
private void acceptHandler(SelectionKey key) throws IOException { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel channel = ssc.accept(); channel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(4096); channel.register(selector, SelectionKey.OP_READ, byteBuffer); System.out.println("有新客户端连接:" + channel.getRemoteAddress()); }
private void readHandler(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); int read = 0; while (true) { read = channel.read(buffer); if (read > 0) { System.out.println("read client: " + new String(buffer.array(), 0, read)); channel.register(key.selector(), SelectionKey.OP_WRITE, buffer); } else if (read == 0) { break; } else { channel.close(); System.out.println("client: " + channel.getRemoteAddress() + " closed"); break; } } }
private void writeHandler(SelectionKey key) throws IOException { System.out.println("write handler"); SocketChannel ssc = (SocketChannel) key.channel(); String sendText = "response message ------"; ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.put(sendText.getBytes(StandardCharsets.UTF_8)); buffer.flip(); while (buffer.hasRemaining()) { ssc.write(buffer); } ssc.register(selector, SelectionKey.OP_READ, buffer); }
public static void main(String[] args) throws IOException { MultiplexServer server = new MultiplexServer(); server.start(); }
}
|
相比NIO的改进
select&poll的多路复用和NIO,都要遍历所有的连接询问状态,区别是,NIO的遍历由用户发起,涉及用户态到内核态的切换的系统调用;多路复用器只有一次系统调用,内核根据用户传过来的fds,遍历,返回所有有状态的fds,再由用户read
存在问题
- 每次select或poll都要传递所有的fds,其中,select要将整个fds列表传递给内核,有大量的内存拷贝
- 内核需要遍历全量fds,O(N)的时间复杂度,不是所有的fd都有数据要读,所以有一定的浪费
epoll 的多路复用
epoll 的原理
完成epoll操作一共有三个步骤,即三个函数互相配合:epoll_create
,epoll_ctl
,epoll_wait
先用epoll_create
创建一个epoll对象epfd,再通过epoll_ctl
将需要监视的socket添加到epfd中,最后调用epoll_wait
等待数据。
当执行epoll_create
时
,系统会在内核中创建一个红黑树和就绪链表。 当执行epoll_ctl放入socket时
,epoll会检测上面的红黑树是否存在这个socket,存在的话就立即返回,不存在就添加。然后给内核中断处理程序注册一个回调函数,告诉内核,如果这个socket句柄的中断到了,就把它放到准备就绪list链表里。如果网卡有数据到达,向cpu发出中断信号,cpu响应中断,中断程序就会执行前面的回调函数。红黑树是自平衡的二叉排序树,适合频繁插入和删除的场景。增删查一般时间复杂度是
O(logn)。
epoll_wait就只检查就绪链表,如果链表不为空,就返回就绪链表中就绪的socket,否则就等待。只将有事件发生的
Socket 集合传递给应用程序,不需要像 select/poll
那样轮询扫描整个集合(包含有和无事件的 Socket
),大大提高了检测的效率
epoll 的实现流程
epoll_create 创建 epoll 对象
epoll_create
是内核提供的一个创建epoll对象的系统调用,当用户进程调用
epoll_create 时,内核会创建一个 struct eventpoll
1 2 3 4 5 6 7 8
| struct eventpoll { wait_queue_head_t wq; struct list_head rdllist; struct rb_root rbr; }
|
epoll_ctl 向 epoll
对象中添加要监听的Socket
epoll中用struct epitem
表示一个Socket连接,epoll
使用红黑树管理大量的socket连接,struct epitem
就是红黑树上的一个个节点
1 2 3 4 5 6 7 8 9 10 11 12 13
| struct epitem { struct eventpoll *ep; struct epoll_event event; struct list_head rdllink; struct rb_node rbn; struct epoll_filefd ffd; }
|
在内核中创建完struct epitem
后,在Socket中的等待队列上创建等待项wait_queue_t
并注册epoll的回调函数ep_poll_callback
在ep_poll_callback
中可以根据Socket等待队列中的等待项wait
,通过eppoll_entry
关联
epitem
后,将epitem
插入到epoll
中的红黑树struct rb_root rbr
中
epoll_wait同步阻塞获取IO就绪的Socket
- 用户程序调用
epoll_wait
后,内核查找epoll中的就绪队列event_poll->rdllist
是否有IO就绪的epitem,如果有,将就绪的socket信息封装到epoll_event
返回
相比于 select&poll
的多路复用的改进
- 不需要用户程序传递fds,由内核通过红黑树维护所有的fd,减少内存拷贝
- 不需要内核遍历全量fds,而是通过callback将IO就绪的Socket放到就绪队列中,提高了性能
总结
本文简述了三种IO模型,包括BIO、NIO、多路复用器,分别描述了三者的原理,比较了优缺点
Todo1:本文没有对三者进行实验,未能产生数据上的比较分析,在实现细节上有很多地方还没有搞清楚,待后续学习并补充
Todo2:本文未能从系统调用的过程进行分析,待补充
参考文献