linux IO 多路复用编程及示例代码
关于 Linux 系统编程 IO 多路复用相关的笔记总结及示例代码。
常用的 3 种 IO 复用模型:
- select,时间复杂度 O(n)
- poll,时间复杂度 O(n)
- epoll,时间复杂度 O(1)
1. IO 复用模型对比
select
它仅仅知道了,有 I/O 事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以 select 具有 O(n) 的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。
poll
poll 本质上和 select 没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个 fd 对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。
epoll
epoll 可以理解为 event poll,不同于忙轮询和无差别轮询,epoll 会把哪个流发生了怎样的 I/O 事件通知我们。所以 epoll 实际上是事件驱动(每个事件关联上 fd)的,此时我们对这些流的操作都是有意义的。(复杂度降低到了 O(1))。
select,poll,epoll 本质上都是同步 I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间。
2. 函数原型及示例代码
select
#include <sys/select.h>
int select(int maxfdp1,
fd_set *restrict read_fds,
fd_set *restrict write_fds,
fd_set *restrict exceptfds,
struct timeval *restrict tvptr);
maxfdp1 表示最大文件描述符编号+1,通常不应超过 FD_SETSIZE(1024)
tvptr 表示等待的时间长度,粒度为秒和微秒,为 NULL 表示永远等待。
tvptr->tv_sec==0 && tvptr->tv_usec==0 表示不等待,立即返回,即不阻塞 select 函数。
select 只有当指定的描述符中的一个准备好,或者捕捉到一个信号或者等待超时将会返回。
捕捉到信号时,select 返回 -1,并且设置 errno=EINTR
如果 select 超过返回,函数返回值为 0
常用函数/宏
int FD_ISSET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_ZERO(fd_set *fdset);
select 返回值:
-1 表示出错,比如捕捉到了信号等
0 表示没有文件描述符准备好,比如时间超时
>0 表示已经准备好的文件描述符数量,如果同一描述符的读写都准备好,那么将对其计数2次
// 示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
int max(int a, int b)
{
return a > b ? a : b;
}
int main(int argc, char* argv[])
{
struct sockaddr_in server_addr1, server_addr2;
int sock_fd1, sock_fd2;
int one = 1;
fd_set fdsr;
int rval;
if ((sock_fd1 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
if ((sock_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
if (setsockopt(sock_fd1, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
if (setsockopt(sock_fd2, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
server_addr1.sin_family = AF_INET;
server_addr1.sin_port = htons(5200);
server_addr1.sin_addr.s_addr = INADDR_ANY;
memset(server_addr1.sin_zero, 0, sizeof(server_addr1.sin_zero));
server_addr2.sin_family = AF_INET;
server_addr2.sin_port = htons(5300);
server_addr2.sin_addr.s_addr = INADDR_ANY;
memset(server_addr2.sin_zero, 0, sizeof(server_addr2.sin_zero));
if (bind(sock_fd1, (struct sockaddr *)&server_addr1, sizeof(server_addr1)) == -1) {
perror("bind");
exit(1);
}
if (bind(sock_fd2, (struct sockaddr *)&server_addr2, sizeof(server_addr2)) == -1) {
perror("bind");
exit(1);
}
if (listen(sock_fd1, 100) == -1) {
perror("listen");
exit(1);
}
if (listen(sock_fd2, 100) == -1) {
perror("listen");
exit(1);
}
while (1)
{
FD_ZERO(&fdsr);
FD_SET(sock_fd1, &fdsr);
FD_SET(sock_fd2, &fdsr);
rval = select(max(sock_fd1, sock_fd2) + 1, &fdsr, NULL, NULL, NULL);
if (rval < 0)
{
if (errno == EINTR)
continue;
perror("select");
exit(1);
}
if (FD_ISSET(sock_fd1, &fdsr))
{
int new_client = accept(sock_fd1, NULL, 0);
if (new_client != -1)
{
char *msg = "server1 say hello\n";
send(new_client, msg, strlen(msg), 0);
close(new_client);
}
}
if (FD_ISSET(sock_fd2, &fdsr))
{
int new_client = accept(sock_fd2, NULL, 0);
if (new_client != -1)
{
char *msg = "server2 say hello\n";
send(new_client, msg, strlen(msg), 0);
close(new_client);
}
}
}
return 0;
}
pselect
pselect 是 select 的变体,主要区别:
1. pselect 提供纳秒级别的超时,而 select 提供微秒级别
2. pselect 不允许改变 tsptr 参数,而 select 在某些平台实现上允许改变
3. pselect 允许屏蔽信号,通过 sigmask 指定,如果 sigmash 为 NULL,则与 select 相同
pselect 原型:
#include <sys/select.h>
int pselect(int maxfdp1,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict exceptfds,
const struct timespec *restrict tsptr,
const sigset_t *restrict sigmask);
poll
poll 类似于 select
#include <poll.h>
int poll(struct pollfd fdarray[],
nfds_t nfds,
int timeout);
struct pollfd {
int fd; /* file descriptor to check, or < 0 to ignore */
short events; /* events of interest on fd */
short revents; /* events that occurred on fd */
};
events 可选值如下:
* POLLIN
* POLLRDNORM
* POLLRDBAND
* POLLPRI
* POLLOUT
* POLLWRNORM
* POLLWRBAND
* POLLERR
* POLLHUP
* POLLNVAL
revents 由内核设置,说明每个描述符发生了哪些事件。
timeout == -1 表示永远等待,或者捕捉到一个信号,返回 -1,errno 设置为 EINTR
timeout == 0,不等待,立即返回
timeout > 0 表示等待 timeout 毫秒,如果超时返回 0
// 示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main(int argc, char* argv[])
{
struct sockaddr_in server_addr1, server_addr2;
int sock_fd1, sock_fd2;
int one = 1;
struct pollfd poll_fd[2];
int rval;
if ((sock_fd1 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
if ((sock_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
if (setsockopt(sock_fd1, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
if (setsockopt(sock_fd2, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
server_addr1.sin_family = AF_INET;
server_addr1.sin_port = htons(5200);
server_addr1.sin_addr.s_addr = INADDR_ANY;
memset(server_addr1.sin_zero, 0, sizeof(server_addr1.sin_zero));
server_addr2.sin_family = AF_INET;
server_addr2.sin_port = htons(5300);
server_addr2.sin_addr.s_addr = INADDR_ANY;
memset(server_addr2.sin_zero, 0, sizeof(server_addr2.sin_zero));
if (bind(sock_fd1, (struct sockaddr *)&server_addr1, sizeof(server_addr1)) == -1) {
perror("bind");
exit(1);
}
if (bind(sock_fd2, (struct sockaddr *)&server_addr2, sizeof(server_addr2)) == -1) {
perror("bind");
exit(1);
}
if (listen(sock_fd1, 100) == -1) {
perror("listen");
exit(1);
}
if (listen(sock_fd2, 100) == -1) {
perror("listen");
exit(1);
}
poll_fd[0].fd = sock_fd1;
poll_fd[0].events = POLLIN;
poll_fd[1].fd = sock_fd2;
poll_fd[1].events = POLLIN;
while (1)
{
rval = poll(poll_fd, 2, -1);
if (rval < 0)
{
if (errno == EINTR)
continue;
perror("poll");
exit(1);
}
if (poll_fd[0].revents & POLLIN)
{
int new_client = accept(sock_fd1, NULL, 0);
if (new_client != -1)
{
char *msg = "server1 say hello\n";
send(new_client, msg, strlen(msg), 0);
close(new_client);
}
}
if (poll_fd[1].revents & POLLIN)
{
int new_client = accept(sock_fd2, NULL, 0);
if (new_client != -1)
{
char *msg = "server2 say hello\n";
send(new_client, msg, strlen(msg), 0);
close(new_client);
}
}
}
return 0;
}
epoll
#include <sys/epoll.h>
int epoll_create(int size);
创建一个 epoll 实例,size 参数要大于 0,linux 内核大于 2.6.8,size 参数被忽略。
函数返回一个文件描述符。不再使用时,应当使用 close() 函数关闭返回的文件描述符。
int epoll_create1(int flags);
flags 为 0 时,其表现与 epoll_create 一样。
flags 可选的标志为 EPOLL_CLOEXEC,其效果与 open() 函数的 O_CLOEXEC 标志相似。
在创建 epoll 实例时原子性地指定标志,避免先创建,再指定标志这种分开操作,在多线程环境时,会有竞争的问题。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epfd 为 epoll_create/epoll_create1 返回的文件描述符
op 为操作类型,可选值如下:
EPOLL_CTL_ADD
EPOLL_CTL_MOD
EPOLL_CTL_DEL
fd 为要关联的文件描述符
event->events 表示 event 类型,可选值如下:
EPOLLIN
EPOLLOUT
EPOLLRDHUP
EPOLLPRI
EPOLLERR
EPOLLHUP
EPOLLET
EPOLLONESHOT
EPOLLWAKEUP
EPOLLEXCLUSIVE
返回 0 表示成功,-1 表示失败,通过 errno 获取具体的失败信息。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
int epoll_pwait2(int epfd, struct epoll_event *events,
int maxevents, const struct timespec *timeout,
const sigset_t *sigmask);
int timeout 为毫秒,-1 为永远等待
timespec timeout 为纳秒,NULL 为永远等待
返回值大于 0 表示就绪状态的文件描述符的数量,0 表示超时,-1 表示失败,通过 errno 查看具体错误详情。
// 示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
int main(int argc, char* argv[])
{
struct sockaddr_in server_addr1, server_addr2;
int sock_fd1, sock_fd2;
int one = 1;
int epoll_fd;
struct epoll_event ev[2], events[100];
int rval;
int i;
if ((sock_fd1 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
if ((sock_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
exit(1);
}
if (setsockopt(sock_fd1, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
if (setsockopt(sock_fd2, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
server_addr1.sin_family = AF_INET;
server_addr1.sin_port = htons(5200);
server_addr1.sin_addr.s_addr = INADDR_ANY;
memset(server_addr1.sin_zero, 0, sizeof(server_addr1.sin_zero));
server_addr2.sin_family = AF_INET;
server_addr2.sin_port = htons(5300);
server_addr2.sin_addr.s_addr = INADDR_ANY;
memset(server_addr2.sin_zero, 0, sizeof(server_addr2.sin_zero));
if (bind(sock_fd1, (struct sockaddr *)&server_addr1, sizeof(server_addr1)) == -1) {
perror("bind");
exit(1);
}
if (bind(sock_fd2, (struct sockaddr *)&server_addr2, sizeof(server_addr2)) == -1) {
perror("bind");
exit(1);
}
if (listen(sock_fd1, 100) == -1) {
perror("listen");
exit(1);
}
if (listen(sock_fd2, 100) == -1) {
perror("listen");
exit(1);
}
epoll_fd = epoll_create(100);
if (epoll_fd == -1)
{
perror("epoll_create");
exit(1);
}
ev[0].events = EPOLLIN;
ev[0].data.fd = sock_fd1;
ev[1].events = EPOLLIN;
ev[1].data.fd = sock_fd2;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd1, &ev[0]) == -1 ||
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd2, &ev[1]) == -1)
{
perror("epoll_ctl");
exit(1);
}
while (1)
{
rval = epoll_wait(epoll_fd, events, 100, -1);
if (rval < 0)
{
if (errno == EINTR)
continue;
perror("epoll_wait");
exit(1);
}
for(i = 0; i < rval; i++)
{
if (events[i].events & EPOLLIN)
{
int new_client = accept(events[i].data.fd, NULL, 0);
if (new_client != -1)
{
char msg[1024];
snprintf(msg, 1024, "server socket %d say hello\n", events[i].data.fd);
send(new_client, msg, strlen(msg), 0);
close(new_client);
}
}
}
}
close(epoll_fd);
return 0;
}
文章评论