Linux 基于 Epoll 的主从 Reactor 多线程模型
该项目的源代码地址:GitHub
什么是 Reactor 模型
Reactor 模型 是一种基于事件驱动的高性能网络编程模型,广泛应用于高并发场景中。其核心思想是通过I/O多路复用和事件分发机制,高效地处理网络请求,避免线程阻塞和资源浪费。
核心概念
Reactor 模型 的核心是事件驱动机制,通过监听 I/O 事件(如连接建立、数据读写等),将事件分发给相应的处理器(Handler)进行处理。它主要由以下两个角色组成:
1.Reactor(反应器):负责监听和分发事件。它通过 I/O 多路复用(如 select、poll 或 epoll)监听多个连接的事件,并将事件分发给对应的处理器。
2.Handler(处理器):负责具体的事件处理逻辑,如数据读取、业务处理和响应发送。
模型分类
Reactor 模型 根据线程和资源的分配方式,分为以下三种实现方式:
单 Reactor 单线程模型: Reactor 和所有 Handler 都运行在同一个线程中。 优点:模型简单,无需处理多线程同步问题。 缺点:性能有限,无法充分利用多核 CPU,且单线程阻塞会导致整个系统不可用。
单 Reactor 多线程模型: Reactor 负责事件监听和分发,具体的业务处理交由线程池完成。 优点:充分利用多核 CPU,提升并发处理能力。 缺点:Reactor 仍是单线程运行,高并发场景下可能成为瓶颈。
主从 Reactor 多线程模型: 主 Reactor 负责监听新连接并将其分配给从 Reactor。 从 Reactor 负责具体的 I/O 事件处理,并将任务交给线程池完成。 优点:主从分离,职责明确,适合高并发场景,充分利用多核资源。
工作流程
以主从 Reactor 多线程模型为例,其工作流程如下:
1.主 Reactor 监听新连接事件,通过 Acceptor 接收连接。
2.新连接被分配给从 Reactor,从 Reactor 负责监听该连接的 I/O 事件。
3.当有 I/O 事件发生时,从 Reactor 调用对应的 Handler 进行处理。
4.Handler 读取数据后,将任务交给线程池处理业务逻辑。
5.线程池完成业务处理后,将结果返回给 Handler,Handler 再通过网络发送响应。
基于 Epoll 的主从 Reactor 多线程模型
Epoll 是 Linux 下的一种 I/O 多路复用机制,它提供了高效的事件通知机制。基于 Epoll 的主从 Reactor 多线程模型,可以充分利用多核 CPU,实现高并发处理。
核心组件
1.主 Reactor:监听新连接事件,通过 Acceptor 接收连接,并将其分配给从 Reactor。
2.从 Reactor:监听已连接的 I/O 事件,调用 Handler 进行处理。
3.Handler:负责具体的事件处理逻辑,如数据读取、业务处理和响应发送。
4.TimerManager:链接管理器,根据用户的活跃状态决定是否关闭链接。
而我们的主从 Reactor 处理的是简单的计算器任务,因此暂不需将任务交给线程池处理业务逻辑。
工作流程
1.主 Reactor 监听新连接事件,通过 Acceptor 接收连接。
2.新连接被分配给从 Reactor,从 Reactor 负责监听该连接的 I/O 事件。
3.当有 I/O 事件发生时,从 Reactor 调用对应的 Handler 进行处理。
4.Handler 读取数据后,进行业务逻辑处理,并通过网络发送响应。
注意事项
1.主 Reactor 和从 Reactor 应该运行在不同的线程中,以充分利用多核 CPU。
2.从 Reactor 应该使用非阻塞 I/O,因为 Epoll 使用的是边缘触发模式,我们需要对数据进行完整读取。
3.一般在读取到用户的数据后处理完直接将数据发送给用户,但是若发送给用户后用户缓冲区仍然有数据,说明用户未读取完,我们需要对用户的写事件 EPOLLOUT 进行监听,直到用户读取完数据。
代码实现
Connection.hpp 连接类实现解析
1. 头文件保护与基本包含
#ifndef _CONNECTION_HPP_
#define _CONNECTION_HPP_ 1
// 防止头文件重复包含的标准保护措施
#include <iostream>
#include <memory> // 智能指针支持
#include <functional> // 函数对象支持
#include "common.hpp" // 项目通用头文件
2. 前向声明与类型定义
class Connection;
// Connection类的前向声明,用于func_t类型定义
using func_t = std::function<void(std::weak_ptr<Connection>)>;
// 定义回调函数类型:
// 参数为Connection的弱指针,无返回值
// 使用weak_ptr避免循环引用
class EventLoop;
// EventLoop类的前向声明
3. Connection类定义
class Connection{
public:
// 构造函数:初始化socket描述符,默认不关注写事件
Connection(int sock)
:sock_(sock),
write_care_(false){}
// 获取socket文件描述符
int Sockfd(){ return sock_; }
// 追加数据到输入缓冲区
void AppendInBuffer(const std::string &info)
{
inbuffer_ += info;
}
// 追加数据到输出缓冲区
void AppendOutBuffer(const std::string &info)
{
outbuffer_ += info;
}
// 获取输入缓冲区引用
std::string &Inbuffer()
{
return inbuffer_;
}
// 获取输出缓冲区引用
std::string &OutBuffer()
{
return outbuffer_;
}
4. 成员变量详解
private:
int sock_; // 套接字文件描述符
std::string inbuffer_; // 输入数据缓冲区
std::string outbuffer_; // 输出数据缓冲区
public:
// 所属EventLoop的弱引用(避免循环引用)
std::weak_ptr<EventLoop> el;
// 三个核心回调函数:
func_t recv_cb; // 读事件回调
func_t send_cb; // 写事件回调
func_t except_cb; // 异常事件回调
// 客户端信息
std::string ip_; // 客户端IP地址
uint16_t port_; // 客户端端口号
// 写事件关注标志
// true表示需要监听EPOLLOUT事件
bool write_care_;
};
5. 设计要点说明
5.1 缓冲区管理
- 使用
std::string作为缓冲区容器 - 提供
Append方法追加数据 - 返回引用避免不必要的拷贝
5.2 回调机制
- 三个关键回调函数通过
std::function封装 - 使用
weak_ptr传递自身引用,防止循环引用 - 回调由EventLoop事件循环触发
5.3 资源管理
- 原始socket描述符由RAII管理
weak_ptr关联EventLoop避免内存泄漏- 写事件标志实现动态EPOLLOUT注册
Epoll.hpp 封装类实现解析
#ifndef _EPOLL_HPP_
#define _EPOLL_HPP_ 1
#include <iostream>
#include <sys/epoll.h> // epoll系统调用头文件
#include "nocopy.hpp" // 禁止拷贝的基类
#include "log.hpp" // 日志系统头文件
// 默认epoll_wait超时时间(毫秒)
inline const int default_time_out = 3000;
// 错误码枚举
enum {
epoll_create_error = 1, // epoll创建失败错误码
};
/**
* @brief Epoll封装类,继承nocopy禁止拷贝构造
*/
class Epoll: public nocopy
{
public:
/**
* @brief 构造函数:创建epoll实例
*/
Epoll()
:timeout(default_time_out) // 初始化超时时间
{
epfd = epoll_create1(0); // 创建epoll实例
if(epfd == -1)
{
// 记录创建失败日志(线程ID、错误码、错误信息)
lg(Error, "thread-%d, epoll_create false, errno: %d, errstr: %s",
pthread_self(), errno, strerror(errno));
exit(epoll_create_error); // 创建失败直接退出
}
// 记录创建成功日志
lg(Info, "thread-%d, epoll create success, epoll fd: %d", pthread_self(), epfd);
}
/**
* @brief 等待epoll事件
* @param events 输出参数,用于接收事件数组
* @param num 最大事件数量
* @return 就绪的事件数量,-1表示错误
*/
int EpollWait(struct epoll_event events[], int num)
{
int n = epoll_wait(epfd, events, num, timeout);
if(n == -1){
lg(Error, "epoll_wait false, errno: %d, errstr: %s", errno, strerror(errno));
}
return n;
}
/**
* @brief 控制epoll监控的文件描述符
* @param op 操作类型 (EPOLL_CTL_ADD/MOD/DEL)
* @param fd 要操作的文件描述符
* @param event 要监控的事件标志
*/
void EpollCtl(int op, int fd, uint32_t event)
{
if(op == EPOLL_CTL_DEL){
// 删除操作不需要event参数
if(epoll_ctl(epfd, op, fd, nullptr) == -1){
lg(Error, "epoll control false, errno: %d, errstr: %s",
errno, strerror(errno));
}
}else{
struct epoll_event ev;
ev.data.fd = fd; // 关联文件描述符
ev.events = event; // 设置监听事件
if(epoll_ctl(epfd, op, fd, &ev)){
lg(Error, "epoll control false, errno: %d, errstr: %s",
errno, strerror(errno));
}
}
}
/**
* @brief 析构函数:关闭epoll文件描述符
*/
~Epoll()
{
close(epfd); // 确保资源释放
}
private:
int epfd; // epoll文件描述符
int timeout; // epoll_wait超时时间(毫秒)
};
#endif
设计说明
资源管理:
- 遵循RAII原则,构造函数创建epoll实例,析构函数自动释放
- 继承
nocopy禁止拷贝构造和赋值,避免文件描述符重复关闭
错误处理:
- 所有epoll系统调用都有错误日志记录
- 创建失败直接退出程序(生产环境可改为异常)
线程安全:
- 日志中包含线程ID,便于多线程调试
- 类本身无状态依赖,可在多线程环境使用不同实例
接口设计:
EpollWait():封装epoll_wait,返回就绪事件数EpollCtl():统一处理三种控制操作,自动处理DEL特殊情况
该接口封装在前面章节已经讲解过,主要用于管理事件监听和处理。
定时器管理系统设计解析
Timer定时器结构设计
#ifndef _TIMER_HPP_
#define _TIMER_HPP_ 1
#include <queue>
#include <vector>
#include <functional>
#include <memory>
#include <ctime>
#include <iostream>
#include <unordered_map>
#include "connection.hpp"
// 前向声明避免循环依赖
class Connection;
/**
* @brief 定时器节点结构体
* @note 存储连接的生命周期状态信息
*/
struct Timer
{
// 构造函数初始化连接状态
Timer(int expired_time_, int cnt_, std::shared_ptr<Connection> connect_)
:expired_time(expired_time_), // 绝对过期时间戳
cnt(cnt_), // 连续活跃次数计数器
connect(connect_){} // 管理的连接对象
int expired_time; // Unix时间戳格式的过期时间
int cnt; // 累计活跃次数(用于奖励机制)
std::shared_ptr<Connection> connect; // 关联的连接智能指针
};
/**
* @brief 定时器比较仿函数
* @note 用于构建最小堆(最早过期时间在堆顶)
*/
struct time_cmp
{
bool operator()(const std::shared_ptr<Timer> t1, const std::shared_ptr<Timer> t2)
{
// 注意:通过大于号实现最小堆
return t1->expired_time > t2->expired_time;
}
};
#endif
TimerManager定时器管理器设计
#ifndef _TIMER_MANAGER_HPP_
#define _TIMER_MANAGER_HPP_ 1
#include "timer.hpp"
// 默认配置常量
inline const int default_alive_gap = 10; // 默认存活周期(秒)
inline const int default_alive_cnt = 5; // 重置阈值(次)
/**
* @brief 定时器管理核心类
* @note 采用最小堆+哈希表双结构实现高效管理
*/
class TimerManager
{
public:
// 初始化存活周期和计数阈值
TimerManager(int gap = default_alive_gap, int cnt = default_alive_cnt)
: alive_gap(gap),
alive_cnt(cnt) {}
/**
* @brief 添加新连接定时器
* @param connect 要管理的连接对象
*/
void Push(std::shared_ptr<Connection> connect)
{
// 创建新定时器(当前时间+存活周期)
std::shared_ptr<Timer> timer(new Timer(std::time(nullptr) + alive_gap, 0, connect));
timers.emplace(timer); // 插入最小堆
timer_map[connect->Sockfd()] = timer; // 建立socket到定时器的映射
}
/**
* @brief 检查堆顶是否过期
* @return true表示存在过期连接需要处理
*/
bool IsTopExpired()
{
while (!timers.empty() && timers.top()->expired_time <= std::time(nullptr))
{
int sockfd = (timers.top()->connect)->Sockfd();
// 处理惰性删除
if(timer_map.find(sockfd) == timer_map.end()) {
timers.pop();
continue;
}
if(timers.top()->cnt != timer_map[sockfd]->cnt || timers.top()->expired_time != timer_map[sockfd]->expired_time)
{
timers.pop();
timers.emplace(timer_map[sockfd]);
}
else if (timers.top()->cnt >= 5)
{
auto timer = timers.top();
timer->expired_time = std::time(nullptr) + alive_gap;
timer->cnt = 0;
timers.emplace(timer);
timer_map[sockfd] = timer;
}
else
{
return true;
}
}
return false;
}
/// 惰性删除元素
void LazyDelete(int sockfd){
timer_map.erase(sockfd);
}
/// 获取堆顶定时器(不弹出)
std::shared_ptr<Timer> GetTop()
{
return timers.top();
}
/**
* @brief 更新连接活跃时间
* @param sockfd 要更新的socket描述符
*/
void UpdateTime(int sockfd)
{
// 检测链接是否已被删除
if(timer_map.find(sockfd) == timer_map.end()) return;
if(timers.empty()) return;
// 创建新的 Timer(避免共享指针副作用)
auto new_timer = std::make_shared<Timer>(
std::time(nullptr) + alive_gap,
timer_map[sockfd]->cnt + 1,
timer_map[sockfd]->connect
);
// 更新 timer_map
timer_map[sockfd] = new_timer;
timer_map[sockfd]->cnt++; // 增加活跃计数
timer_map[sockfd]->expired_time = time(nullptr) + alive_gap; // 重置过期时间
}
private:
// 数据结构
std::priority_queue<std::shared_ptr<Timer>,
std::vector<std::shared_ptr<Timer>>,
time_cmp> timers; // 最小堆
std::unordered_map<int, std::shared_ptr<Timer>> timer_map; // 哈希表
// 配置参数
int alive_gap; // 存活周期(秒)
int alive_cnt; // 重置阈值(次)
};
#endif
惰性处理机制详解
延迟验证设计:
- 只在检查堆顶时处理过期(
IsTopExpired) - 非堆顶元素即使过期也暂不处理
- 降低频繁检查带来的CPU开销
- 只在检查堆顶时处理过期(
状态同步策略:
- 哈希表存储最新状态
- 堆中元素可能滞后(通过
cnt/expired_time以及哈希表中是否存有该元素来比较检测) - 发现状态不一致时重新入堆
奖励机制实现:
if (timers.top()->cnt >= alive_cnt) { //...重置计数器 timer->expired_time = std::time(nullptr) + alive_gap; timer->cnt = 0; }- 活跃连接获得生命周期重置
- 避免健康连接被误清理
堆处理精妙设计
双数据结构配合:
- 最小堆:快速获取最早过期连接(O(1))
- 哈希表:快速查找特定连接(O(1))
堆更新优化:
- 不直接修改堆元素,而是重新插入
- 通过状态比较发现过期版本
- 摊还时间复杂度为O(logN)
批量处理能力:
while (!timers.empty() && timers.top()->expired_time <= curr_time) { // 批量处理所有过期连接 }
- 单次操作可处理多个过期连接
该设计完美平衡了精度和效率,特别适合高频网络通信场景,其中:
- 健康连接通过奖励机制长期保持
- 僵尸连接被快速识别清理
- 系统开销随活跃连接数线性增长
监听器(Listener)类实现解析
类功能概述
Listener类封装了TCP服务端的监听逻辑,主要职责包括:
- 初始化监听socket(非阻塞模式)
- 持续接受客户端连接
- 将新连接信息通过环形队列传递给I/O线程
- 通过Connection弱引用实现与EventLoop的低耦合交互
代码详细注释
#ifndef _LISTENER_HPP_
#define _LISTENER_HPP_ 1
#include <iostream>
#include <memory>
#include <thread>
#include "tcp.hpp" // TCP socket封装
#include "event_loop.hpp" // 事件循环
#include "log.hpp" // 日志系统
inline const uint16_t default_listen_port = 6349; // 默认监听端口
/**
* @brief TCP监听器类
* @note 通过环形队列实现与I/O线程的解耦
*/
class Listener
{
public:
/**
* @brief 构造函数
* @param port 监听端口号
*/
Listener(uint16_t port = default_listen_port)
: port_(port),
sock_(new Sock()) // 创建TCP socket封装对象
{
}
/**
* @brief 初始化监听socket
*/
void Init()
{
sock_->Socket(); // 创建socket
sock_->Bind(port_); // 绑定端口
sock_->Listen(); // 开始监听
SetNonBlockOrDie(sock_->GetSockfd()); // 设置为非阻塞模式
}
/**
* @brief 接受客户端连接的主循环
* @param conn 持有EventLoop引用的Connection对象
*/
void Accepter(std::weak_ptr<Connection> conn)
{
auto connection = conn.lock();
while (true)
{
int listen_sockfd = sock_->GetSockfd();
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 非阻塞accept
int client_sockfd = accept(listen_sockfd, (sockaddr *)&client, &len);
if (client_sockfd == -1)
{
if (errno == EWOULDBLOCK)
break; // 无新连接时退出循环
else if (errno == EINTR)
continue; // 被信号中断则重试
else
{
// 记录accept错误日志
lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
continue;
}
}
// 获取客户端地址信息
std::string client_ip;
uint16_t client_port;
Sock::GetAddrAndPort(client, client_ip, client_port);
lg(Info, "accept a new client [%s: %d]", client_ip.c_str(), client_port);
// 设置客户端socket为非阻塞
SetNonBlockOrDie(client_sockfd);
// 通过EventLoop的环形队列传递连接信息
auto event_loop = connection->el.lock();
ClientInf ci{
.sockfd = client_sockfd,
.client_ip = client_ip,
.client_port = client_port
};
event_loop->rq_->Push(ci); // 入队操作
}
}
/// 获取监听socket文件描述符
int Fd() { return sock_->GetSockfd(); }
private:
uint16_t port_; // 监听端口
std::shared_ptr<Sock> sock_; // TCP socket封装对象
};
#endif
关键设计亮点
低耦合实现方式
弱引用传递:
- 通过
std::weak_ptr<Connection>获取EventLoop引用 - 避免循环依赖,不增加引用计数
- 通过
环形队列中介:
event_loop->rq_->Push(ci);- 新连接信息通过环形队列传递
- 监听线程与I/O线程完全解耦
非阻塞设计:
- 监听socket设置为非阻塞模式
- 批量接受连接后立即退出循环
异常处理机制
- EWOULDBLOCK处理:
- 非阻塞模式下正常情况处理
- EINTR处理:
- 系统调用被信号中断时自动重试
- 错误隔离:
- 单个accept失败不影响后续连接
性能优化点
批量接受连接:
while (true) { int client_sockfd = accept(...); // ... }- 单次调用处理多个等待连接
零拷贝传递:
- 仅传递socket fd和地址信息
- 避免数据复制开销
该实现完美体现了Reactor模式的精髓,通过:
- 非阻塞I/O提高吞吐量
- 队列缓冲实现生产消费者模式
- 弱引用保持组件独立性
使得监听器可以高效稳定地处理海量连接请求。
事件循环器(EventLoop)核心实现解析
类功能概述
EventLoop是Reactor模式的核心组件,主要职责包括:
- 管理所有连接的生命周期
- 处理epoll事件分发
- 集成定时器管理
- 提供协议处理接口(OnMessage)
- 支持任务队列集成(TaskPush)
数据结构定义
// 客户端信息结构
struct ClientInf{
int sockfd; // 客户端socket描述符
std::string client_ip; // 客户端IP地址
uint16_t client_port; // 客户端端口号
};
// 任务回调类型定义
using task_t = std::function<void(std::weak_ptr<RingQueue<ClientInf>>,
std::weak_ptr<EventLoop>)>;
核心方法实现
1. 连接管理
/**
* @brief 添加新连接并注册事件回调
* @param sock 套接字描述符
* @param event 监听事件类型(EPOLLIN/OUT等)
* @param recv_cb 读事件回调
* @param send_cb 写事件回调
* @param except_cb 异常回调
* @param ip 客户端IP(可选)
* @param port 客户端端口(可选)
* @param is_listensock 是否监听socket(不加入定时器)
*/
void AddConnection(int sock, uint32_t event, func_t recv_cb,
func_t send_cb, func_t except_cb,
const std::string &ip = "0.0.0.0",
uint16_t port = 0,
bool is_listensock = false)
{
// 创建新连接对象
std::shared_ptr<Connection> new_connect(new Connection(sock));
// 设置反向引用
new_connect->el = shared_from_this();
// 注册回调函数
new_connect->recv_cb = recv_cb;
new_connect->send_cb = send_cb;
new_connect->except_cb = except_cb;
// 记录客户端信息
new_connect->ip_ = ip;
new_connect->port_ = port;
// 非监听socket加入定时管理
if(!is_listensock) tm_->Push(new_connect);
// 添加到连接表
connections_[sock] = new_connect;
// 注册epoll事件
epoller_->EpollCtl(EPOLL_CTL_ADD, sock, event);
}
2. 网络I/O处理
/**
* @brief 读取客户端数据(非阻塞模式)
* @param connect 连接弱引用
*/
void Recv(std::weak_ptr<Connection> connect)
{
auto connection = connect.lock();
int sock = connection->Sockfd();
while(true) {
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(n > 0) {
buffer[n] = 0;
connection->AppendInBuffer(buffer); // 追加到输入缓冲区
lg(Debug, "thread-%d, recv message from client: %s",
pthread_self(), buffer);
}
else if(n == 0) { // 客户端关闭连接
lg(Info, "client [%s: %d] quit",
connection->ip_.c_str(), connection->port_);
connection->except_cb(connection);
return;
}
else { // 错误处理
if(errno == EWOULDBLOCK) break; // 无数据可读
else if(errno == EINTR) continue; // 被信号中断
else {
lg(Error, "recv from client [%s: %d] false",
connection->ip_.c_str(), connection->port_);
connection->except_cb(connection);
return;
}
}
}
// 触发上层协议处理
if(OnMessage_) OnMessage_(connection);
}
/**
* @brief 发送数据到客户端(非阻塞模式)
*/
void Send(std::weak_ptr<Connection> connect)
{
auto connection = connect.lock();
std::string &outbuffer = connection->OutBuffer();
while(true) {
ssize_t n = send(connection->Sockfd(),
outbuffer.c_str(),
outbuffer.size(), 0);
if(n > 0) {
outbuffer.erase(0, n); // 移除已发送数据
if(outbuffer.empty()) break;
}
else if(n == 0) return; // 不应发生的情况
else {
if(errno == EWOULDBLOCK) break; // 写缓冲区满
else if(errno == EINTR) continue; // 被信号中断
else {
lg(Error, "send to client [%s: %d] false",
connection->ip_.c_str(), connection->port_);
connection->except_cb(connection);
return;
}
}
}
// 动态调整EPOLLOUT事件监听
if(!outbuffer.empty() && !connection->write_care_) {
// 开启写事件监听(当缓冲区有数据时)
epoller_->EpollCtl(EPOLL_CTL_MOD,
connection->Sockfd(),
EPOLLIN | EPOLLOUT | EPOLLET);
connection->write_care_ = true;
}
else if(outbuffer.empty() && connection->write_care_) {
// 关闭写事件监听(当缓冲区空时)
epoller_->EpollCtl(EPOLL_CTL_MOD,
connection->Sockfd(),
EPOLLIN | EPOLLET);
connection->write_care_ = false;
}
}
3. 事件分发核心
/**
* @brief 事件分发处理(核心方法)
*/
void DisPatcher()
{
// 等待epoll事件(阻塞调用)
int n = epoller_->EpollWait(recvs, max_fd);
for(int i = 0; i < n; ++i) {
uint32_t events = recvs[i].events;
int sockfd = recvs[i].data.fd;
// 异常事件转化为读写事件处理
if(events & (EPOLLERR | EPOLLHUP))
events |= (EPOLLIN | EPOLLOUT);
// 处理读事件
if(events & EPOLLIN && connections_[sockfd]->recv_cb) {
connections_[sockfd]->recv_cb(connections_[sockfd]);
tm_->UpdateTime(sockfd); // 更新活跃时间
}
// 处理写事件
if(events & EPOLLOUT && connections_[sockfd]->send_cb) {
connections_[sockfd]->send_cb(connections_[sockfd]);
tm_->UpdateTime(sockfd); // 更新活跃时间
}
}
}
4. 定时器集成
/**
* @brief 检查并处理过期连接
*/
void Expired_check()
{
while(tm_->IsTopExpired()) { // 检查堆顶是否过期
auto top_time = tm_->GetTop()->connect;
int sockfd = top_time->Sockfd();
// 如果连接仍存在则触发异常关闭
if(connections_.find(sockfd) != connections_.end())
connections_[sockfd]->except_cb(top_time);
}
}
5. 主事件循环
/**
* @brief 事件主循环
*/
void Loop()
{
while(true) {
// 从任务队列获取新连接(如果有TaskPush回调)
if(TaskPush_) TaskPush_(rq_, shared_from_this());
// 处理epoll事件
DisPatcher();
// 检查过期连接
Expired_check();
}
}
关键设计解析
双回调接口设计
OnMessage回调:
- 由上层协议提供
- 在数据接收完成后触发
- 处理协议解析和业务逻辑
TaskPush回调:
- 用户自定义实现
- 从环形队列获取新连接
- 示例实现:
定时器集成策略
活跃度更新:
- 每次I/O操作后调用
tm_->UpdateTime() - 重置连接的过期时间
- 每次I/O操作后调用
惰性检查:
- 只在事件循环间隙检查
- 避免频繁的定时器操作影响I/O性能
异常统一处理:
- 过期连接通过
except_cb统一清理 - 保证资源释放的一致性
- 过期连接通过
性能优化点
边缘触发(EPOLLET):
- 减少epoll事件触发次数
- 需要配合非阻塞I/O
动态事件注册:
- 只在需要时监听EPOLLOUT
- 减少不必要的epoll事件
批量事件处理:
- 单次epoll_wait处理多个事件
- 减少系统调用次数
该实现通过清晰的接口划分和高效的事件处理机制,完美支持了高并发网络编程的需求,同时保持了良好的扩展性和可维护性。
完整代码
#ifndef _EVENT_LOOP_HPP_
#define _EVENT_LOOP_HPP_ 1
#include <iostream>
#include <unordered_map>
#include <memory>
#include <functional>
#include "tcp.hpp"
#include "epoll.hpp"
#include "log.hpp"
#include "common.hpp"
#include "ring_queue.hpp"
#include "timer_manager.hpp"
#include "connection.hpp"
class Connection;
class EventLoop;
struct ClientInf;
class Timer;
class TimerManager;
using task_t = std::function<void(std::weak_ptr<RingQueue<ClientInf>>, std::weak_ptr<EventLoop>)>;
inline constexpr size_t max_fd = 1 << 10;
inline const uint16_t default_port = 7777;
inline thread_local char buffer[1024];
struct ClientInf{
int sockfd;
std::string client_ip;
uint16_t client_port;
};
class EventLoop: public std::enable_shared_from_this<EventLoop>
{
public:
// 当 Task_Push 为空时,即代表不从上层循环队列当中获取任务,即主任务循环器的选择,因为主任务循环器只负责任务 Push,不参与 Pop
EventLoop(std::shared_ptr<RingQueue<ClientInf>> rq = nullptr, func_t OnMessage = nullptr, task_t TaskPush = nullptr)
:epoller_(new Epoll()),
OnMessage_(OnMessage),
TaskPush_(TaskPush),
rq_(rq),
tm_(new TimerManager())
{}
void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb,
const std::string &ip = "0.0.0.0", uint16_t port = 0, bool is_listensock = false)
{
std::shared_ptr<Connection> new_connect(new Connection(sock));
new_connect->el = shared_from_this();
new_connect->recv_cb = recv_cb;
new_connect->send_cb = send_cb;
new_connect->except_cb = except_cb;
new_connect->ip_ = ip;
new_connect->port_ = port;
if(!is_listensock) tm_->Push(new_connect);
connections_[sock] = new_connect;
epoller_->EpollCtl(EPOLL_CTL_ADD, sock, event);
}
void Recv(std::weak_ptr<Connection> connect)
{
auto connection = connect.lock();
int sock = connection->Sockfd();
while(true)
{
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
connection->AppendInBuffer(buffer);
lg(Debug, "thread-%d, recv message from client: %s", pthread_self(),buffer);
}else if(n == 0){
lg(Info, "client [%s: %d] quit", connection->ip_.c_str(), connection->port_);
connection->except_cb(connection);
return;
}else{
if(errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else {
lg(Error, "recv from client [%s: %d] false", connection->ip_.c_str(), connection->port_);
connection->except_cb(connection);
return;
}
}
}
if(OnMessage_)
OnMessage_(connection);
}
void Send(std::weak_ptr<Connection> connect)
{
auto connection = connect.lock();
std::string &outbuffer = connection->OutBuffer();
while(true)
{
ssize_t n = send(connection->Sockfd(), outbuffer.c_str(), outbuffer.size(), 0);
if(n > 0)
{
outbuffer.erase(0, n);
if(outbuffer.empty()) break;
}
else if(n == 0) return;
else{
if(errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else{
lg(Error, "send to client [%s: %d] false", connection->ip_.c_str(), connection->port_);
connection->except_cb(connection);
return;
}
}
// 写缓冲满
if(!outbuffer.empty() && !connection->write_care_)
{
// 开启写时间关心
epoller_->EpollCtl(EPOLL_CTL_MOD,connection->Sockfd(), EPOLLIN | EPOLLOUT | EPOLLET);
}else if(outbuffer.empty() && connection->write_care_){
// 关闭写事件特别关心
epoller_->EpollCtl(EPOLL_CTL_MOD,connection->Sockfd(), EPOLLIN | EPOLLET);
}
}
}
void Except(std::weak_ptr<Connection> connect)
{
auto connection = connect.lock();
int fd = connection->Sockfd();
lg(Warning, "client [%s: %d] handler exception", connection->ip_.c_str(), connection->port_);
epoller_->EpollCtl(EPOLL_CTL_DEL, fd, 0);
lg(Debug, "client [%s: %d] close done", connection->ip_.c_str(), connection->port_);
close(fd);
connections_.erase(fd);
tm_->LazyDelete(fd);
}
void DisPatcher()
{
int n = epoller_->EpollWait(recvs, max_fd);
for(int i = 0; i < n; ++i)
{
uint32_t events = recvs[i].events;
int sockfd = recvs[i].data.fd;
// 异常事件转化为读写问题处理
if(events & (EPOLLERR | EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);
if(events & EPOLLIN && connections_[sockfd]->recv_cb) {
connections_[sockfd]->recv_cb(connections_[sockfd]);
tm_->UpdateTime(sockfd);
}
if(events & EPOLLOUT && connections_[sockfd]->send_cb) {
connections_[sockfd]->send_cb(connections_[sockfd]);
tm_->UpdateTime(sockfd);
}
}
}
void Expired_check()
{
while(tm_->IsTopExpired()){
auto top_time = tm_->GetTop()->connect;
int sockfd = top_time->Sockfd();
// 找不到说明是客户端主动释放的
if(connections_.find(sockfd) != connections_.end())
connections_[sockfd]->except_cb(top_time);
}
}
void Loop()
{
while(true)
{
if(TaskPush_) TaskPush_(rq_, shared_from_this());
DisPatcher();
Expired_check();
}
}
public:
std::shared_ptr<RingQueue<ClientInf>> rq_;
private:
std::unordered_map<int, std::shared_ptr<Connection>> connections_;
std::shared_ptr<Epoll> epoller_;
std::shared_ptr<TimerManager> tm_;
struct epoll_event recvs[max_fd];
func_t OnMessage_;
task_t TaskPush_;
};
#endif
而后,我们将之前的计算服务协议以及基于信号量的循环队列集成到该EventLoop中,形成一个完整的网络服务框架。
计算服务协议讲解链接跳转:计算服务协议
基于信号量的环形队列讲解链接跳转:基于信号量的环形队列
下面我们将这些组合起来,就是实现了完整的Reactor模式的网络服务框架:
服务端主程序实现解析
#include <iostream>
#include <functional>
#include <memory>
#include <unordered_map>
#include <thread>
#include <vector>
#include "log.hpp"
#include "event_loop.hpp"
#include "ring_queue.hpp"
#include "listener.hpp"
#include "server_cal.hpp"
const size_t default_thread_num = 5; // 默认工作线程数
ServerCal sc; // 业务逻辑处理器实例
/**
* @brief 消息处理回调函数
* @param wconnectiion 客户端连接弱引用
* @note 处理客户端输入并返回计算结果
*/
void MessageHandler(std::weak_ptr<Connection> wconnectiion) {
auto connection = wconnectiion.lock();
std::string &inf = connection->Inbuffer(); // 获取输入缓冲区
std::string outinf;
while (true) {
outinf = sc.Calculator(inf); // 调用业务逻辑处理
if (outinf.empty()) return; // 无输出则结束
connection->AppendOutBuffer(outinf); // 写入输出缓冲区
// 通过EventLoop发送响应
auto wsender = connection->el;
auto sender = wsender.lock();
sender->Send(connection);
}
}
/**
* @brief 任务获取回调函数
* @param wrq 环形队列弱引用
* @param wel 事件循环弱引用
* @note 从队列获取新连接并注册到EventLoop
*/
void TaskPush(std::weak_ptr<RingQueue<ClientInf>> wrq, std::weak_ptr<EventLoop> wel) {
auto rq = wrq.lock();
auto el = wel.lock();
if (auto client_inf = rq->Pop()) { // 从队列获取新连接
el->AddConnection(
client_inf->sockfd,
EPOLLIN | EPOLLET, // 边缘触发模式
std::bind(&EventLoop::Recv, el, std::placeholders::_1), // 读回调
std::bind(&EventLoop::Send, el, std::placeholders::_1), // 写回调
std::bind(&EventLoop::Except, el, std::placeholders::_1), // 异常回调
client_inf->client_ip,
client_inf->client_port
);
}
}
/**
* @brief 监听线程处理函数
* @param rq 环形队列
* @param port 监听端口
*/
void ListenHandler(std::shared_ptr<RingQueue<ClientInf>> rq, uint16_t port) {
std::shared_ptr<Listener> lt(new Listener(port)); // 创建监听器
lt->Init(); // 初始化监听socket
// 主EventLoop只负责监听
std::shared_ptr<EventLoop> baser(new EventLoop(rq));
baser->AddConnection(
lt->Fd(),
EPOLLIN | EPOLLET,
std::bind(&Listener::Accepter, lt, std::placeholders::_1), // 接受新连接
nullptr, // 无需写回调
nullptr, // 无需异常回调
"0.0.0.0",
0,
true // 标记为监听socket
);
baser->Loop(); // 启动事件循环
}
/**
* @brief 工作线程处理函数
* @param rq 环形队列
*/
void EventHandler(std::shared_ptr<RingQueue<ClientInf>> rq) {
// 创建工作EventLoop,设置消息处理和任务获取回调
std::shared_ptr<EventLoop> task_handler(
new EventLoop(rq, MessageHandler, TaskPush)
);
task_handler->Loop(); // 启动事件循环
}
int main(int argc, char *argv[]) {
// 参数处理
uint16_t port;
if(argc == 1) port = 6667; // 默认端口
else if(argc == 2) port = std::stoi(argv[1]);
else {
std::cerr << "Usage: " << argv[0] << " [port]" << std::endl;
return 1;
}
// 创建环形队列
std::shared_ptr<RingQueue<ClientInf>> rq(new RingQueue<ClientInf>);
// 启动监听线程
std::thread base_thread(ListenHandler, rq, port);
// 创建工作线程池
std::vector<std::thread> threads;
for (int i = 0; i < default_thread_num; ++i) {
threads.emplace_back(std::thread(EventHandler, rq));
}
// 等待所有线程结束
for (auto &thread : threads) {
if (thread.joinable()) thread.join();
}
base_thread.join();
return 0;
}
核心架构说明
线程分工:
- 监听线程:专门处理新连接接入,通过
Listener将连接信息放入环形队列 - 工作线程:从队列获取连接,通过
EventLoop处理具体业务
- 监听线程:专门处理新连接接入,通过
回调机制:
MessageHandler:处理客户端请求并返回计算结果TaskPush:将新连接注册到工作线程的EventLoop
关键组件:
RingQueue:线程安全的环形队列,连接监听线程和工作线程EventLoop:事件驱动核心,每个工作线程独立实例Listener:封装监听socket处理
业务处理:
- 通过
ServerCal类实现具体业务逻辑 - 计算结果通过
EventLoop::Send返回客户端
- 通过
详细代码链接请查看:完整代码