news 2026/5/4 2:51:30

仿照Muduo的高并发服务器:EventLoop模块及与TimeWheel模块联调

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
仿照Muduo的高并发服务器:EventLoop模块及与TimeWheel模块联调

本期接着深入编写项目代码

相关代码上传至gitee:喜欢可以点个赞谢谢

目录

EventLoop模块

Eventfd机制

设计思路

源码

TimeWheel时间轮模块整合

设计思想

源码

EventLoop模块与TimeWheel模块联调整合


EventLoop模块

Eventfd机制

eventfd是本项目中的一种事件通知机制,用于创建一个描述符用于实现事件通知
eventfd本质在内核里边管理的就是一个计数器,创建eventfd就会在内核中创建一个计数器(结构) ,每当向eventfd中写入一个数值--用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数

假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清0。

表达式

//主函数 #include <sys/eventfd.h> int eventfd(unsigned int initval, int flags); // 这并非标准C++类型,而是在 <sys/eventfd.h> 中定义的 typedef typedef uint64_t eventfd_t; // 封装的读取函数,等价于 read(fd, value, sizeof(uint64_t)) int eventfd_read(int fd, eventfd_t *value); // 封装的写入函数,等价于 write(fd, value, sizeof(uint64_t)) int eventfd_write(int fd, eventfd_t value);

参数 (Parameters)

eventfd()函数接受两个参数:

  • unsigned int initval:这是计数器的初始值。它决定了新创建的eventfd对象的“起始状态”。例如,如果你想在创建时就触发一个事件,可以将其设置为1;如果想等待事件发生,则设为0

  • int flags:这是行为控制标志,通过按位或(|)组合使用。

    • EFD_CLOEXEC:为新的文件描述符设置 close-on-exec 标志。这是一个非常好的实践,可以防止文件描述符意外泄漏到通过exec()系列函数启动的子进程中。

    • EFD_NONBLOCK:为文件描述符设置非阻塞模式。这使得后续的read在计数器为0时不会阻塞等待,而是立即返回-1并设置errnoEAGAIN,这在事件循环中至关重要。

    • EFD_SEMAPHORE:为read操作提供信号量语义

      • 未设置此标志(默认):read将返回计数器的当前值,并将其重置为0。这就像一个事件计数器,一次读取耗尽所有事件。

      • 设置此标志read将返回1,并将计数器减1。这就像一个经典的信号量,允许多个线程安全地消耗同一个“资源”。

返回值 (Return Value)

  • 成功时 (>= 0):返回一个非负整数,即文件描述符(file descriptor)。你可以像操作普通文件一样,将这个描述符用于readwritepollselectepollclose等系统调用。

  • 失败时 (-1):返回-1,并设置全局变量errno来指示具体的错误原因。常见的错误码包括:

    • EINVAL:传入的flags参数无效(例如,使用了不被支持的标志组合)。

    • EMFILE:当前进程已达到其可打开的文件描述符数量上限。

    • ENFILE:系统全局的文件描述符数量已达上限。

    • ENODEV:系统内存不足,无法创建新对象

示例代码:

#include <iostream> #include <thread> #include <cstring> #include <sys/eventfd.h> #include <unistd.h> void consumer(int efd) { std::cout << "[消费者] 等待事件..." << std::endl; uint64_t value = 0; // read 默认阻塞,直到 eventfd 计数器非零 ssize_t n = read(efd, &value, sizeof(value)); if (n == sizeof(value)) { std::cout << "[消费者] 收到事件!计数器值 = " << value << std::endl; // 此时 eventfd 的计数器已被内核重置为 0(默认非 EFD_SEMAPHORE 模式) } else { std::cerr << "[消费者] 读取出错" << std::endl; } } void producer(int efd) { std::cout << "[生产者] 工作开始..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作 uint64_t value = 1; // 写入任意非零值即可触发事件 std::cout << "[生产者] 工作完成,触发事件..." << std::endl; ssize_t n = write(efd, &value, sizeof(value)); if (n == sizeof(value)) { std::cout << "[生产者] 事件已发送" << std::endl; } else { std::cerr << "[生产者] 发送失败" << std::endl; } } int main() { // 创建一个初始值为 0 的 eventfd 对象 int efd = eventfd(0, 0); if (efd == -1) { std::cerr << "创建 eventfd 失败: " << strerror(errno) << std::endl; return 1; } std::cout << "eventfd 文件描述符: " << efd << std::endl; // 启动消费者(等待事件)和生产者(触发事件) std::thread t1(consumer, efd); std::thread t2(producer, efd); t1.join(); t2.join(); close(efd); return 0; }

结果为:

设计思路

作用:进行事件监控,以及事件处理的模块
关键点:这个模块与线程是一一对应关联的。

监控了一个连接,而这个连接一旦就绪,就要进行事件处理。
但是如果这个描述符,在多个线程中都触发了事件,进行处理,就会存在线程安全问题。
因此我们需要将一个连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行。

如何保证一个连接的所有操作都在eventloop对应的线程中?
解决方案:给eventloop模块中,添加一个任务队列,
对连接的所有操作,都进行一次封装,将对接的操作并不直接执行,而是当作任务添加到任务队列中。

eventloop处理流程
1. 在线程中对描述符进行事件监控
2. 有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
3. 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行

这样就保证了链接的所有操作全在单一线程内部进行

模块设计

1. 事件监控
使用Poller模块
有事件就绪则进行事件处理

2. 执行任务队列中的任务
一个线程安全的任务队列

注意点
因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行 。因此需要有一个事件通知的东西,能够唤醒事件监控的阻塞

源码

EventLoop.hpp

#pragma once #include"Poller.hpp" #include<sys/eventfd.h> #include <atomic> #include <mutex> #include<memory> #include <thread> namespace ImMuduo { class EventLoop { using Functor = std::function<void()>; public: EventLoop(); ~EventLoop(); //判断当前执行的任务是否属于当前线程,如果不属于,就将任务添加到任务队列中;否则直接执行任务 void RunInLoop(const Functor& task); //执行任务队列中的任务 void QueueLoop(const Functor& task); //判断当前线程是否是事件循环线程 bool IsInLoop(); //添加或者修改事件循环线程的事件监控 void UpdateEvent(Channel* channel); //删除事件循环线程的事件监控 void RemoveEvent(Channel* channel); //执行所有任务 void RunAllTasks(); //启动事件循环线程:事件监控——等待执行——执行任务 void Start(); //停止事件循环线程 void Stop(); private: static int CreateEventfd(); void ReadEventfd(); void WeakUpEventfd(); private: int eventfd_;//eventfd唤醒的IO事件监控可能的阻塞 std::unique_ptr<Channel> eventfdChannel_;//eventfd的事件监控 Poller poller_;//所有描述符的事件监控 std::vector<Functor> tasks_;//任务队列 std::mutex mutex_;//互斥锁,保护任务队列 std::atomic<bool> running_;//事件循环运行标志 std::thread::id threadId_;//线程ID }; }

EventLoop.cpp

#include "EventLoop.hpp" #include "Log.hpp" #include <unistd.h> #include <cstring> namespace ImMuduo { int EventLoop::CreateEventfd() { int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (fd < 0) { ERROR("Failed to create eventfd: %s", strerror(errno)); std::abort(); } return fd; } void EventLoop::ReadEventfd() { uint64_t val = 0; ssize_t n = ::read(eventfd_, &val, sizeof(val)); if (n < 0) { if (errno != EAGAIN) { ERROR("ReadEventfd failed: %s", strerror(errno)); } } else if (n != sizeof(val)) { WARN("ReadEventfd read %zd bytes, expected %zu", n, sizeof(val)); } } void EventLoop::WeakUpEventfd() { uint64_t val = 1; ssize_t n = ::write(eventfd_, &val, sizeof(val)); if (n != sizeof(val)) { ERROR("WeakUpEventfd write failed: %s", strerror(errno)); } } EventLoop::EventLoop() :eventfd_(CreateEventfd()), eventfdChannel_(std::make_unique<Channel>(eventfd_, &poller_)), running_(false), threadId_(std::this_thread::get_id()) { eventfdChannel_->SetReadCallback( [this]() { ReadEventfd(); }); eventfdChannel_->EnableRead(); } EventLoop::~EventLoop() { if (running_) { Stop(); } ::close(eventfd_); } void EventLoop::RunAllTasks() { std::vector<Functor> tasks; { std::lock_guard<std::mutex> lock(mutex_); tasks_.swap(tasks); } for (auto &task : tasks) { task(); } } void EventLoop::Start() { running_ = true; while (running_) { std::vector<Channel*> channels; poller_.Poll(channels); for (auto &channel : channels) { channel->HandleEvent(); } RunAllTasks(); } } void EventLoop::Stop() { running_ = false; WeakUpEventfd(); } bool EventLoop::IsInLoop() { return std::this_thread::get_id() == threadId_; } void EventLoop::RunInLoop(const Functor& task) { if (IsInLoop()) { task(); } else { QueueLoop(task); } } void EventLoop::QueueLoop(const Functor& task) { { std::lock_guard<std::mutex> lock(mutex_); tasks_.push_back(task); } //唤醒有可能因为没有事件就绪,而导致的epoll阻塞; //其实就是给eventfd写入一个数据,eventfd就会触发可读事件 WeakUpEventfd(); } void EventLoop::UpdateEvent(Channel* channel) { poller_.UpdateChannel(channel); } void EventLoop::RemoveEvent(Channel* channel) { poller_.RemoveChannel(channel); } }

TimeWheel时间轮模块整合

设计思想

timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)

timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务

要实现一个完整的秒级定时器,就需要将这两个功能整合到一起。其中timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimeTask,执行一下所有的过期定时任务

timerfd事件可以由EventLoop触发。

源码

timewheel.hpp

#pragma once #include "EventLoop.hpp" #include <memory> #include <cstdint> #include <vector> #include <functional> #include <unordered_map> namespace ImMuduo { using Task=std::function<void()>; using Release=std::function<void()>; class TimeTask { public: TimeTask(uint64_t id,uint32_t timeout,const Task& task); ~TimeTask(); //设置定时器任务对象被销毁时需要执行的任务 void SetRelease(const Release& release); //获取定时器任务对象超时时间 uint32_t GetTimeout()const; //获取定时器任务对象ID uint64_t GetId()const; private: uint64_t id_; //定时器任务对象ID uint32_t timeout_; //定时器任务超时时间 Task task_; //定时器需要执行的任务 Release release_; //定时器任务对象被销毁时需要执行的任务 }; class TimeWheel { using SharedTask=std::shared_ptr<TimeTask>; using WeakTask=std::weak_ptr<TimeTask>; void RemoveTimer(uint64_t id); static int CreateTimerfd(); void ReadTimerfd(); void OnTime(); public: explicit TimeWheel(EventLoop* eventLoop); //添加定时器任务对象到时间轮中 void TimerAdd(uint64_t id,uint32_t timeout,const Task& task); //刷新定时任务 void TimerRefresh(uint64_t id); //取消定时任务 void TimerCancel(uint64_t id); //执行定时任务 void TimerRunTask(); ~TimeWheel()=default; private: //添加定时器任务对象到时间轮中 void TimerAddInLoop(uint64_t id,uint32_t timeout,const Task& task); //刷新定时任务 void TimerRefreshInLoop(uint64_t id); //取消定时任务 void TimerCancelInLoop(uint64_t id); private: int ticks_; //当前秒针,走到哪里执行哪里任务 int capacity_; //最大延迟时间 std::vector<std::vector<SharedTask>>wheel_; //时间轮,包含多个槽,每个槽中包含多个定时器任务对象 std::unordered_map<uint64_t,WeakTask> taskMap_; //定时器任务对象ID与定时器任务对象的映射关系 int timerfd_; //定时器文件描述符——可读事件回调 EventLoop* eventLoop_; //事件循环指针 std::unique_ptr<Channel> timerChannel_; //定时器通道指针,用于注册定时器事件到epoll }; }

timewheel.cpp

#include "timewheel.hpp" #include "Log.hpp" #include <sys/timerfd.h> #include <cstring> #include <unistd.h> #include <algorithm> namespace ImMuduo { int TimeWheel::CreateTimerfd() { int fd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (fd < 0) { ERROR("Failed to create timerfd: %s", strerror(errno)); std::abort(); } return fd; } TimeTask::TimeTask(uint64_t id,uint32_t timeout,const Task& task) :id_(id),timeout_(timeout),task_(task),release_() {} TimeTask::~TimeTask() { task_(); release_(); } void TimeTask::SetRelease(const Release& release) { release_=release; } uint32_t TimeTask::GetTimeout()const { return timeout_; } uint64_t TimeTask::GetId()const { return id_; } TimeWheel::TimeWheel(EventLoop* eventLoop) :ticks_(0), capacity_(60), wheel_(capacity_), timerfd_(CreateTimerfd()), eventLoop_(eventLoop), timerChannel_(std::make_unique<Channel>(timerfd_, nullptr)) { timerChannel_->SetReadCallback([this]() { OnTime(); }); timerChannel_->EnableRead();//启动可读事件回调 } void TimeWheel::ReadTimerfd() { uint64_t val; ssize_t n = ::read(timerfd_, &val, sizeof(val)); if (n < 0) { if (errno != EAGAIN) { ERROR("ReadTimerfd failed: %s", strerror(errno)); } } else if (n != sizeof(val)) { WARN("ReadTimerfd read %zd bytes, expected %zu", n, sizeof(val)); } } void TimeWheel::OnTime() { ReadTimerfd(); TimerRunTask(); } void TimeWheel::RemoveTimer(uint64_t id) { if(taskMap_.find(id)!=taskMap_.end()) { taskMap_.erase(id); } } //定时任务考虑到对连接的问题,需要考虑线程安全问题 //定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题 //如果不想加锁,那就把对定期的所有操作,都放到一个线程中进行 void TimeWheel::TimerAdd(uint64_t id,uint32_t timeout,const Task& task) { eventLoop_->RunInLoop([this,id,timeout,task](){ TimerAddInLoop(id,timeout,task); }); } void TimeWheel::TimerAddInLoop(uint64_t id,uint32_t timeout,const Task& task) { SharedTask taskPtr=std::make_shared<TimeTask>(id,timeout,task); //bind写法 // taskPtr->SetRelease(std::bind(&TimeWheel::RemoveTimer,this,id)); //推荐lambda表达式 taskPtr->SetRelease([this,id](){ this->RemoveTimer(id); }); taskMap_[id]=WeakTask(taskPtr); int pos=(ticks_+timeout)%capacity_; wheel_[pos].push_back(taskPtr); } void TimeWheel::TimerRefresh(uint64_t id) { eventLoop_->RunInLoop([this,id](){ TimerRefreshInLoop(id); }); } void TimeWheel::TimerRefreshInLoop(uint64_t id) { auto it=taskMap_.find(id); //通过定时器的weak_ptr获取shared_ptr的定时器任务对象 if(it==taskMap_.end()) { return ;//没找到刷新任务,无法刷新,无法延迟 } SharedTask taskPtr=it->second.lock();//获取对应的shared_ptr的定时器任务对象 if(!taskPtr) { return ;//weak_ptr已经失效,无法刷新 } int dlay=taskPtr->GetTimeout(); int pos=(ticks_+dlay)%capacity_; wheel_[pos].push_back(taskPtr); } void TimeWheel::TimerCancel(uint64_t id) { eventLoop_->RunInLoop([this, id]() { TimerCancelInLoop(id); }); } void TimeWheel::TimerCancelInLoop(uint64_t id) { auto it = taskMap_.find(id); if (it == taskMap_.end()) return; SharedTask taskPtr = it->second.lock(); if (taskPtr) { TimeTask* raw = taskPtr.get(); for (auto &bucket : wheel_) { auto bucketIt = std::find_if(bucket.begin(), bucket.end(), [raw](const SharedTask& p) { return p.get() == raw; }); if (bucketIt != bucket.end()) { bucket.erase(bucketIt); break; } } } taskMap_.erase(id); } void TimeWheel::TimerRunTask() { ticks_=(ticks_+1)%capacity_; wheel_[ticks_].clear();//走到哪里释放哪里的任务对象 } }

EventLoop模块与TimeWheel模块联调整合

EventLoop.hpp

#pragma once #include"Poller.hpp" #include"Poller.hpp" #include"timewheel.hpp" #include<sys/eventfd.h> #include <atomic> #include <mutex> #include<memory> #include <thread> namespace ImMuduo { class EventLoop { using Functor = std::function<void()>; public: EventLoop(); ~EventLoop(); //判断当前执行的任务是否属于当前线程,如果不属于,就将任务添加到任务队列中;否则直接执行任务 void RunInLoop(const Functor& task); //执行任务队列中的任务 void QueueLoop(const Functor& task); //判断当前线程是否是事件循环线程 bool IsInLoop(); //添加或者修改事件循环线程的事件监控 void UpdateEvent(Channel* channel); //删除事件循环线程的事件监控 void RemoveEvent(Channel* channel); //执行所有任务 void RunAllTasks(); //启动事件循环线程:事件监控——等待执行——执行任务 void Start(); //停止事件循环线程 void Stop(); //添加定时任务 void TimerAdd(uint64_t id,uint32_t timeout,const Task& task); //刷新定时任务 void TimerRefresh(uint64_t id); //取消定时任务 void TimerCancel(uint64_t id); //执行定时任务 void TimerRun(); //判断定时任务是否存在 bool TimerExist(uint64_t id); private: static int CreateEventfd(); void ReadEventfd(); void WeakUpEventfd(); private: int eventfd_;//eventfd唤醒的IO事件监控可能的阻塞 std::unique_ptr<Channel> eventfdChannel_;//eventfd的事件监控 Poller poller_;//所有描述符的事件监控 std::vector<Functor> tasks_;//任务队列 std::mutex mutex_;//互斥锁,保护任务队列 std::atomic<bool> running_;//事件循环运行标志 std::thread::id threadId_;//线程ID TimeWheel timeWheel_;//定时器 }; }

EventLoop.cpp

#include "EventLoop.hpp" #include "Log.hpp" #include <unistd.h> #include <cstring> namespace ImMuduo { EventLoop::EventLoop() :eventfd_(CreateEventfd()), eventfdChannel_(std::make_unique<Channel>(eventfd_, &poller_)), running_(false), threadId_(std::this_thread::get_id()),timeWheel_(this) { eventfdChannel_->SetReadCallback( [this]() { ReadEventfd(); }); eventfdChannel_->EnableRead(); } EventLoop::~EventLoop() { if (running_) { Stop(); } ::close(eventfd_); } int EventLoop::CreateEventfd() { int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (fd < 0) { ERROR("Failed to create eventfd: %s", strerror(errno)); std::abort(); } return fd; } void EventLoop::ReadEventfd() { uint64_t val = 0; ssize_t n = ::read(eventfd_, &val, sizeof(val)); if (n < 0) { if (errno != EAGAIN) { ERROR("ReadEventfd failed: %s", strerror(errno)); } } else if (n != sizeof(val)) { WARN("ReadEventfd read %zd bytes, expected %zu", n, sizeof(val)); } } void EventLoop::WeakUpEventfd() { uint64_t val = 1; ssize_t n = ::write(eventfd_, &val, sizeof(val)); if (n != sizeof(val)) { ERROR("WeakUpEventfd write failed: %s", strerror(errno)); } } void EventLoop::RunAllTasks() { std::vector<Functor> tasks; { std::lock_guard<std::mutex> lock(mutex_); tasks_.swap(tasks); } for (auto &task : tasks) { task(); } } void EventLoop::Start() { running_ = true; while (running_) { std::vector<Channel*> channels; poller_.Poll(channels); for (auto &channel : channels) { channel->HandleEvent(); } RunAllTasks(); } } void EventLoop::Stop() { running_ = false; WeakUpEventfd(); } bool EventLoop::IsInLoop() { return std::this_thread::get_id() == threadId_; } void EventLoop::RunInLoop(const Functor& task) { if (IsInLoop()) { task(); } else { QueueLoop(task); } } void EventLoop::QueueLoop(const Functor& task) { { std::lock_guard<std::mutex> lock(mutex_); tasks_.push_back(task); } //唤醒有可能因为没有事件就绪,而导致的epoll阻塞; //其实就是给eventfd写入一个数据,eventfd就会触发可读事件 WeakUpEventfd(); } void EventLoop::UpdateEvent(Channel* channel) { poller_.UpdateChannel(channel); } void EventLoop::RemoveEvent(Channel* channel) { poller_.RemoveChannel(channel); } void EventLoop::TimerAdd(uint64_t id, uint32_t timeout, const Task& task) { timeWheel_.TimerAdd(id, timeout, task); } void EventLoop::TimerRefresh(uint64_t id) { timeWheel_.TimerRefresh(id); } void EventLoop::TimerCancel(uint64_t id) { timeWheel_.TimerCancel(id); } void EventLoop::TimerRun() { timeWheel_.TimerRunTask(); } bool EventLoop::TimerExist(uint64_t id) { return timeWheel_.TimerExist(id); } }

本期内容到这里就结束了,喜欢请点个赞谢谢

封面图自取:

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/4 2:49:28

MoBind框架:IMU与视频数据精准对齐技术解析

1. 项目背景与核心价值在动作捕捉与行为分析领域&#xff0c;如何实现惯性测量单元&#xff08;IMU&#xff09;数据与视频画面的精准对齐一直是个技术难点。传统方案往往面临两个痛点&#xff1a;一是IMU的绝对坐标系与视频相对坐标系存在转换误差&#xff0c;二是动态动作下传…

作者头像 李华
网站建设 2026/5/4 2:48:31

大模型预训练数据集的合规构建与高效处理实践

1. 大模型预训练数据集的行业现状与挑战当前大语言模型的性能突破高度依赖海量高质量训练数据。根据2023年MLCommons报告&#xff0c;主流千亿参数模型的预训练数据消耗量已达TB级别&#xff0c;但行业面临三大核心痛点&#xff1a;数据合规风险&#xff1a;欧盟AI法案要求训练…

作者头像 李华
网站建设 2026/5/4 2:41:25

CacheMind:用自然语言优化缓存替换策略的AI工具

1. CacheMind&#xff1a;用自然语言透视缓存替换策略的革命性工具 在处理器微架构设计中&#xff0c;缓存替换策略的优化一直是个令人头疼的问题。传统方法就像在黑暗中进行手术——工程师们需要手动分析数百万条内存访问记录&#xff0c;试图从海量数据中找出性能瓶颈的蛛丝马…

作者头像 李华
网站建设 2026/5/4 2:39:26

DMAP方法:语言模型文本分析的数学基础与实践

1. DMAP方法概述&#xff1a;语言模型文本分析的新范式DMAP&#xff08;Distributional Mapping of Text through Language Models&#xff09;是一种基于严格数学原理的文本统计分析方法&#xff0c;它通过将语言模型生成的文本映射到标准化的统计表示空间&#xff0c;从根本上…

作者头像 李华