Linux IO 多路转接--Poll

Linux IO 多路转接--Poll

八月 01, 2025 次阅读

poll 函数

poll 是 Linux/Unix 系统中的一种 I/O 多路复用 机制,用于同时监控多个文件描述符(FD)的 可读、可写、错误等事件。相比 selectpoll 没有 FD 数量限制(select 默认限制为 1024),并且使用更灵活的结构体数组管理 FD。


poll 函数原型

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数说明

参数 类型 说明
fds struct pollfd * 结构体数组,每个元素描述一个 FD 及其关注的事件
nfds nfds_t 指定 fds 数组中 有效元素的数量(即监控的 FD 数量)
timeout int 超时时间(毫秒)
- -1:阻塞等待,直到有事件发生
- 0:立即返回(非阻塞轮询)
- >0:等待指定毫秒后返回

返回值

  • >0:返回 就绪的 FD 数量(即 revents 被设置的 FD 数量)。
  • 0:超时且无事件发生。
  • -1:出错,并设置 errno(如 EINTR 被信号中断)。

struct pollfd 结构体

poll 的核心是 struct pollfd 数组,每个结构体描述一个 FD 及其关注的事件。

结构体定义

struct pollfd {
    int   fd;         /* 文件描述符 */
    short events;     /* 要监控的事件(输入) */
    short revents;    /* 实际发生的事件(输出) */
};

字段说明

字段 类型 说明
fd int 文件描述符(如 socket、pipe 等)
- 如果 fd = -1poll 会忽略该结构体
events short 要监控的事件(用户设置)
- 使用 POLLINPOLLOUT 等宏组合
revents short 实际发生的事件(内核返回)
- 由 poll 填充,表示哪些事件已就绪

eventsrevents 的可用事件

eventsrevents 使用相同的宏定义,但:

  • events输入参数(告诉 poll 要监控哪些事件)。
  • revents输出参数poll 返回时填充哪些事件真正发生)。

他们都是通过位图管理的,每个事件对应一个位,多个事件通过位运算组合。

常用事件宏

说明
POLLIN 数据可读(如 TCP 接收缓冲区非空)
POLLOUT 数据可写(如 TCP 发送缓冲区未满)
POLLERR 发生错误(仅 revents 可用)
POLLHUP 连接挂断(如对端关闭)
POLLNVAL FD 未打开(fd 无效,仅 revents 可用)

示例

struct pollfd fds[1];
fds[0].fd = sockfd;
fds[0].events = POLLIN | POLLOUT;  // 监控可读和可写

int ret = poll(fds, 1, 1000);      // 等待 1 秒

if (fds[0].revents & POLLIN) {
    // sockfd 可读,调用 recv()
}
if (fds[0].revents & POLLOUT) {
    // sockfd 可写,调用 send()
}
if (fds[0].revents & POLLERR) {
    // 发生错误
}

poll 的工作流程

  1. 初始化 struct pollfd 数组,设置 fdevents
  2. 调用 poll,传入数组、FD 数量和超时时间。
  3. poll 阻塞,直到:
    • 有 FD 就绪。
    • 超时(timeout 毫秒)。
    • 被信号中断(返回 -1errno = EINTR)。
  4. 检查 revents,处理就绪的 FD。
  5. 循环调用 poll,继续监控。

poll 示例代码(监控多个 FD)

#include <stdio.h>
#include <poll.h>
#include <unistd.h>

int main() {
    struct pollfd fds[2];

    // 监控 stdin(文件描述符 0)
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN;

    // 监控 stdout(文件描述符 1)
    fds[1].fd = STDOUT_FILENO;
    fds[1].events = POLLOUT;

    while (1) {
        int ret = poll(fds, 2, 3000); // 3 秒超时

        if (ret == -1) {
            perror("poll");
            break;
        } else if (ret == 0) {
            printf("Timeout!\n");
            continue;
        }

        // 检查 stdin 是否可读
        if (fds[0].revents & POLLIN) {
            printf("stdin is readable!\n");
            char buf[1024];
            read(STDIN_FILENO, buf, sizeof(buf));
        }

        // 检查 stdout 是否可写
        if (fds[1].revents & POLLOUT) {
            printf("stdout is writable!\n");
        }
    }

    return 0;
}

下面我们利用poll来实现一个简单的 TCP 多用户通信服务器程序

服务端实现

tcp.hpp

这里的 TCP 接口封装和之前的接口想同,没有影响的可以展开看看

poll_server.hpp

#ifndef _POLL_SERVER_HPP_
#define _POLL_SERVER_HPP_ 1

#include <iostream>
#include <poll.h>          // poll函数相关头文件
#include <sys/socket.h>    // socket相关操作头文件
#include "tcp.hpp"         // 自定义TCP工具函数头文件

// 服务器配置常量
constexpr const size_t max_fd_sz = 1 << 10;  // 最大监控文件描述符数量(1024)
const u_int16_t default_port = 8888;         // 默认服务端口号
const int default_fd = -1;                   // 无效文件描述符标识
const int none_event = 0;                    // 无事件标志

// 客户端信息结构体
struct client_inf {
    std::string client_addr;  // 客户端IP地址
    u_int16_t client_port;    // 客户端端口号
};

// 基于poll的TCP聊天服务器类
class PollServer {
public:
    // 构造函数,可指定端口号(默认8888)
    PollServer(u_int16_t port = default_port)
        : cur_sz(0),          // 初始化当前监控数量为0
          port_(port)         // 设置服务端口号
    {
    }

    // 初始化服务器
    void Init() {
        // 1. 创建监听套接字
        listen_sockfd = tcp::Socket();
        lg(Info, "listening sock create success, sockfd: %d", listen_sockfd);
        
        // 2. 绑定端口
        tcp::Bind(listen_sockfd, port_);
        lg(Info, "listening sock bind success");
        
        // 3. 开始监听连接
        tcp::Listen(listen_sockfd);

        // 4. 初始化poll监控数组
        event_fds[0].fd = listen_sockfd;  // 监听套接字放在数组首位
        event_fds[0].events = POLLIN;      // 监听读事件(新连接)
        ++cur_sz;                          // 监控数量增加
        
        // 初始化其余位置为无效状态
        for (int i = 1; i < max_fd_sz; ++i) {
            event_fds[i].fd = default_fd;    // 标记为无效fd
            event_fds[i].events = none_event; // 不监听任何事件
        }
    }

    // 启动服务器主循环
    void Start() {
        while (true) {
            // 调用poll监控所有文件描述符,超时时间3秒
            int n = poll(event_fds, cur_sz, 3000);
            
            if (n == -1) {  // poll出错
                lg(Error, "poll false, errno: %d, errstr: %s", errno, strerror(errno));
            } 
            else if (n == 0) {  // 超时,无事件发生
                lg(Info, "None client join and no client send message");
            }
            else {  // 有事件发生
                lg(Info, "Get a new link");
                Dispatcher();  // 处理事件
            }
        }
    }

private:
    // 事件分发处理函数
    void Dispatcher() {
        // 遍历所有监控的文件描述符
        for (int i = 0; i < cur_sz; ++i) {
            if (event_fds[i].fd == default_fd)  // 跳过无效fd
                continue;
                
            // 检查是否有读事件发生
            if (event_fds[i].revents & POLLIN) {
                if (i == 0) {    // 监听套接字有事件(新连接)
                    Accept();
                }
                else {           // 客户端套接字有事件(数据到达)
                    Recv(i);
                }
            }
        }
    }

    // 接受新客户端连接
    void Accept() {
        // 检查是否达到最大连接数
        if (cur_sz == max_fd_sz) {
            lg(Warning, "The client is full, the connection is refused");
            return;
        }
        
        // 接受新连接
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int client_sockfd = accept(listen_sockfd, (sockaddr*)&client, &len);
        if (client_sockfd == -1) {  // 接受连接失败
            lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
            return;
        }
        
        // 获取客户端地址信息
        std::string client_addr;
        u_int16_t client_port;
        tcp::GetAddrAndPort(client, client_addr, client_port);
        lg(Info, "accept a new client [%s: %d]", client_addr.c_str(), client_port);
        
        // 在poll数组中寻找空闲位置存放新连接
        for (int i = 1; i <= cur_sz; ++i) {
            if (i == cur_sz)  // 需要扩容监控数组
                ++cur_sz;
            if (event_fds[i].fd == default_fd) {  // 找到空闲位置
                event_fds[i].fd = client_sockfd;   // 设置客户端fd
                event_fds[i].events = POLLIN;     // 监听读事件
                event_fds[i].revents = none_event; // 重置事件标志
                clients[i].client_addr = client_addr;    // 保存客户端地址
                clients[i].client_port = client_port;    // 保存客户端端口
                break;
            }
        }
    }

    // 接收客户端数据并广播
    void Recv(int client_pos) {
        int sockfd = event_fds[client_pos].fd;
        
        // 接收客户端数据
        ssize_t n = recv(sockfd, client_buffer, sizeof(client_buffer) - 1, 0);
        
        if (n == -1) {  // 接收出错
            lg(Error, "recv false, errno: %d, errstr: %s", errno, strerror(errno));
        }
        else if (n == 0) {  // 客户端关闭连接
            lg(Info, "client [%s: %d] quit, bye!", 
                clients[client_pos].client_addr.c_str(), 
                clients[client_pos].client_port);
            event_fds[client_pos].fd = default_fd;  // 标记为无效
        }
        else {  // 成功接收数据
            client_buffer[n] = 0;  // 添加字符串结束符
            
            // 广播给所有客户端(简单群聊功能)
            for (int i = 1; i < cur_sz; ++i) {
                if (event_fds[i].fd != default_fd) {  // 只发给有效客户端
                    if (send(event_fds[i].fd, client_buffer, n, 0) == -1) {
                        lg(Error, "send data to client [%s: %d] false", 
                            clients[i].client_addr.c_str(), 
                            clients[i].client_port);
                    }
                }
            }
        }
    }

private:
    size_t cur_sz;                   // 当前监控的文件描述符数量
    int listen_sockfd;               // 监听套接字文件描述符
    u_int16_t port_;                 // 服务端口号
    struct pollfd event_fds[max_fd_sz];  // poll监控的文件描述符数组
    struct client_inf clients[max_fd_sz]; // 客户端信息数组
    char client_buffer[1024];        // 数据接收缓冲区
};

#endif

代码整体结构

  1. 基础架构

    • 使用poll I/O多路复用技术实现TCP聊天服务器
    • 支持同时处理多个客户端连接
    • 实现简单的群聊功能(一个客户端发消息,所有客户端都能收到)
  2. 核心机制

    • 使用固定大小的数组管理所有客户端连接(max_fd_sz=1024)
    • 主循环通过poll监控所有活跃连接
    • 采用事件驱动模型,只有发生实际I/O时才进行处理
  3. 关键数据结构

    • pollfd数组:监控所有socket的文件描述符和事件状态
    • client_inf数组:存储每个客户端的地址信息
    • 环形缓冲区:用于临时存储接收到的消息
  4. 工作流程

    • 初始化监听socket并加入poll监控
    • 主循环中调用poll等待事件
    • 有新连接时accept并加入监控
    • 有数据到达时读取并广播给所有客户端
    • 客户端断开时清理资源

客户端代码

#include <iostream>
#include <cstring>
#include <unistd.h>
#include <thread>
#include "tcp.hpp"
using namespace std;

/**
 * TCP客户端实现 - 支持断线重连的简易聊天客户端
 * 功能:连接服务器,发送消息并接收服务器回显
 * 编译:g++ client.cpp -o client -lpthread
 * 使用:./client <server_ip> <server_port>
 */

// 带重试机制的连接函数
int Connect(int sockfd, const struct sockaddr_in &server) {
    int cnt = 5;  // 最大重试次数

    // 尝试连接服务器,失败时自动重试
    while (connect(sockfd, (struct sockaddr *)&server, sizeof(server)) == -1) {
        if (cnt == 0) {  // 重试次数耗尽
            printf("Sorry, I am unable to connect to the designated server\n");
            return -1;
        }
        --cnt;
        printf("Try to reconnect..., %d chances remaining\n", cnt);
        sleep(2);  // 间隔2秒重试
    }
    return 0;  // 连接成功
}

// 消息接收处理线程函数
void Handler(const int sockfd, const std::string server_ip, const u_int16_t server_port) {
    while (true) {
        char buffer[1024];
        // 接收服务器消息(不关心发送方信息)
        ssize_t m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        
        if (m > 0) {  // 收到有效消息
            buffer[m] = 0;  // 添加字符串结束符
            cout << "server echo# " << buffer << endl;
        }
        else if (m == 0) {  // 服务器关闭连接
            printf("server [%s: %d] quit\n", server_ip.c_str(), server_port);
            
            // 尝试重新连接
            int sockfd = tcp::Socket();
            sockaddr_in server_addr{};
            server_addr.sin_family = AF_INET;
            server_addr.sin_port = htons(server_port);
            inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr); 
            
            sleep(1); // 等待1秒后再试
            if(Connect(sockfd, server_addr) == 0) 
                continue;  // 重连成功则继续
                
            exit(1);  // 重连失败退出
        }
        else {  // 接收出错
            break;
        }
    }
}

int main(int argc, char *argv[]) {
    // 参数校验
    if (argc != 3) {
        cerr << "Usage: " << argv[0] << " server_ip server_port" << endl;
        return 1;
    }

    // 解析参数
    string server_ip = argv[1];
    uint16_t server_port = stoi(argv[2]);

    // 初始化服务器地址结构
    struct sockaddr_in server;
    bzero(&server, sizeof(server));
    server.sin_port = htons(server_port);      // 设置端口
    server.sin_family = AF_INET;               // IPv4协议
    server.sin_addr.s_addr = inet_addr(server_ip.c_str());  // IP地址转换

    // 创建套接字并连接服务器
    int sockfd = tcp::Socket();
    if (Connect(sockfd, server) == -1)
        return 1;

    // 启动接收线程
    thread receiver(Handler, sockfd, server_ip, server_port);

    // 主线程处理用户输入和发送
    while (true) {
        string inbuffer;
        usleep(100);  // 微小延迟避免CPU占用过高
        cout << "Please Enter# ";
        getline(cin, inbuffer);  // 获取用户输入
        
        // 发送消息到服务器
        ssize_t n = send(sockfd, inbuffer.c_str(), inbuffer.size(), 0);
        if (n == -1) {
            perror("send");
        }
    }

    receiver.join();  // 等待接收线程结束(实际不会执行到这里)
    return 0;
}

功能说明

  1. 核心功能

    • 实现了一个支持断线自动重连的TCP聊天客户端
    • 采用多线程架构:主线程处理用户输入,子线程接收服务器消息
    • 支持基本的命令行参数配置(服务器IP和端口)
  2. 关键特性

    • 自动重连机制:连接失败时自动重试5次(间隔2秒)
    • 断线恢复:检测到服务器断开后自动尝试重新连接
    • 简单交互界面:显示输入提示和服务器回显
  3. 工作流程

    • 启动时连接指定服务器
    • 成功连接后:
      • 主线程:循环读取用户输入并发送到服务器
      • 子线程:循环接收服务器消息并显示
    • 连接断开时:
      • 自动尝试重新建立连接
      • 重连失败则退出程序

代码测试

而后,我们将服务端和客户端启动后即可实现双方的通信了,服务端创建和调用实现如下:

#include <iostream>
#include <memory>

#include "poll_server.hpp"

using namespace std;

int main(int argc, char *argv[])
{
    int port;
    if(argc == 1)
    {
        port = 7777;
    }else if(argc == 2)
    {
        port = stoi(argv[1]);
    }else{
        cerr << "Usage: " << argv[0] << "(default port: 8080)" << endl << 
        "OR" << endl << argv[0] << " port" << endl;
        return 1;
    }
    unique_ptr<PollServer> ps(new PollServer(port));
    ps->Init();
    ps->Start();
    return 0;
}

然后,我们对代码进行测试:

# 客户端1
╭─ljx@VM-16-15-debian ~/linux_review/web_io/Poll
╰─➤  ./poll_client.o 127.0.0.1 7777
Please Enter# 123
server echo# 123Please Enter#
hello
Please Enter# server echo# hello

# 客户端2
╭─ljx@VM-16-15-debian ~/linux_review/web_io/Poll
╰─➤  ./poll_client.o 127.0.0.1 7777
Please Enter# server echo# 123
server echo# hello

# 客户端3
╭─ljx@VM-16-15-debian ~/linux_review/web_io/Poll
╰─➤  ./poll_client.o 127.0.0.1 7777
Please Enter# server echo# 123
server echo# hello

客户端1 的数据可以顺利发送给其他两个客户端