Linux IO 多路转接--Poll
八月 01, 2025
次阅读
poll 函数
poll 是 Linux/Unix 系统中的一种 I/O 多路复用 机制,用于同时监控多个文件描述符(FD)的 可读、可写、错误等事件。相比 select,poll 没有 FD 数量限制(select 默认限制为 1024),并且使用更灵活的结构体数组管理 FD。
poll 函数原型
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
fds |
struct pollfd * |
结构体数组,每个元素描述一个 FD 及其关注的事件 |
nfds |
nfds_t |
指定 fds 数组中 有效元素的数量(即监控的 FD 数量) |
timeout |
int |
超时时间(毫秒): - -1:阻塞等待,直到有事件发生- 0:立即返回(非阻塞轮询)- >0:等待指定毫秒后返回 |
返回值
>0:返回 就绪的 FD 数量(即revents被设置的 FD 数量)。0:超时且无事件发生。-1:出错,并设置errno(如EINTR被信号中断)。
struct pollfd 结构体
poll 的核心是 struct pollfd 数组,每个结构体描述一个 FD 及其关注的事件。
结构体定义
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 要监控的事件(输入) */
short revents; /* 实际发生的事件(输出) */
};
字段说明
| 字段 | 类型 | 说明 |
|---|---|---|
fd |
int |
文件描述符(如 socket、pipe 等) - 如果 fd = -1,poll 会忽略该结构体 |
events |
short |
要监控的事件(用户设置) - 使用 POLLIN、POLLOUT 等宏组合 |
revents |
short |
实际发生的事件(内核返回) - 由 poll 填充,表示哪些事件已就绪 |
events 和 revents 的可用事件
events 和 revents 使用相同的宏定义,但:
events是 输入参数(告诉poll要监控哪些事件)。revents是 输出参数(poll返回时填充哪些事件真正发生)。
他们都是通过位图管理的,每个事件对应一个位,多个事件通过位运算组合。
常用事件宏
| 宏 | 说明 |
|---|---|
POLLIN |
数据可读(如 TCP 接收缓冲区非空) |
POLLOUT |
数据可写(如 TCP 发送缓冲区未满) |
POLLERR |
发生错误(仅 revents 可用) |
POLLHUP |
连接挂断(如对端关闭) |
POLLNVAL |
FD 未打开(fd 无效,仅 revents 可用) |
示例
struct pollfd fds[1];
fds[0].fd = sockfd;
fds[0].events = POLLIN | POLLOUT; // 监控可读和可写
int ret = poll(fds, 1, 1000); // 等待 1 秒
if (fds[0].revents & POLLIN) {
// sockfd 可读,调用 recv()
}
if (fds[0].revents & POLLOUT) {
// sockfd 可写,调用 send()
}
if (fds[0].revents & POLLERR) {
// 发生错误
}
poll 的工作流程
- 初始化
struct pollfd数组,设置fd和events。 - 调用
poll,传入数组、FD 数量和超时时间。 poll阻塞,直到:- 有 FD 就绪。
- 超时(
timeout毫秒)。 - 被信号中断(返回
-1,errno = EINTR)。
- 检查
revents,处理就绪的 FD。 - 循环调用
poll,继续监控。
poll 示例代码(监控多个 FD)
#include <stdio.h>
#include <poll.h>
#include <unistd.h>
int main() {
struct pollfd fds[2];
// 监控 stdin(文件描述符 0)
fds[0].fd = STDIN_FILENO;
fds[0].events = POLLIN;
// 监控 stdout(文件描述符 1)
fds[1].fd = STDOUT_FILENO;
fds[1].events = POLLOUT;
while (1) {
int ret = poll(fds, 2, 3000); // 3 秒超时
if (ret == -1) {
perror("poll");
break;
} else if (ret == 0) {
printf("Timeout!\n");
continue;
}
// 检查 stdin 是否可读
if (fds[0].revents & POLLIN) {
printf("stdin is readable!\n");
char buf[1024];
read(STDIN_FILENO, buf, sizeof(buf));
}
// 检查 stdout 是否可写
if (fds[1].revents & POLLOUT) {
printf("stdout is writable!\n");
}
}
return 0;
}
下面我们利用poll来实现一个简单的 TCP 多用户通信服务器程序
服务端实现
tcp.hpp
这里的 TCP 接口封装和之前的接口想同,没有影响的可以展开看看
// tcp.hpp
#ifndef _TCP_HPP_
#define _TCP_HPP_ 1
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h> // sockaddr_in结构体定义
#include <arpa/inet.h> // 网络地址转换函数
#include <cstring> // bzero函数
#include "log.hpp" // 自定义日志头文件
// 错误码枚举定义
enum
{
socket_error = 1, // socket创建失败
bind_error, // bind失败
listen_error, // listen失败
connect_error, // connect失败
accept_error, // accept失败
};
// 线程本地存储的地址缓冲区,用于IP地址转换
inline thread_local char addr_buffer[1024];
// TCP网络操作命名空间
namespace tcp
{
// 创建TCP socket
// 返回值: 成功返回socket文件描述符,失败退出程序
int Socket()
{
// 创建IPv4 TCP socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
{
perror("socket");
exit(socket_error); // 创建失败退出程序
}
return sockfd;
}
// 绑定socket到指定端口
// 参数: sockfd - socket文件描述符
// port - 要绑定的端口号
void Bind(int sockfd, int port)
{
struct sockaddr_in local; // IPv4地址结构
bzero(&local, sizeof(local)); // 清空结构体
// 设置地址族、IP地址和端口
local.sin_family = AF_INET; // IPv4
local.sin_addr.s_addr = INADDR_ANY; // 监听所有网络接口
local.sin_port = htons(port); // 主机字节序转网络字节序
// 绑定socket
int n = bind(sockfd, (struct sockaddr *)(&local), sizeof(local));
if (n != 0)
{
perror("bind");
exit(bind_error); // 绑定失败退出程序
}
}
// 从sockaddr_in结构体中提取IP和端口
// 参数: addr_in - 网络地址结构体
// ip - 输出参数,存储IP地址字符串
// port - 输出参数,存储端口号
void GetAddrAndPort(struct sockaddr_in &addr_in, std::string &ip, uint16_t &port)
{
port = ntohs(addr_in.sin_port); // 网络字节序转主机字节序
// 将IP地址转换为点分十进制字符串
inet_ntop(AF_INET, &addr_in.sin_addr, addr_buffer, sizeof(addr_buffer) - 1);
ip = addr_buffer; // 存储到输出参数
}
// 开始监听socket连接
// 参数: listen_sock - 监听socket
// backlog - 最大挂起连接数,默认为10
void Listen(int listen_sock, int backlog = 10)
{
int n = listen(listen_sock, backlog);
if (n == -1)
{
perror("listen");
exit(listen_error); // 监听失败退出程序
}
}
// 连接到服务器
// 参数: sockfd - 客户端socket
// server_ip - 服务器IP地址
// server_port - 服务器端口号
void Connect(int sockfd, const std::string &server_ip, const u_int16_t &server_port)
{
struct sockaddr_in server; // 服务器地址结构
bzero(&server, sizeof(server)); // 清空结构体
// 设置服务器地址信息
server.sin_port = htons(server_port); // 端口号转网络字节序
server.sin_family = AF_INET; // IPv4
server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // IP地址转换
// 发起连接
int n = connect(sockfd, (struct sockaddr *)&server, sizeof(server));
if (n == -1)
{
perror("connect");
exit(connect_error); // 连接失败退出程序
}
}
// 接受客户端连接
// 参数: listen_sockfd - 监听socket
// client_addr - 输出参数,存储客户端地址信息
// 返回值: 成功返回连接socket文件描述符,失败退出程序
int Accept(int listen_sockfd, sockaddr_in &client_addr)
{
socklen_t len = sizeof(client_addr); // 地址结构长度
// 接受连接
int n = accept(listen_sockfd, (struct sockaddr *)&client_addr, &len);
if (n == -1)
{
perror("accept");
exit(accept_error); // 接受失败退出程序
}
return n; // 返回新连接的socket描述符
}
};
#endif
poll_server.hpp
#ifndef _POLL_SERVER_HPP_
#define _POLL_SERVER_HPP_ 1
#include <iostream>
#include <poll.h> // poll函数相关头文件
#include <sys/socket.h> // socket相关操作头文件
#include "tcp.hpp" // 自定义TCP工具函数头文件
// 服务器配置常量
constexpr const size_t max_fd_sz = 1 << 10; // 最大监控文件描述符数量(1024)
const u_int16_t default_port = 8888; // 默认服务端口号
const int default_fd = -1; // 无效文件描述符标识
const int none_event = 0; // 无事件标志
// 客户端信息结构体
struct client_inf {
std::string client_addr; // 客户端IP地址
u_int16_t client_port; // 客户端端口号
};
// 基于poll的TCP聊天服务器类
class PollServer {
public:
// 构造函数,可指定端口号(默认8888)
PollServer(u_int16_t port = default_port)
: cur_sz(0), // 初始化当前监控数量为0
port_(port) // 设置服务端口号
{
}
// 初始化服务器
void Init() {
// 1. 创建监听套接字
listen_sockfd = tcp::Socket();
lg(Info, "listening sock create success, sockfd: %d", listen_sockfd);
// 2. 绑定端口
tcp::Bind(listen_sockfd, port_);
lg(Info, "listening sock bind success");
// 3. 开始监听连接
tcp::Listen(listen_sockfd);
// 4. 初始化poll监控数组
event_fds[0].fd = listen_sockfd; // 监听套接字放在数组首位
event_fds[0].events = POLLIN; // 监听读事件(新连接)
++cur_sz; // 监控数量增加
// 初始化其余位置为无效状态
for (int i = 1; i < max_fd_sz; ++i) {
event_fds[i].fd = default_fd; // 标记为无效fd
event_fds[i].events = none_event; // 不监听任何事件
}
}
// 启动服务器主循环
void Start() {
while (true) {
// 调用poll监控所有文件描述符,超时时间3秒
int n = poll(event_fds, cur_sz, 3000);
if (n == -1) { // poll出错
lg(Error, "poll false, errno: %d, errstr: %s", errno, strerror(errno));
}
else if (n == 0) { // 超时,无事件发生
lg(Info, "None client join and no client send message");
}
else { // 有事件发生
lg(Info, "Get a new link");
Dispatcher(); // 处理事件
}
}
}
private:
// 事件分发处理函数
void Dispatcher() {
// 遍历所有监控的文件描述符
for (int i = 0; i < cur_sz; ++i) {
if (event_fds[i].fd == default_fd) // 跳过无效fd
continue;
// 检查是否有读事件发生
if (event_fds[i].revents & POLLIN) {
if (i == 0) { // 监听套接字有事件(新连接)
Accept();
}
else { // 客户端套接字有事件(数据到达)
Recv(i);
}
}
}
}
// 接受新客户端连接
void Accept() {
// 检查是否达到最大连接数
if (cur_sz == max_fd_sz) {
lg(Warning, "The client is full, the connection is refused");
return;
}
// 接受新连接
struct sockaddr_in client;
socklen_t len = sizeof(client);
int client_sockfd = accept(listen_sockfd, (sockaddr*)&client, &len);
if (client_sockfd == -1) { // 接受连接失败
lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
return;
}
// 获取客户端地址信息
std::string client_addr;
u_int16_t client_port;
tcp::GetAddrAndPort(client, client_addr, client_port);
lg(Info, "accept a new client [%s: %d]", client_addr.c_str(), client_port);
// 在poll数组中寻找空闲位置存放新连接
for (int i = 1; i <= cur_sz; ++i) {
if (i == cur_sz) // 需要扩容监控数组
++cur_sz;
if (event_fds[i].fd == default_fd) { // 找到空闲位置
event_fds[i].fd = client_sockfd; // 设置客户端fd
event_fds[i].events = POLLIN; // 监听读事件
event_fds[i].revents = none_event; // 重置事件标志
clients[i].client_addr = client_addr; // 保存客户端地址
clients[i].client_port = client_port; // 保存客户端端口
break;
}
}
}
// 接收客户端数据并广播
void Recv(int client_pos) {
int sockfd = event_fds[client_pos].fd;
// 接收客户端数据
ssize_t n = recv(sockfd, client_buffer, sizeof(client_buffer) - 1, 0);
if (n == -1) { // 接收出错
lg(Error, "recv false, errno: %d, errstr: %s", errno, strerror(errno));
}
else if (n == 0) { // 客户端关闭连接
lg(Info, "client [%s: %d] quit, bye!",
clients[client_pos].client_addr.c_str(),
clients[client_pos].client_port);
event_fds[client_pos].fd = default_fd; // 标记为无效
}
else { // 成功接收数据
client_buffer[n] = 0; // 添加字符串结束符
// 广播给所有客户端(简单群聊功能)
for (int i = 1; i < cur_sz; ++i) {
if (event_fds[i].fd != default_fd) { // 只发给有效客户端
if (send(event_fds[i].fd, client_buffer, n, 0) == -1) {
lg(Error, "send data to client [%s: %d] false",
clients[i].client_addr.c_str(),
clients[i].client_port);
}
}
}
}
}
private:
size_t cur_sz; // 当前监控的文件描述符数量
int listen_sockfd; // 监听套接字文件描述符
u_int16_t port_; // 服务端口号
struct pollfd event_fds[max_fd_sz]; // poll监控的文件描述符数组
struct client_inf clients[max_fd_sz]; // 客户端信息数组
char client_buffer[1024]; // 数据接收缓冲区
};
#endif
代码整体结构
基础架构:
- 使用poll I/O多路复用技术实现TCP聊天服务器
- 支持同时处理多个客户端连接
- 实现简单的群聊功能(一个客户端发消息,所有客户端都能收到)
核心机制:
- 使用固定大小的数组管理所有客户端连接(max_fd_sz=1024)
- 主循环通过poll监控所有活跃连接
- 采用事件驱动模型,只有发生实际I/O时才进行处理
关键数据结构:
pollfd数组:监控所有socket的文件描述符和事件状态client_inf数组:存储每个客户端的地址信息- 环形缓冲区:用于临时存储接收到的消息
工作流程:
- 初始化监听socket并加入poll监控
- 主循环中调用poll等待事件
- 有新连接时accept并加入监控
- 有数据到达时读取并广播给所有客户端
- 客户端断开时清理资源
客户端代码
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <thread>
#include "tcp.hpp"
using namespace std;
/**
* TCP客户端实现 - 支持断线重连的简易聊天客户端
* 功能:连接服务器,发送消息并接收服务器回显
* 编译:g++ client.cpp -o client -lpthread
* 使用:./client <server_ip> <server_port>
*/
// 带重试机制的连接函数
int Connect(int sockfd, const struct sockaddr_in &server) {
int cnt = 5; // 最大重试次数
// 尝试连接服务器,失败时自动重试
while (connect(sockfd, (struct sockaddr *)&server, sizeof(server)) == -1) {
if (cnt == 0) { // 重试次数耗尽
printf("Sorry, I am unable to connect to the designated server\n");
return -1;
}
--cnt;
printf("Try to reconnect..., %d chances remaining\n", cnt);
sleep(2); // 间隔2秒重试
}
return 0; // 连接成功
}
// 消息接收处理线程函数
void Handler(const int sockfd, const std::string server_ip, const u_int16_t server_port) {
while (true) {
char buffer[1024];
// 接收服务器消息(不关心发送方信息)
ssize_t m = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (m > 0) { // 收到有效消息
buffer[m] = 0; // 添加字符串结束符
cout << "server echo# " << buffer << endl;
}
else if (m == 0) { // 服务器关闭连接
printf("server [%s: %d] quit\n", server_ip.c_str(), server_port);
// 尝试重新连接
int sockfd = tcp::Socket();
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr);
sleep(1); // 等待1秒后再试
if(Connect(sockfd, server_addr) == 0)
continue; // 重连成功则继续
exit(1); // 重连失败退出
}
else { // 接收出错
break;
}
}
}
int main(int argc, char *argv[]) {
// 参数校验
if (argc != 3) {
cerr << "Usage: " << argv[0] << " server_ip server_port" << endl;
return 1;
}
// 解析参数
string server_ip = argv[1];
uint16_t server_port = stoi(argv[2]);
// 初始化服务器地址结构
struct sockaddr_in server;
bzero(&server, sizeof(server));
server.sin_port = htons(server_port); // 设置端口
server.sin_family = AF_INET; // IPv4协议
server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // IP地址转换
// 创建套接字并连接服务器
int sockfd = tcp::Socket();
if (Connect(sockfd, server) == -1)
return 1;
// 启动接收线程
thread receiver(Handler, sockfd, server_ip, server_port);
// 主线程处理用户输入和发送
while (true) {
string inbuffer;
usleep(100); // 微小延迟避免CPU占用过高
cout << "Please Enter# ";
getline(cin, inbuffer); // 获取用户输入
// 发送消息到服务器
ssize_t n = send(sockfd, inbuffer.c_str(), inbuffer.size(), 0);
if (n == -1) {
perror("send");
}
}
receiver.join(); // 等待接收线程结束(实际不会执行到这里)
return 0;
}
功能说明
核心功能:
- 实现了一个支持断线自动重连的TCP聊天客户端
- 采用多线程架构:主线程处理用户输入,子线程接收服务器消息
- 支持基本的命令行参数配置(服务器IP和端口)
关键特性:
- 自动重连机制:连接失败时自动重试5次(间隔2秒)
- 断线恢复:检测到服务器断开后自动尝试重新连接
- 简单交互界面:显示输入提示和服务器回显
工作流程:
- 启动时连接指定服务器
- 成功连接后:
- 主线程:循环读取用户输入并发送到服务器
- 子线程:循环接收服务器消息并显示
- 连接断开时:
- 自动尝试重新建立连接
- 重连失败则退出程序
代码测试
而后,我们将服务端和客户端启动后即可实现双方的通信了,服务端创建和调用实现如下:
#include <iostream>
#include <memory>
#include "poll_server.hpp"
using namespace std;
int main(int argc, char *argv[])
{
int port;
if(argc == 1)
{
port = 7777;
}else if(argc == 2)
{
port = stoi(argv[1]);
}else{
cerr << "Usage: " << argv[0] << "(default port: 8080)" << endl <<
"OR" << endl << argv[0] << " port" << endl;
return 1;
}
unique_ptr<PollServer> ps(new PollServer(port));
ps->Init();
ps->Start();
return 0;
}
然后,我们对代码进行测试:
# 客户端1
╭─ljx@VM-16-15-debian ~/linux_review/web_io/Poll
╰─➤ ./poll_client.o 127.0.0.1 7777
Please Enter# 123
server echo# 123Please Enter#
hello
Please Enter# server echo# hello
# 客户端2
╭─ljx@VM-16-15-debian ~/linux_review/web_io/Poll
╰─➤ ./poll_client.o 127.0.0.1 7777
Please Enter# server echo# 123
server echo# hello
# 客户端3
╭─ljx@VM-16-15-debian ~/linux_review/web_io/Poll
╰─➤ ./poll_client.o 127.0.0.1 7777
Please Enter# server echo# 123
server echo# hello
客户端1 的数据可以顺利发送给其他两个客户端
查看评论