Linux 基于 Epoll 的主从 Reactor 多线程模型

Linux 基于 Epoll 的主从 Reactor 多线程模型

八月 02, 2025 次阅读

该项目的源代码地址:GitHub

什么是 Reactor 模型

Reactor 模型 是一种基于事件驱动的高性能网络编程模型,广泛应用于高并发场景中。其核心思想是通过I/O多路复用和事件分发机制,高效地处理网络请求,避免线程阻塞和资源浪费。

核心概念

Reactor 模型 的核心是事件驱动机制,通过监听 I/O 事件(如连接建立、数据读写等),将事件分发给相应的处理器(Handler)进行处理。它主要由以下两个角色组成:

1.Reactor(反应器):负责监听和分发事件。它通过 I/O 多路复用(如 select、poll 或 epoll)监听多个连接的事件,并将事件分发给对应的处理器。

2.Handler(处理器):负责具体的事件处理逻辑,如数据读取、业务处理和响应发送。

模型分类

Reactor 模型 根据线程和资源的分配方式,分为以下三种实现方式:

  • 单 Reactor 单线程模型: Reactor 和所有 Handler 都运行在同一个线程中。 优点:模型简单,无需处理多线程同步问题。 缺点:性能有限,无法充分利用多核 CPU,且单线程阻塞会导致整个系统不可用。

  • 单 Reactor 多线程模型: Reactor 负责事件监听和分发,具体的业务处理交由线程池完成。 优点:充分利用多核 CPU,提升并发处理能力。 缺点:Reactor 仍是单线程运行,高并发场景下可能成为瓶颈。

  • 主从 Reactor 多线程模型: 主 Reactor 负责监听新连接并将其分配给从 Reactor。 从 Reactor 负责具体的 I/O 事件处理,并将任务交给线程池完成。 优点:主从分离,职责明确,适合高并发场景,充分利用多核资源。

工作流程

以主从 Reactor 多线程模型为例,其工作流程如下:

1.主 Reactor 监听新连接事件,通过 Acceptor 接收连接。

2.新连接被分配给从 Reactor,从 Reactor 负责监听该连接的 I/O 事件。

3.当有 I/O 事件发生时,从 Reactor 调用对应的 Handler 进行处理。

4.Handler 读取数据后,将任务交给线程池处理业务逻辑。

5.线程池完成业务处理后,将结果返回给 Handler,Handler 再通过网络发送响应。

基于 Epoll 的主从 Reactor 多线程模型

Epoll 是 Linux 下的一种 I/O 多路复用机制,它提供了高效的事件通知机制。基于 Epoll 的主从 Reactor 多线程模型,可以充分利用多核 CPU,实现高并发处理。

核心组件

1.主 Reactor:监听新连接事件,通过 Acceptor 接收连接,并将其分配给从 Reactor。

2.从 Reactor:监听已连接的 I/O 事件,调用 Handler 进行处理。

3.Handler:负责具体的事件处理逻辑,如数据读取、业务处理和响应发送。

4.TimerManager:链接管理器,根据用户的活跃状态决定是否关闭链接。

而我们的主从 Reactor 处理的是简单的计算器任务,因此暂不需将任务交给线程池处理业务逻辑。

工作流程

1.主 Reactor 监听新连接事件,通过 Acceptor 接收连接。

2.新连接被分配给从 Reactor,从 Reactor 负责监听该连接的 I/O 事件。

3.当有 I/O 事件发生时,从 Reactor 调用对应的 Handler 进行处理。

4.Handler 读取数据后,进行业务逻辑处理,并通过网络发送响应。

注意事项

1.主 Reactor 和从 Reactor 应该运行在不同的线程中,以充分利用多核 CPU。

2.从 Reactor 应该使用非阻塞 I/O,因为 Epoll 使用的是边缘触发模式,我们需要对数据进行完整读取。

3.一般在读取到用户的数据后处理完直接将数据发送给用户,但是若发送给用户后用户缓冲区仍然有数据,说明用户未读取完,我们需要对用户的写事件 EPOLLOUT 进行监听,直到用户读取完数据。

代码实现

Connection.hpp 连接类实现解析

1. 头文件保护与基本包含

#ifndef _CONNECTION_HPP_
#define _CONNECTION_HPP_ 1
// 防止头文件重复包含的标准保护措施

#include <iostream>
#include <memory>       // 智能指针支持
#include <functional>   // 函数对象支持
#include "common.hpp"   // 项目通用头文件

2. 前向声明与类型定义

class Connection;
// Connection类的前向声明,用于func_t类型定义

using func_t = std::function<void(std::weak_ptr<Connection>)>;
// 定义回调函数类型:
// 参数为Connection的弱指针,无返回值
// 使用weak_ptr避免循环引用

class EventLoop; 
// EventLoop类的前向声明

3. Connection类定义

class Connection{
public:
    // 构造函数:初始化socket描述符,默认不关注写事件
    Connection(int sock)
    :sock_(sock),
    write_care_(false){}

    // 获取socket文件描述符
    int Sockfd(){ return sock_; }

    // 追加数据到输入缓冲区
    void AppendInBuffer(const std::string &info)
    {
        inbuffer_ += info;
    }

    // 追加数据到输出缓冲区
    void AppendOutBuffer(const std::string &info)
    {
        outbuffer_ += info;
    }

    // 获取输入缓冲区引用
    std::string &Inbuffer()
    {
        return inbuffer_;
    }

    // 获取输出缓冲区引用
    std::string &OutBuffer()
    {
        return outbuffer_;
    }

4. 成员变量详解

private:
    int sock_;              // 套接字文件描述符
    std::string inbuffer_;  // 输入数据缓冲区
    std::string outbuffer_; // 输出数据缓冲区

public:
    // 所属EventLoop的弱引用(避免循环引用)
    std::weak_ptr<EventLoop> el;

    // 三个核心回调函数:
    func_t recv_cb;    // 读事件回调
    func_t send_cb;    // 写事件回调 
    func_t except_cb;  // 异常事件回调

    // 客户端信息
    std::string ip_;     // 客户端IP地址
    uint16_t port_;      // 客户端端口号
    
    // 写事件关注标志
    // true表示需要监听EPOLLOUT事件
    bool write_care_;  
};

5. 设计要点说明

5.1 缓冲区管理
  • 使用std::string作为缓冲区容器
  • 提供Append方法追加数据
  • 返回引用避免不必要的拷贝
5.2 回调机制
  • 三个关键回调函数通过std::function封装
  • 使用weak_ptr传递自身引用,防止循环引用
  • 回调由EventLoop事件循环触发
5.3 资源管理
  • 原始socket描述符由RAII管理
  • weak_ptr关联EventLoop避免内存泄漏
  • 写事件标志实现动态EPOLLOUT注册

Epoll.hpp 封装类实现解析

#ifndef _EPOLL_HPP_
#define _EPOLL_HPP_ 1

#include <iostream>
#include <sys/epoll.h>  // epoll系统调用头文件
#include "nocopy.hpp"   // 禁止拷贝的基类
#include "log.hpp"      // 日志系统头文件

// 默认epoll_wait超时时间(毫秒)
inline const int default_time_out = 3000;

// 错误码枚举
enum {
    epoll_create_error = 1,  // epoll创建失败错误码
};

/**
 * @brief Epoll封装类,继承nocopy禁止拷贝构造
 */
class Epoll: public nocopy
{
public:
    /**
     * @brief 构造函数:创建epoll实例
     */
    Epoll()
    :timeout(default_time_out)  // 初始化超时时间
    {
        epfd = epoll_create1(0);  // 创建epoll实例
        if(epfd == -1)
        {
            // 记录创建失败日志(线程ID、错误码、错误信息)
            lg(Error, "thread-%d, epoll_create false, errno: %d, errstr: %s", 
               pthread_self(), errno, strerror(errno));
            exit(epoll_create_error);  // 创建失败直接退出
        }
        // 记录创建成功日志
        lg(Info, "thread-%d, epoll create success, epoll fd: %d", pthread_self(), epfd);
    }

    /**
     * @brief 等待epoll事件
     * @param events 输出参数,用于接收事件数组
     * @param num 最大事件数量
     * @return 就绪的事件数量,-1表示错误
     */
    int EpollWait(struct epoll_event events[], int num)
    {
        int n = epoll_wait(epfd, events, num, timeout);
        if(n == -1){
            lg(Error, "epoll_wait false, errno: %d, errstr: %s", errno, strerror(errno));
        }
        return n;
    }

    /**
     * @brief 控制epoll监控的文件描述符
     * @param op 操作类型 (EPOLL_CTL_ADD/MOD/DEL)
     * @param fd 要操作的文件描述符
     * @param event 要监控的事件标志
     */
    void EpollCtl(int op, int fd, uint32_t event)
    {
        if(op == EPOLL_CTL_DEL){
            // 删除操作不需要event参数
            if(epoll_ctl(epfd, op, fd, nullptr) == -1){
                lg(Error, "epoll control false, errno: %d, errstr: %s", 
                   errno, strerror(errno));
            }
        }else{
            struct epoll_event ev;
            ev.data.fd = fd;    // 关联文件描述符
            ev.events = event;    // 设置监听事件
            if(epoll_ctl(epfd, op, fd, &ev)){
                lg(Error, "epoll control false, errno: %d, errstr: %s", 
                   errno, strerror(errno));
            }
        }
    }

    /**
     * @brief 析构函数:关闭epoll文件描述符
     */
    ~Epoll()
    {
        close(epfd);  // 确保资源释放
    }

private:
    int epfd;      // epoll文件描述符
    int timeout;   // epoll_wait超时时间(毫秒)
};

#endif

设计说明

  1. 资源管理

    • 遵循RAII原则,构造函数创建epoll实例,析构函数自动释放
    • 继承nocopy禁止拷贝构造和赋值,避免文件描述符重复关闭
  2. 错误处理

    • 所有epoll系统调用都有错误日志记录
    • 创建失败直接退出程序(生产环境可改为异常)
  3. 线程安全

    • 日志中包含线程ID,便于多线程调试
    • 类本身无状态依赖,可在多线程环境使用不同实例
  4. 接口设计

    • EpollWait():封装epoll_wait,返回就绪事件数
    • EpollCtl():统一处理三种控制操作,自动处理DEL特殊情况

该接口封装在前面章节已经讲解过,主要用于管理事件监听和处理。

定时器管理系统设计解析

Timer定时器结构设计

#ifndef _TIMER_HPP_
#define _TIMER_HPP_ 1

#include <queue>
#include <vector>
#include <functional>
#include <memory>
#include <ctime>
#include <iostream>
#include <unordered_map>
#include "connection.hpp"

// 前向声明避免循环依赖
class Connection;

/**
 * @brief 定时器节点结构体
 * @note 存储连接的生命周期状态信息
 */
struct Timer
{
    // 构造函数初始化连接状态
    Timer(int expired_time_, int cnt_, std::shared_ptr<Connection> connect_)
    :expired_time(expired_time_),  // 绝对过期时间戳
    cnt(cnt_),                     // 连续活跃次数计数器
    connect(connect_){}            // 管理的连接对象
    
    int expired_time;              // Unix时间戳格式的过期时间
    int cnt;                       // 累计活跃次数(用于奖励机制)
    std::shared_ptr<Connection> connect; // 关联的连接智能指针
};

/**
 * @brief 定时器比较仿函数
 * @note 用于构建最小堆(最早过期时间在堆顶)
 */
struct time_cmp
{
    bool operator()(const std::shared_ptr<Timer> t1, const std::shared_ptr<Timer> t2)
    {
        // 注意:通过大于号实现最小堆
        return t1->expired_time > t2->expired_time;
    }
};

#endif

TimerManager定时器管理器设计

#ifndef _TIMER_MANAGER_HPP_
#define _TIMER_MANAGER_HPP_ 1

#include "timer.hpp"

// 默认配置常量
inline const int default_alive_gap = 10;  // 默认存活周期(秒)
inline const int default_alive_cnt = 5;   // 重置阈值(次)

/**
 * @brief 定时器管理核心类
 * @note 采用最小堆+哈希表双结构实现高效管理
 */
class TimerManager
{
public:
    // 初始化存活周期和计数阈值
    TimerManager(int gap = default_alive_gap, int cnt = default_alive_cnt)
        : alive_gap(gap),
          alive_cnt(cnt) {}

    /**
     * @brief 添加新连接定时器
     * @param connect 要管理的连接对象
     */
    void Push(std::shared_ptr<Connection> connect)
    {
        // 创建新定时器(当前时间+存活周期)
        std::shared_ptr<Timer> timer(new Timer(std::time(nullptr) + alive_gap, 0, connect));
        timers.emplace(timer);                // 插入最小堆
        timer_map[connect->Sockfd()] = timer; // 建立socket到定时器的映射
    }

    /**
     * @brief 检查堆顶是否过期
     * @return true表示存在过期连接需要处理
     */
    bool IsTopExpired()
    {
        while (!timers.empty() && timers.top()->expired_time <= std::time(nullptr))
        {
            int sockfd = (timers.top()->connect)->Sockfd();
            // 处理惰性删除
            if(timer_map.find(sockfd) == timer_map.end()) {
                timers.pop();
                continue;
            }
            if(timers.top()->cnt != timer_map[sockfd]->cnt || timers.top()->expired_time != timer_map[sockfd]->expired_time)
            {
                timers.pop();
                timers.emplace(timer_map[sockfd]);
            }
            else if (timers.top()->cnt >= 5)
            {
                auto timer = timers.top();
                timer->expired_time = std::time(nullptr) + alive_gap;
                timer->cnt = 0;
                timers.emplace(timer);
                timer_map[sockfd] = timer;
            }
            else
            {
                return true;
            }
        }
        return false;
    }

    /// 惰性删除元素
    void LazyDelete(int sockfd){
        timer_map.erase(sockfd);
    }

    /// 获取堆顶定时器(不弹出)
    std::shared_ptr<Timer> GetTop()
    {
        return timers.top();
    }

    /**
     * @brief 更新连接活跃时间
     * @param sockfd 要更新的socket描述符
     */
    void UpdateTime(int sockfd)
    {
        // 检测链接是否已被删除
        if(timer_map.find(sockfd) == timer_map.end()) return;
        if(timers.empty()) return;
        // 创建新的 Timer(避免共享指针副作用)
        auto new_timer = std::make_shared<Timer>(
        std::time(nullptr) + alive_gap,
        timer_map[sockfd]->cnt + 1,
        timer_map[sockfd]->connect
        );

        // 更新 timer_map
        timer_map[sockfd] = new_timer;
        timer_map[sockfd]->cnt++;       // 增加活跃计数
        timer_map[sockfd]->expired_time = time(nullptr) + alive_gap; // 重置过期时间
    }

private:
    // 数据结构
    std::priority_queue<std::shared_ptr<Timer>, 
                       std::vector<std::shared_ptr<Timer>>, 
                       time_cmp> timers;  // 最小堆
    std::unordered_map<int, std::shared_ptr<Timer>> timer_map;  // 哈希表
    
    // 配置参数
    int alive_gap;  // 存活周期(秒)
    int alive_cnt;  // 重置阈值(次)
};

#endif
惰性处理机制详解
  1. 延迟验证设计

    • 只在检查堆顶时处理过期(IsTopExpired
    • 非堆顶元素即使过期也暂不处理
    • 降低频繁检查带来的CPU开销
  2. 状态同步策略

    • 哈希表存储最新状态
    • 堆中元素可能滞后(通过cnt/expired_time以及哈希表中是否存有该元素来比较检测)
    • 发现状态不一致时重新入堆
  3. 奖励机制实现

    if (timers.top()->cnt >= alive_cnt) {
        //...重置计数器
        timer->expired_time = std::time(nullptr) + alive_gap;
        timer->cnt = 0;
    }
    • 活跃连接获得生命周期重置
    • 避免健康连接被误清理
堆处理精妙设计
  1. 双数据结构配合

    • 最小堆:快速获取最早过期连接(O(1))
    • 哈希表:快速查找特定连接(O(1))
  2. 堆更新优化

    • 不直接修改堆元素,而是重新插入
    • 通过状态比较发现过期版本
    • 摊还时间复杂度为O(logN)
  3. 批量处理能力

    while (!timers.empty() && timers.top()->expired_time <= curr_time) {
        // 批量处理所有过期连接
    }
  • 单次操作可处理多个过期连接

该设计完美平衡了精度和效率,特别适合高频网络通信场景,其中:

  • 健康连接通过奖励机制长期保持
  • 僵尸连接被快速识别清理
  • 系统开销随活跃连接数线性增长

监听器(Listener)类实现解析

类功能概述

Listener类封装了TCP服务端的监听逻辑,主要职责包括:

  1. 初始化监听socket(非阻塞模式)
  2. 持续接受客户端连接
  3. 将新连接信息通过环形队列传递给I/O线程
  4. 通过Connection弱引用实现与EventLoop的低耦合交互

代码详细注释

#ifndef _LISTENER_HPP_
#define _LISTENER_HPP_ 1

#include <iostream>
#include <memory>
#include <thread>
#include "tcp.hpp"         // TCP socket封装
#include "event_loop.hpp"  // 事件循环
#include "log.hpp"         // 日志系统

inline const uint16_t default_listen_port = 6349; // 默认监听端口

/**
 * @brief TCP监听器类
 * @note 通过环形队列实现与I/O线程的解耦
 */
class Listener
{
public:
    /**
     * @brief 构造函数
     * @param port 监听端口号
     */
    Listener(uint16_t port = default_listen_port)
        : port_(port),
        sock_(new Sock()) // 创建TCP socket封装对象
    {
    }

    /**
     * @brief 初始化监听socket
     */
    void Init()
    {
        sock_->Socket();  // 创建socket
        sock_->Bind(port_); // 绑定端口
        sock_->Listen();    // 开始监听
        SetNonBlockOrDie(sock_->GetSockfd()); // 设置为非阻塞模式
    }

    /**
     * @brief 接受客户端连接的主循环
     * @param conn 持有EventLoop引用的Connection对象
     */
    void Accepter(std::weak_ptr<Connection> conn)
    {
        auto connection = conn.lock();
        while (true)
        {
            int listen_sockfd = sock_->GetSockfd();
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            // 非阻塞accept
            int client_sockfd = accept(listen_sockfd, (sockaddr *)&client, &len);
            
            if (client_sockfd == -1)
            {
                if (errno == EWOULDBLOCK)
                    break;  // 无新连接时退出循环
                else if (errno == EINTR)
                    continue; // 被信号中断则重试
                else
                {
                    // 记录accept错误日志
                    lg(Error, "listening sock accept false, [%d]: %s", errno, strerror(errno));
                    continue;
                }
            }
            
            // 获取客户端地址信息
            std::string client_ip;
            uint16_t client_port;
            Sock::GetAddrAndPort(client, client_ip, client_port);
            lg(Info, "accept a new client [%s: %d]", client_ip.c_str(), client_port);

            // 设置客户端socket为非阻塞
            SetNonBlockOrDie(client_sockfd);

            // 通过EventLoop的环形队列传递连接信息
            auto event_loop = connection->el.lock();
            ClientInf ci{
                .sockfd = client_sockfd,
                .client_ip = client_ip,
                .client_port = client_port
            };
            event_loop->rq_->Push(ci); // 入队操作
        }
    }

    /// 获取监听socket文件描述符
    int Fd() { return sock_->GetSockfd(); }

private:
    uint16_t port_;             // 监听端口
    std::shared_ptr<Sock> sock_; // TCP socket封装对象
};

#endif

关键设计亮点

低耦合实现方式
  1. 弱引用传递

    • 通过std::weak_ptr<Connection>获取EventLoop引用
    • 避免循环依赖,不增加引用计数
  2. 环形队列中介

    event_loop->rq_->Push(ci);
    • 新连接信息通过环形队列传递
    • 监听线程与I/O线程完全解耦
  3. 非阻塞设计

    • 监听socket设置为非阻塞模式
    • 批量接受连接后立即退出循环
异常处理机制
  1. EWOULDBLOCK处理
    • 非阻塞模式下正常情况处理
  2. EINTR处理
    • 系统调用被信号中断时自动重试
  3. 错误隔离
    • 单个accept失败不影响后续连接
性能优化点
  1. 批量接受连接

    while (true) {
        int client_sockfd = accept(...);
        // ...
    }
    • 单次调用处理多个等待连接
  2. 零拷贝传递

    • 仅传递socket fd和地址信息
    • 避免数据复制开销

该实现完美体现了Reactor模式的精髓,通过:

  • 非阻塞I/O提高吞吐量
  • 队列缓冲实现生产消费者模式
  • 弱引用保持组件独立性

使得监听器可以高效稳定地处理海量连接请求。

事件循环器(EventLoop)核心实现解析

类功能概述

EventLoop是Reactor模式的核心组件,主要职责包括:

  1. 管理所有连接的生命周期
  2. 处理epoll事件分发
  3. 集成定时器管理
  4. 提供协议处理接口(OnMessage)
  5. 支持任务队列集成(TaskPush)

数据结构定义

// 客户端信息结构
struct ClientInf{
    int sockfd;            // 客户端socket描述符
    std::string client_ip; // 客户端IP地址
    uint16_t client_port;  // 客户端端口号
};

// 任务回调类型定义
using task_t = std::function<void(std::weak_ptr<RingQueue<ClientInf>>, 
                                std::weak_ptr<EventLoop>)>;

核心方法实现

1. 连接管理
/**
 * @brief 添加新连接并注册事件回调
 * @param sock 套接字描述符
 * @param event 监听事件类型(EPOLLIN/OUT等)
 * @param recv_cb 读事件回调
 * @param send_cb 写事件回调
 * @param except_cb 异常回调
 * @param ip 客户端IP(可选)
 * @param port 客户端端口(可选)
 * @param is_listensock 是否监听socket(不加入定时器)
 */
void AddConnection(int sock, uint32_t event, func_t recv_cb, 
                  func_t send_cb, func_t except_cb,
                  const std::string &ip = "0.0.0.0", 
                  uint16_t port = 0, 
                  bool is_listensock = false)
{
    // 创建新连接对象
    std::shared_ptr<Connection> new_connect(new Connection(sock));
    // 设置反向引用
    new_connect->el = shared_from_this(); 
    // 注册回调函数
    new_connect->recv_cb = recv_cb;
    new_connect->send_cb = send_cb;
    new_connect->except_cb = except_cb;
    // 记录客户端信息
    new_connect->ip_ = ip;
    new_connect->port_ = port;

    // 非监听socket加入定时管理
    if(!is_listensock) tm_->Push(new_connect);
    
    // 添加到连接表
    connections_[sock] = new_connect;
    // 注册epoll事件
    epoller_->EpollCtl(EPOLL_CTL_ADD, sock, event);
}
2. 网络I/O处理
/**
 * @brief 读取客户端数据(非阻塞模式)
 * @param connect 连接弱引用
 */
void Recv(std::weak_ptr<Connection> connect)
{
    auto connection = connect.lock();
    int sock = connection->Sockfd();
    
    while(true) {
        ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
        if(n > 0) {
            buffer[n] = 0;
            connection->AppendInBuffer(buffer); // 追加到输入缓冲区
            lg(Debug, "thread-%d, recv message from client: %s", 
               pthread_self(), buffer);
        }
        else if(n == 0) { // 客户端关闭连接
            lg(Info, "client [%s: %d] quit", 
               connection->ip_.c_str(), connection->port_);
            connection->except_cb(connection);
            return;
        }
        else { // 错误处理
            if(errno == EWOULDBLOCK) break; // 无数据可读
            else if(errno == EINTR) continue; // 被信号中断
            else {
                lg(Error, "recv from client [%s: %d] false", 
                   connection->ip_.c_str(), connection->port_);
                connection->except_cb(connection);
                return;
            }
        }
    }
    
    // 触发上层协议处理
    if(OnMessage_) OnMessage_(connection);
}

/**
 * @brief 发送数据到客户端(非阻塞模式)
 */
void Send(std::weak_ptr<Connection> connect)
{
    auto connection = connect.lock();
    std::string &outbuffer = connection->OutBuffer();
    
    while(true) {
        ssize_t n = send(connection->Sockfd(), 
                         outbuffer.c_str(), 
                         outbuffer.size(), 0);
        if(n > 0) {
            outbuffer.erase(0, n); // 移除已发送数据
            if(outbuffer.empty()) break;
        }
        else if(n == 0) return; // 不应发生的情况
        else {
            if(errno == EWOULDBLOCK) break; // 写缓冲区满
            else if(errno == EINTR) continue; // 被信号中断
            else {
                lg(Error, "send to client [%s: %d] false",
                   connection->ip_.c_str(), connection->port_);
                connection->except_cb(connection);
                return;
            }
        }
    }
    
    // 动态调整EPOLLOUT事件监听
    if(!outbuffer.empty() && !connection->write_care_) {
        // 开启写事件监听(当缓冲区有数据时)
        epoller_->EpollCtl(EPOLL_CTL_MOD,
                          connection->Sockfd(), 
                          EPOLLIN | EPOLLOUT | EPOLLET);
        connection->write_care_ = true;
    }
    else if(outbuffer.empty() && connection->write_care_) {
        // 关闭写事件监听(当缓冲区空时)
        epoller_->EpollCtl(EPOLL_CTL_MOD,
                          connection->Sockfd(), 
                          EPOLLIN | EPOLLET);
        connection->write_care_ = false;
    }
}
3. 事件分发核心
/**
 * @brief 事件分发处理(核心方法)
 */
void DisPatcher()
{
    // 等待epoll事件(阻塞调用)
    int n = epoller_->EpollWait(recvs, max_fd);
    
    for(int i = 0; i < n; ++i) {
        uint32_t events = recvs[i].events;
        int sockfd = recvs[i].data.fd;
        
        // 异常事件转化为读写事件处理
        if(events & (EPOLLERR | EPOLLHUP)) 
            events |= (EPOLLIN | EPOLLOUT);

        // 处理读事件
        if(events & EPOLLIN && connections_[sockfd]->recv_cb) {
            connections_[sockfd]->recv_cb(connections_[sockfd]);
            tm_->UpdateTime(sockfd); // 更新活跃时间
        }
        
        // 处理写事件
        if(events & EPOLLOUT && connections_[sockfd]->send_cb) {
            connections_[sockfd]->send_cb(connections_[sockfd]);
            tm_->UpdateTime(sockfd); // 更新活跃时间
        }
    }
}
4. 定时器集成
/**
 * @brief 检查并处理过期连接
 */
void Expired_check()
{
    while(tm_->IsTopExpired()) { // 检查堆顶是否过期
        auto top_time = tm_->GetTop()->connect;
        int sockfd = top_time->Sockfd();
        
        // 如果连接仍存在则触发异常关闭
        if(connections_.find(sockfd) != connections_.end())
            connections_[sockfd]->except_cb(top_time);
    }
}
5. 主事件循环
/**
 * @brief 事件主循环
 */
void Loop()
{
    while(true) {
        // 从任务队列获取新连接(如果有TaskPush回调)
        if(TaskPush_) TaskPush_(rq_, shared_from_this());
        
        // 处理epoll事件
        DisPatcher();
        
        // 检查过期连接
        Expired_check();
    }
}

关键设计解析

双回调接口设计
  1. OnMessage回调

    • 由上层协议提供
    • 在数据接收完成后触发
    • 处理协议解析和业务逻辑
  2. TaskPush回调

    • 用户自定义实现
    • 从环形队列获取新连接
    • 示例实现:
定时器集成策略
  1. 活跃度更新

    • 每次I/O操作后调用tm_->UpdateTime()
    • 重置连接的过期时间
  2. 惰性检查

    • 只在事件循环间隙检查
    • 避免频繁的定时器操作影响I/O性能
  3. 异常统一处理

    • 过期连接通过except_cb统一清理
    • 保证资源释放的一致性
性能优化点
  1. 边缘触发(EPOLLET)

    • 减少epoll事件触发次数
    • 需要配合非阻塞I/O
  2. 动态事件注册

    • 只在需要时监听EPOLLOUT
    • 减少不必要的epoll事件
  3. 批量事件处理

    • 单次epoll_wait处理多个事件
    • 减少系统调用次数

该实现通过清晰的接口划分和高效的事件处理机制,完美支持了高并发网络编程的需求,同时保持了良好的扩展性和可维护性。

完整代码

而后,我们将之前的计算服务协议以及基于信号量的循环队列集成到该EventLoop中,形成一个完整的网络服务框架。

计算服务协议讲解链接跳转:计算服务协议

基于信号量的环形队列讲解链接跳转:基于信号量的环形队列

下面我们将这些组合起来,就是实现了完整的Reactor模式的网络服务框架:

服务端主程序实现解析

#include <iostream>
#include <functional>
#include <memory>
#include <unordered_map>
#include <thread>
#include <vector>
#include "log.hpp"
#include "event_loop.hpp"
#include "ring_queue.hpp"
#include "listener.hpp"
#include "server_cal.hpp"

const size_t default_thread_num = 5;  // 默认工作线程数

ServerCal sc;  // 业务逻辑处理器实例

/**
 * @brief 消息处理回调函数
 * @param wconnectiion 客户端连接弱引用
 * @note 处理客户端输入并返回计算结果
 */
void MessageHandler(std::weak_ptr<Connection> wconnectiion) {
    auto connection = wconnectiion.lock();
    std::string &inf = connection->Inbuffer();  // 获取输入缓冲区
    std::string outinf;
    
    while (true) {
        outinf = sc.Calculator(inf);  // 调用业务逻辑处理
        if (outinf.empty()) return;   // 无输出则结束
        
        connection->AppendOutBuffer(outinf);  // 写入输出缓冲区
        
        // 通过EventLoop发送响应
        auto wsender = connection->el;
        auto sender = wsender.lock();
        sender->Send(connection);
    }
}

/**
 * @brief 任务获取回调函数
 * @param wrq 环形队列弱引用
 * @param wel 事件循环弱引用
 * @note 从队列获取新连接并注册到EventLoop
 */
void TaskPush(std::weak_ptr<RingQueue<ClientInf>> wrq, std::weak_ptr<EventLoop> wel) {
    auto rq = wrq.lock();
    auto el = wel.lock();
    
    if (auto client_inf = rq->Pop()) {  // 从队列获取新连接
        el->AddConnection(
            client_inf->sockfd, 
            EPOLLIN | EPOLLET,  // 边缘触发模式
            std::bind(&EventLoop::Recv, el, std::placeholders::_1),  // 读回调
            std::bind(&EventLoop::Send, el, std::placeholders::_1),  // 写回调
            std::bind(&EventLoop::Except, el, std::placeholders::_1), // 异常回调
            client_inf->client_ip, 
            client_inf->client_port
        );
    }
}

/**
 * @brief 监听线程处理函数
 * @param rq 环形队列
 * @param port 监听端口
 */
void ListenHandler(std::shared_ptr<RingQueue<ClientInf>> rq, uint16_t port) {
    std::shared_ptr<Listener> lt(new Listener(port));  // 创建监听器
    lt->Init();  // 初始化监听socket
    
    // 主EventLoop只负责监听
    std::shared_ptr<EventLoop> baser(new EventLoop(rq));
    baser->AddConnection(
        lt->Fd(), 
        EPOLLIN | EPOLLET, 
        std::bind(&Listener::Accepter, lt, std::placeholders::_1),  // 接受新连接
        nullptr,  // 无需写回调
        nullptr,  // 无需异常回调
        "0.0.0.0", 
        0, 
        true  // 标记为监听socket
    );
    baser->Loop();  // 启动事件循环
}

/**
 * @brief 工作线程处理函数
 * @param rq 环形队列
 */
void EventHandler(std::shared_ptr<RingQueue<ClientInf>> rq) {
    // 创建工作EventLoop,设置消息处理和任务获取回调
    std::shared_ptr<EventLoop> task_handler(
        new EventLoop(rq, MessageHandler, TaskPush)
    );
    task_handler->Loop();  // 启动事件循环
}

int main(int argc, char *argv[]) {
    // 参数处理
    uint16_t port;
    if(argc == 1) port = 6667;  // 默认端口
    else if(argc == 2) port = std::stoi(argv[1]);
    else {
        std::cerr << "Usage: " << argv[0] << " [port]" << std::endl;
        return 1;
    }

    // 创建环形队列
    std::shared_ptr<RingQueue<ClientInf>> rq(new RingQueue<ClientInf>);
    
    // 启动监听线程
    std::thread base_thread(ListenHandler, rq, port);
    
    // 创建工作线程池
    std::vector<std::thread> threads;
    for (int i = 0; i < default_thread_num; ++i) {
        threads.emplace_back(std::thread(EventHandler, rq));
    }
    
    // 等待所有线程结束
    for (auto &thread : threads) {
        if (thread.joinable()) thread.join();
    }
    base_thread.join();

    return 0;
}

核心架构说明

  1. 线程分工

    • 监听线程:专门处理新连接接入,通过Listener将连接信息放入环形队列
    • 工作线程:从队列获取连接,通过EventLoop处理具体业务
  2. 回调机制

    • MessageHandler:处理客户端请求并返回计算结果
    • TaskPush:将新连接注册到工作线程的EventLoop
  3. 关键组件

    • RingQueue:线程安全的环形队列,连接监听线程和工作线程
    • EventLoop:事件驱动核心,每个工作线程独立实例
    • Listener:封装监听socket处理
  4. 业务处理

    • 通过ServerCal类实现具体业务逻辑
    • 计算结果通过EventLoop::Send返回客户端

详细代码链接请查看:完整代码