linux IO 多路复用编程及示例代码

关于 Linux 系统编程 IO 多路复用相关的笔记总结及示例代码。

常用的 3 种 IO 复用模型:

  • select,时间复杂度 O(n)
  • poll,时间复杂度 O(n)
  • epoll,时间复杂度 O(1)

1. IO 复用模型对比

select

它仅仅知道了,有 I/O 事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以 select 具有 O(n) 的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。

poll

poll 本质上和 select 没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个 fd 对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。

epoll

epoll 可以理解为 event poll,不同于忙轮询和无差别轮询,epoll 会把哪个流发生了怎样的 I/O 事件通知我们。所以 epoll 实际上是事件驱动(每个事件关联上 fd)的,此时我们对这些流的操作都是有意义的。(复杂度降低到了 O(1))。

select,poll,epoll 本质上都是同步 I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间。

2. 函数原型及示例代码

select

#include <sys/select.h>
int select(int maxfdp1,
           fd_set *restrict read_fds,
           fd_set *restrict write_fds,
           fd_set *restrict exceptfds,
           struct timeval *restrict tvptr);

maxfdp1 表示最大文件描述符编号+1,通常不应超过 FD_SETSIZE(1024)
tvptr 表示等待的时间长度,粒度为秒和微秒,为 NULL 表示永远等待。
tvptr->tv_sec==0 && tvptr->tv_usec==0 表示不等待,立即返回,即不阻塞 select 函数。

select 只有当指定的描述符中的一个准备好,或者捕捉到一个信号或者等待超时将会返回。

捕捉到信号时,select 返回 -1,并且设置 errno=EINTR

如果 select 超过返回,函数返回值为 0

常用函数/宏
int FD_ISSET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
void FD_SET(int fd, fd_set *fdset);
void FD_ZERO(fd_set *fdset);

select 返回值:
-1 表示出错,比如捕捉到了信号等
0 表示没有文件描述符准备好,比如时间超时
>0 表示已经准备好的文件描述符数量,如果同一描述符的读写都准备好,那么将对其计数2次

// 示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>

int max(int a, int b)
{
    return a > b ? a : b;
}

int main(int argc, char* argv[])
{
    struct sockaddr_in server_addr1, server_addr2;
    int sock_fd1, sock_fd2;
    int one = 1;
    fd_set fdsr;
    int rval;

    if ((sock_fd1 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(1);
    }
    if ((sock_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(1);
    }

    if (setsockopt(sock_fd1, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }
    if (setsockopt(sock_fd2, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    server_addr1.sin_family = AF_INET;
    server_addr1.sin_port = htons(5200);
    server_addr1.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr1.sin_zero, 0, sizeof(server_addr1.sin_zero));

    server_addr2.sin_family = AF_INET;
    server_addr2.sin_port = htons(5300);
    server_addr2.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr2.sin_zero, 0, sizeof(server_addr2.sin_zero));

    if (bind(sock_fd1, (struct sockaddr *)&server_addr1, sizeof(server_addr1)) == -1) {
        perror("bind");
        exit(1);
    }
    if (bind(sock_fd2, (struct sockaddr *)&server_addr2, sizeof(server_addr2)) == -1) {
        perror("bind");
        exit(1);
    }

    if (listen(sock_fd1, 100) == -1) {
        perror("listen");
        exit(1);
    }
    if (listen(sock_fd2, 100) == -1) {
        perror("listen");
        exit(1);
    }

    while (1)
    {
        FD_ZERO(&fdsr);
        FD_SET(sock_fd1, &fdsr);
        FD_SET(sock_fd2, &fdsr);
        rval = select(max(sock_fd1, sock_fd2) + 1, &fdsr, NULL, NULL, NULL);
        if (rval < 0)
        {
            if (errno == EINTR)
                continue;
            perror("select");
            exit(1);
        }
        if (FD_ISSET(sock_fd1, &fdsr))
        {
            int new_client = accept(sock_fd1, NULL, 0);
            if (new_client != -1)
            {
                char *msg = "server1 say hello\n";
                send(new_client, msg, strlen(msg), 0);
                close(new_client);
            }
        }
        if (FD_ISSET(sock_fd2, &fdsr))
        {
            int new_client = accept(sock_fd2, NULL, 0);
            if (new_client != -1)
            {
                char *msg = "server2 say hello\n";
                send(new_client, msg, strlen(msg), 0);
                close(new_client);
            }
        }
    }

    return 0;
}

pselect

pselect 是 select 的变体,主要区别:
1. pselect 提供纳秒级别的超时,而 select 提供微秒级别
2. pselect 不允许改变 tsptr 参数,而 select 在某些平台实现上允许改变
3. pselect 允许屏蔽信号,通过 sigmask 指定,如果 sigmash 为 NULL,则与 select 相同

pselect 原型:

#include <sys/select.h>
int pselect(int maxfdp1,
            fd_set *restrict readfds,
            fd_set *restrict writefds,
            fd_set *restrict exceptfds,
            const struct timespec *restrict tsptr,
            const sigset_t *restrict sigmask);

poll

poll 类似于 select

#include <poll.h>
int poll(struct pollfd fdarray[],
         nfds_t nfds,
         int timeout);
         
struct pollfd {
    int fd;         /* file descriptor to check, or < 0 to ignore */
    short events;   /* events of interest on fd */
    short revents;  /* events that occurred on fd */
};

events 可选值如下:
* POLLIN
* POLLRDNORM
* POLLRDBAND
* POLLPRI
* POLLOUT
* POLLWRNORM
* POLLWRBAND
* POLLERR
* POLLHUP
* POLLNVAL

revents 由内核设置,说明每个描述符发生了哪些事件。

timeout == -1 表示永远等待,或者捕捉到一个信号,返回 -1,errno 设置为 EINTR
timeout == 0,不等待,立即返回
timeout > 0 表示等待 timeout 毫秒,如果超时返回 0

// 示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <netinet/in.h>

int main(int argc, char* argv[])
{
    struct sockaddr_in server_addr1, server_addr2;
    int sock_fd1, sock_fd2;
    int one = 1;
    struct pollfd poll_fd[2];
    int rval;

    if ((sock_fd1 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(1);
    }
    if ((sock_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(1);
    }

    if (setsockopt(sock_fd1, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }
    if (setsockopt(sock_fd2, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    server_addr1.sin_family = AF_INET;
    server_addr1.sin_port = htons(5200);
    server_addr1.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr1.sin_zero, 0, sizeof(server_addr1.sin_zero));

    server_addr2.sin_family = AF_INET;
    server_addr2.sin_port = htons(5300);
    server_addr2.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr2.sin_zero, 0, sizeof(server_addr2.sin_zero));

    if (bind(sock_fd1, (struct sockaddr *)&server_addr1, sizeof(server_addr1)) == -1) {
        perror("bind");
        exit(1);
    }
    if (bind(sock_fd2, (struct sockaddr *)&server_addr2, sizeof(server_addr2)) == -1) {
        perror("bind");
        exit(1);
    }
    if (listen(sock_fd1, 100) == -1) {
        perror("listen");
        exit(1);
    }
    if (listen(sock_fd2, 100) == -1) {
        perror("listen");
        exit(1);
    }

    poll_fd[0].fd = sock_fd1;
    poll_fd[0].events = POLLIN;
    poll_fd[1].fd = sock_fd2;
    poll_fd[1].events = POLLIN;
    while (1)
    {
        rval = poll(poll_fd, 2, -1);
        if (rval < 0)
        {
            if (errno == EINTR)
                continue;
            perror("poll");
            exit(1);
        }
        if (poll_fd[0].revents & POLLIN)
        {
            int new_client = accept(sock_fd1, NULL, 0);
            if (new_client != -1)
            {
                char *msg = "server1 say hello\n";
                send(new_client, msg, strlen(msg), 0);
                close(new_client);
            }
        }
        if (poll_fd[1].revents & POLLIN)
        {
            int new_client = accept(sock_fd2, NULL, 0);
            if (new_client != -1)
            {
                char *msg = "server2 say hello\n";
                send(new_client, msg, strlen(msg), 0);
                close(new_client);
            }
        }
    }

    return 0;
}

epoll

#include <sys/epoll.h>
int epoll_create(int size);
创建一个 epoll 实例,size 参数要大于 0,linux 内核大于 2.6.8,size 参数被忽略。
函数返回一个文件描述符。不再使用时,应当使用 close() 函数关闭返回的文件描述符。

int epoll_create1(int flags);
flags 为 0 时,其表现与 epoll_create 一样。
flags 可选的标志为 EPOLL_CLOEXEC,其效果与 open() 函数的 O_CLOEXEC 标志相似。
在创建 epoll 实例时原子性地指定标志,避免先创建,再指定标志这种分开操作,在多线程环境时,会有竞争的问题。


int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

typedef union epoll_data {
   void        *ptr;
   int          fd;
   uint32_t     u32;
   uint64_t     u64;
} epoll_data_t;

struct epoll_event {
   uint32_t     events;      /* Epoll events */
   epoll_data_t data;        /* User data variable */
};
           
epfd 为 epoll_create/epoll_create1 返回的文件描述符

op 为操作类型,可选值如下:
EPOLL_CTL_ADD
EPOLL_CTL_MOD
EPOLL_CTL_DEL

fd 为要关联的文件描述符

event->events 表示 event 类型,可选值如下:
EPOLLIN
EPOLLOUT
EPOLLRDHUP
EPOLLPRI
EPOLLERR
EPOLLHUP
EPOLLET
EPOLLONESHOT
EPOLLWAKEUP
EPOLLEXCLUSIVE

返回 0 表示成功,-1 表示失败,通过 errno 获取具体的失败信息。


int epoll_wait(int epfd, struct epoll_event *events,
              int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
              int maxevents, int timeout,
              const sigset_t *sigmask);
int epoll_pwait2(int epfd, struct epoll_event *events,
              int maxevents, const struct timespec *timeout,
              const sigset_t *sigmask);
              
int timeout 为毫秒,-1 为永远等待
timespec timeout 为纳秒,NULL 为永远等待

返回值大于 0 表示就绪状态的文件描述符的数量,0 表示超时,-1 表示失败,通过 errno 查看具体错误详情。

// 示例代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>

int main(int argc, char* argv[])
{
    struct sockaddr_in server_addr1, server_addr2;
    int sock_fd1, sock_fd2;
    int one = 1;
    int epoll_fd;
    struct epoll_event ev[2], events[100];
    int rval;
    int i;
    if ((sock_fd1 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(1);
    }
    if ((sock_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("socket");
        exit(1);
    }

    if (setsockopt(sock_fd1, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }
    if (setsockopt(sock_fd2, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    server_addr1.sin_family = AF_INET;
    server_addr1.sin_port = htons(5200);
    server_addr1.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr1.sin_zero, 0, sizeof(server_addr1.sin_zero));

    server_addr2.sin_family = AF_INET;
    server_addr2.sin_port = htons(5300);
    server_addr2.sin_addr.s_addr = INADDR_ANY;
    memset(server_addr2.sin_zero, 0, sizeof(server_addr2.sin_zero));

    if (bind(sock_fd1, (struct sockaddr *)&server_addr1, sizeof(server_addr1)) == -1) {
        perror("bind");
        exit(1);
    }
    if (bind(sock_fd2, (struct sockaddr *)&server_addr2, sizeof(server_addr2)) == -1) {
        perror("bind");
        exit(1);
    }
    if (listen(sock_fd1, 100) == -1) {
        perror("listen");
        exit(1);
    }
    if (listen(sock_fd2, 100) == -1) {
        perror("listen");
        exit(1);
    }

    epoll_fd = epoll_create(100);
    if (epoll_fd == -1)
    {
        perror("epoll_create");
        exit(1);
    }

    ev[0].events = EPOLLIN;
    ev[0].data.fd = sock_fd1;
    ev[1].events = EPOLLIN;
    ev[1].data.fd = sock_fd2;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd1, &ev[0]) == -1 ||
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd2, &ev[1]) == -1)
    {
        perror("epoll_ctl");
        exit(1);
    }

    while (1)
    {
        rval = epoll_wait(epoll_fd, events, 100, -1);
        if (rval < 0)
        {
            if (errno == EINTR)
                continue;
            perror("epoll_wait");
            exit(1);
        }
        for(i = 0; i < rval; i++)
        {
            if (events[i].events & EPOLLIN)
            {
               int new_client = accept(events[i].data.fd, NULL, 0);
               if (new_client != -1)
               {
                    char msg[1024];
                    snprintf(msg, 1024, "server socket %d say hello\n", events[i].data.fd);
                    send(new_client, msg, strlen(msg), 0);
                    close(new_client);
               }
            }
        }
    }
    close(epoll_fd);
    return 0;
}

文章评论

0条评论