基于主从模式的Reactor的仿muduo网络库
🌇个人主页:平凡的小苏
📚学习格言:命运给你一个低的起点,是想看你精彩的翻盘,而不是让你自甘堕落,脚下的路虽然难走,但我还能走,比起向阳而生,我更想尝试逆风翻盘。
🛸C++项目实战:C++项目实战
> 家人们更新不易,你们的👍点赞👍和⭐关注⭐真的对我真重要,各位路 过的友友麻烦多多点赞关注。 欢迎你们的私信提问,感谢你们的转发! 关注我,关注我,关注我,你们将会看到更多的优质内容!!
一、Http服务器认识
概念
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协议(客⼾端根据⾃⼰的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。
实现⼀个HTTP服务器很简单,但是实现⼀个⾼性能的服务器并不简单,这个单元中将讲解基于Reactor模式的⾼性能服务器实现。
当然准确来说,因为我们要实现的服务器本⾝并不存在业务,咱们要实现的应该算是⼀个⾼性能服务器基础库,是⼀个基础组件。
二、Reactor模型
概念
Reactor 模式,是指通过⼀个或多个输⼊同时传递给服务器进⾏请求处理时的事件驱动处理模式。
服务端程序处理传⼊多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher 模式。
简单理解就是使⽤ I/O多路复⽤ 统⼀监听事件,收到事件后分发给处理进程或线程,是编写⾼性能⽹络服务器的必备技术之⼀。
单Reactor单线程:单I/O多路复⽤+业务处理
-
通过IO多路复⽤模型进⾏客⼾端请求监控
-
触发事件后,进⾏事件处理
-
如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。
(图片来源网络,侵删) -
如果是数据通信请求,则进⾏对应数据处理(接收数据,处理数据,发送响应)。
优点:所有操作均在同⼀线程中完成,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。
缺点:⽆法有效利⽤CPU多核资源,很容易达到性能瓶颈。
适⽤场景:适⽤于客⼾端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导致串⾏处理的情况下,后处理的连接⻓时间⽆法得到响应).
单Reactor多线程:单I/O多路复⽤+线程池(业务处理)
-
Reactor线程通过I/O多路复⽤模型进⾏客⼾端请求监控
-
触发事件后,进⾏事件处理
- 如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。
- 如果是数据通信请求,则接收数据后分发给Worker线程池进⾏业务处理。
- ⼯作线程处理完毕后,将响应交给Reactor线程进⾏数据响应
优点:充分利⽤CPU多核资源
缺点:多线程间的数据共享访问控制较为复杂,单个Reactor 承担所有事件的监听和响应,在单线程中运⾏,⾼并发场景下容易成为性能瓶颈。
多Reactor多线程:多I/O多路复⽤+线程池(业务处理)
-
在主Reactor中处理新连接请求事件,有新连接到来则分发到⼦Reactor中监控
-
在⼦Reactor中进⾏客⼾端通信监控,有事件触发,则接收数据分发给Worker线程池
-
Worker线程池分配独⽴的线程进⾏具体的业务处理
- ⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应
优点:充分利⽤CPU多核资源,主从Reactor各司其职
⽬标定位:主从Reactor模型⾼并发服务器
我要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的⾼效性,提⾼服务器的并发性能。
主Reactor获取到新连接后分发给⼦Reactor进⾏通信事件监控。⽽⼦Reactor线程监控各⾃的描述符的读写事件进⾏数据读写以及业务处理。(该项目从Reactor主要作用:IO事件监控+IO操作+业务处理)(比较轻量)。
当前实现中,因为并不确定组件使⽤者的使⽤意向,因此并不提供业务层⼯作线程池的实现,只实现主从Reactor,⽽Worker⼯作线程池,可由组件库的使⽤者的需要⾃⾏决定是否使⽤和实现。
三、功能模块划分
我要实现的是⼀个带有协议⽀持的Reactor模型⾼性能服务器,因此将整个项⽬的实现划分为两个⼤的模块:
-
SERVER模块:实现Reactor模型的TCP服务器;
-
协议模块:对当前的Reactor模型服务器提供应⽤层协议⽀持。
SERVER模块
SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终
完成⾼性能服务器组件的实现。
⽽具体的管理也分为三个⽅⾯:
• 监听连接管理:对监听连接进⾏管理。
• 通信连接管理:对通信连接进⾏管理。
• 超时连接管理:对超时连接进⾏管理。
基于以上的管理思想,将这个模块进⾏细致的划分⼜可以划分为以下多个⼦模块:
Buffer子模块
Buffer模块是⼀个缓冲区模块,⽤于实现通信中⽤⼾态的接收缓冲区和发送缓冲区功能
意义:1、防止接收到的数据不是一条完整的数据,因此对接收的数据进行缓冲
2、对于客户端响应的数据,应该是在套接字可写的情况下进行发送
功能设计:
1、向缓冲区中添加数据
2、从缓冲区中取出数据
Socket子模块
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。
意义:程序中对于套接字的各项操作更加简便
功能设计:
- 创建套接字
- 绑定地址信息
- 开始监听
- 向服务器发起连接
- 获取新连接
- 接收数据
- 发送数据
- 关闭套接字
- 创建一个监听连接
- 创建一个客户端连接
Channel模块
Channel模块是对⼀个描述符需要进⾏的IO事件管理的模块,实现对描述符可读,可写,错误…事件的管理操作,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
意义:对于描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加的清晰
功能设计:
- 对监控事件的管理
描述符是否可读
描述符是否可写
对描述符监控可读
对描述符监控可写
解除可读事件监控
解除可写事件监控
解除所有事件监控
- 对监控事件触发后的处理
设置对于不同事件的回调处理函数,明确触发了某个事件之后应该怎么处理
Connection模块
功能:
- 这是一个对于通信连接进行整体管理的一个模块,对一个连接的操作都是通过这个模块进行的
- Connection模块,一个连接有任何的事件该怎么处理都是由这个模块来进行处理的,因为组件的设计也不知道使用者要如何处理事 件,因此只能是提供一些事件回调函数由使用者设置
• Connection模块内部包含有三个由组件使⽤者传⼊的回调函数:连接建⽴完成回调,事件回调,新数据回调,关闭回调。
• Connection模块内部包含有两个组件使⽤者提供的接⼝:数据发送接⼝,连接关闭接⼝
• Connection模块内部包含有两个⽤⼾态缓冲区:⽤⼾态接收缓冲区,⽤⼾态发送缓冲区
• Connection模块内部包含有⼀个Socket对象:完成描述符⾯向系统的IO操作
• Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理
具体处理流程如下:
-
实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
-
当描述符在Poller模块中就绪了IO可读事件,则调⽤描述符对应Channel中保存的读事件处理函数,进⾏数据读取,将socket接收缓冲区全部读取到Connection管理的⽤⼾态接收缓冲区中。然后调⽤由组件使⽤者传⼊的新数据到来回调函数进⾏处理。
-
组件使⽤者进⾏数据的业务处理完毕后,通过Connection向使⽤者提供的数据发送接⼝,将数据写⼊Connection的发送缓冲区中。
-
启动描述符在Poll模块中的IO写事件监控,就绪后,调⽤Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进⾏⾯向系统的实际数据发送。
Acceptor模块:
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。
• Acceptor模块内部包含有⼀个Socket对象:实现监听套接字的操作
• Acceptor模块内部包含有⼀个Channel对象:实现监听套接字IO事件就绪的处理
具体处理流程如下:
- 实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
- 为新连接构建⼀个Connection对象出来。
意义:
当获取了一个新建连接的描述符之后,需要为这个通信连接,封装一个Connection对象,设置各种不同回调
注意:
因为Acceptor模块本身并不知道一个连接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行
TimerQueue模块
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执⾏,同时也可以通过刷新定时任务来延迟任务的执⾏
这个模块主要是对Connection对象的⽣命周期管理,对⾮活跃连接进⾏超时后的释放功能。
TimerQueue模块内部包含有⼀个timerfd:linux系统提供的定时器。
TimerQueue模块内部包含有⼀个Channel对象:实现对timerfd的IO时间就绪回调处理
功能设计:添加定时任务、刷新定时任务、希望一个定时任务重新开始计时、取消定时任务
Poller模块
Poller模块是对epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。
意义:对epoll进行的封装,让对描述符进行事件监控的操作更加简单
功能接口:添加事件监控、Channel模块、修改事件监控、移除事件监控
EventLoop模块
EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。
意义:对于服务器中的所有的事件都是由EventLoop模块来完成
每一个Connection连接,都会绑定一个EventLoop模块和线程,因为外界对于连接的所有操作,都是要放到同一个线程中进行的
思想:1、对所有的连接进行事件监控
2、连接触发事件后调用回调进行处理
3、对于连接的所有操作,都要放到eventloop模块来完成
功能设计:将对连接的操作任务添加到任务队列、定时任务的添加、定时任务的刷新、定时任务的取消
具体操作流程:
1、通过Poller模块对当前模块管理内的所有描述符进⾏IO事件监控,有描述符事件就绪后,通过描述符对应的Channel进⾏事件 处理。
2、所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进⾏执⾏。
3、 由于epoll的事件监控,有可能会因为没有事件到来⽽持续阻塞,导致任务队列中的任务不能及时得到执⾏,因此创建了 eventfd,添加到Poller的事件监控中,⽤于实现每次向任务队列添加任务的时候,通过向eventfd写⼊数据来唤醒epoll的阻塞
TcpServer模块
这个模块是⼀个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模块。
• TcpServer中包含有⼀个EventLoop对象:在超轻量使⽤场景中不需要EventLoop线程池,只需要在主线程中完成所有操作的情况。
• TcpServer模块内部包含有⼀个EventLoopThreadPool对象:其实就是EventLoop线程池,也就是⼦Reactor线程池
• TcpServer模块内部包含有⼀个Acceptor对象:⼀个TcpServer服务器,必然对应有⼀个监听套接字,能够完成获取客⼾端新连接,并处理的任务。
• TcpServer模块内部包含有⼀个std::shared_ptr的hash表:保存了所有的新建连接对应的Connection,注意,所有的Connection使⽤shared_ptr进⾏管理,这样能够保证在hash表中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作。
意义:让组件使用者可以更加轻便的完成一个服务器的搭建
功能:对于监听连接的管理,获取一个新连接之后如何处理,由Server模块设置
对于通信连接的管理,连接产生的某个事件如何处理,由Server模块设置
对于超时连接的管理,连接非活跃超时是否关闭,由Server模块设置
对于事件监控的管理,启动多少个线程,有多少个EventLoop,由Server设置
事件回调函数的设置,一个连接产生了一个事件,对于这个事件如何处理,只有组件使用者知道,因此一个事件的处理回调,一定 是组件使用者,设置给TcpServer,TcpServer设置给各个Connection连接
具体流程如下:
-
在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop线程池的实例化,以及std::shared_ptr的hash表的实例化。
-
为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置Connection的各项回调,并使⽤shared_ptr进⾏管理,并添加到hash表中进⾏管理,并为Connection选择⼀个EventLoop线程,为Connection添加⼀个定时销毁任务,为Connection添加事件监控,
-
启动BaseLoop。
通信连接管理模块关系图
四、Server代码实现
1、前置技术知识
C++11中的bind
bind (Fn&& fn, Args&&... args);
我们可以将bind接⼝看作是⼀个通⽤的函数适配器,它接受⼀个函数对象,以及函数的各项参数,然后返回⼀个新的函数对象,但是这个函数对象的参数已经被绑定为设置的参数。运⾏的时候相当于总是调⽤传⼊固定参数的原函数。
但是如果进⾏绑定的时候,给与的参数为 std::placeholders::_1, _2... 则相当于为新适配⽣成的函数对象的调⽤预留⼀个参数进⾏传递。
2、简单的秒级定时任务实现
在当前的⾼并发服务器中,我们不得不考虑⼀个问题,那就是连接的超时关闭问题。我们需要避免⼀个连接⻓时间不通信,但是也不关闭,空耗资源的情况。这时候我们就需要⼀个定时任务,定时的将超时过期的连接进⾏释放。
Linux提供给我们的定时器
#include int timerfd_create(int clockid, int flags); clockid: CLOCK_REALTIME-系统实时时间,如果修改了系统时间就会出问题; CLOCK_MONOTONIC-从开机到现在的时间是⼀种相对时间; flags: 0-默认阻塞属性 返回值:小于0则为错误 int timerfd_settime(int fd, int flags, struct itimerspec *new, structitimerspec *old); fd: timerfd_create返回的⽂件描述符 flags: 0-相对时间, 1-绝对时间;默认设置为0即可. new: ⽤于设置定时器的新超时时间 old: ⽤于接收原来的超时时间 struct timespec { time_t tv_sec; /* Seconds */ long tv_nsec; /* Nanoseconds */ }; struct itimerspec { struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */ struct timespec it_value; /* 第⼀次超时时间 */ }; 定时器会在每次超时时,⾃动给fd中写⼊8字节的数据,表⽰在上⼀次读取数据到当前读取数据期间超时了多少次。
示例:
#include #include #include #include int main() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if(timerfd
上边例⼦,是⼀个定时器的使⽤⽰例,是每隔3s钟触发⼀次定时器超时,否则就会阻塞在read读取数据这⾥
时间轮思想
-
上述的例⼦,存在⼀个很⼤的问题,每次超时都要将所有的连接遍历⼀遍,如果有上万个连接,效率⽆疑是较为低下的。
-
时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到了。
-
同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
-
但是,同⼀时间可能会有⼤批量的定时任务,因此我们可以给数组对应位置下拉⼀个数组,这样就可
以在同⼀个时刻上添加多个定时任务了。
-
上述操作也有⼀些缺陷,⽐如我们如果要定义⼀个60s后的任务,则需要将数组的元素个数设置为60才可以,如果设置⼀⼩时后的定时任务,则需要定义3600个元素的数组,这样⽆疑是⽐较⿇烦的。
-
因此,可以采⽤多层级的时间轮,有秒针轮,分针轮,时针轮,当指针指向了时针轮所对应的位置的时候,那么就会像分针轮进行移动,当指针指向了分针轮所对应的位置的时候,指针就会向秒针轮进行移动。
-
因为当前我们的应⽤中,倒是不⽤设计的这么⿇烦,因为我们的定时任务通常设置的30s以内,所以简单的单层时间轮就够⽤了。
但是,我们也得考虑⼀个问题,当前的设计是时间到了,则主动去执⾏定时任务,释放连接,那能不能在时间到了后,⾃动执⾏定时任务呢?
作为一个时间轮定时器,本身并不关注任务类型,只要是时间到了就需要被执行。
解决方案:类的析构函数 + 智能指针share_ptr, 通过这两个技术可以实现定时任务的延时
1、使用一个类,对定时任务进行封装,类实例化的每一个对象,就是一个定时任务对象,当对象被销毁的时候,再去执行定时任务
(将定时任务的执行,放到析构函数中)
2、shared_ptr用于对new的对象进行空间管理,当shared_ptr对一个对象进行管理的时候,内部有一个计数器,计数器为0的时候,
则释放所管理的对象。
int *a = new int;
std::shared_ptr pi(a); —a对象只有在pi技术为0的时候,才会被释放
std::shared_ptr pi1(pi) --当针对pi又构建了一个shared_ptr对象,则pi和pi1计数器为2
当pi和pi1中任意一个被释放的时候,只是计数器-1,因此他们管理的a对象并没有被释放,
只有当pi和pi1都被释放了,计数器为0了,这时候才会释放管理的a 对象
示例代码:
#include #include #include #include #include #include #include using std::cout; using std::endl; using TaskFunc = std::function; using ReleaseFunc = std::function; class TimerTask { public: TimerTask(uint64_t id, uint32_t delay,const TaskFunc &cb) :_id(id),_timeout(delay),_task_cb(cb),_canceled(false) {} void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; } void Cancel() { _canceled = true; } ~TimerTask() { if(_canceled == false) _task_cb();//定时任务没有被取消才会执行 _release(); } private: uint64_t _id;//定时器任务对象ID uint32_t _timeout;//定时任务的超时时间 TaskFunc _task_cb;//定时器对象要执行的定时任务 ReleaseFunc _release;//用于删除TimerWheel中保存的定时器对象信息 bool _canceled; //false-表示任务没有被取消, true-表示任务被取消 }; class TimerWheel { public: TimerWheel() :_capacity(60),_tick(0),_wheel(_capacity) {} void TimerAdd(u_int64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务 { PtrTask pt(new TimerTask(id,delay,cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id)); _timers[id] = WeakTask(pt); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } //刷新/延迟定时任务 void TimerRefresh(u_int64_t id) { //通过保存我的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中 auto it = _timers.find(id); if(it == _timers.end()) { return;//没有找到定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptr int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } //这个函数应该每秒钟执行一次,相当于秒针向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); } //取消定时任务 void TimerCancel(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) { return;//没有找到定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); if(pt) pt->Cancel(); } private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if(it != _timers.end()) { _timers.erase(it); } } private: using PtrTask = std::shared_ptr; using WeakTask = std::weak_ptr; int _tick; //当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务 int _capacity; //表盘最大数量---其实就是最大延迟时间 std::vector _wheel; std::unordered_map _timers; }; class Test { public: Test() {cout HandleEvent(); } //3、执行任务 // RunAllTask(); } //这个接口存在线程安全问题--这个接口不能被外界使用者调用,只能在模块内,在对应的eventloop线程内执行 bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); } private: void RunAllTask()//执行所有任务池的任务 { std::vector functor; { std::unique_lock _lock(_mutex);//加锁保护交换操作,交换操作不上线程安全的 _tasks.swap(functor); } for(auto &f : functor) { f(); } } private: int _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞 std::thread::id _thread_id;//线程ID Poller _poller; std::unique_ptr_event_channel;//在eventloop释放的时候他也要释放,所以用智能指针 std::vector _tasks;//任务池 std::mutex _mutex;//实现任务池操作的线程安全 TimerWheel _timer_wheel; };
11、定时器模块
timefd:实现内核每隔一段事件,给进程一次超时事件(timefd可读)
timewheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务
要实现一个完整的秒级定时器,就需要将这两个功能整合到一起
timefd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timewheel的runtimetask,执行一下所有的过期定时任务
using TaskFunc = std::function; using ReleaseFunc = std::function; class TimerTask { public: TimerTask(uint64_t id, uint32_t delay,const TaskFunc &cb) :_id(id),_timeout(delay),_task_cb(cb),_canceled(false) {} void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; } void Cancel() { _canceled = true; } ~TimerTask() { if(_canceled == false) _task_cb();//定时任务没有被取消才会执行 _release(); } private: uint64_t _id;//定时器任务对象ID uint32_t _timeout;//定时任务的超时时间 TaskFunc _task_cb;//定时器对象要执行的定时任务 ReleaseFunc _release;//用于删除TimerWheel中保存的定时器对象信息 bool _canceled; //false-表示任务没有被取消, true-表示任务被取消 }; class EventLoop; class TimerWheel { public: TimerWheel(EventLoop* loop) :_capacity(60),_tick(0),_wheel(_capacity) ,_timefd(CreateTimerfd()) ,_timer_channel(new Channel(_loop, _timefd)) ,_loop(loop) { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead();//启动读事件监控 } void ReadTimefd() { uint64_t times; int ret = read(_timefd, ×, 8); if(ret fatal("READ TIMERFD FAILED!"); abort(); } } bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; } static int CreateTimerfd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if(timerfd SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id)); _timers[id] = WeakTask(pt); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } //刷新/延迟定时任务 void TimerRefresh(u_int64_t id); void TimerRefreshInLoop(u_int64_t id) { //通过保存我的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中 auto it = _timers.find(id); if(it == _timers.end()) { return;//没有找到定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptr int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } //这个函数应该每秒钟执行一次,相当于秒针向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); } void OnTime() { ReadTimefd(); RunTimerTask(); } //取消定时任务 void TimerCancel(uint64_t id); void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) { return;//没有找到定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); if(pt) pt->Cancel(); } private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if(it != _timers.end()) { _timers.erase(it); } } private: using PtrTask = std::shared_ptr; using WeakTask = std::weak_ptr; int _timefd;//定时器描述符 EventLoop* _loop; std::unique_ptr _timer_channel; int _tick; //当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务 int _capacity; //表盘最大数量---其实就是最大延迟时间 std::vector _wheel; std::unordered_map _timers; };
12、Connection模块
目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成
功能设计:
1、套接字的管理,能够进行套接字的操作
2、连接事件的管理,可读,可写,错误,挂断,任意
3、缓冲区的管理,便于socket数据的接收和发送
4、协议上下文的管理,记录请求数据的处理过程
5、回调函数的管理
因为连接接收到数据之后该如何处理,由用户决定,因此必须有业务处理回调函数
一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
一个连接关闭前,该如何处理,由用户决定,因此必须有关闭连接回调函数。
任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数
功能:
1、发送数据 — 给用户提供的发送数据接口,并不是真正的发送接口,而是把数据放到发送缓冲区,然后启动写事件监控
2、关闭连接 — 给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
3、启动非活跃连接的超时销毁功能
4、取消非活跃连接的超时销毁功能
5、协议切换 — 一个连接接收数据后如何进行业务处理,取决上下文,以及数据的业务处理回调函数
Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的
场景:对连接进行操作的时候,但是连接已经被释放,导致内存访问错误,最终程序崩溃
解决方案:使用只能指针shared_ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候,保 存了一份shared_ptr,因此就算其他地方进行释放操作,也只是对shared_ptr的计数器-1,而不会导致Connection的实际释放
typedef enum { DISCONNECTED,//连接关闭状态 CONNECTING, //连接建立成功--待处理状态 CONNECTED, //连接建立完成--各种设置已完成,可以通信的状态 DISCONNECTING//待关闭状态 }ConnStatu; class Connection; using PtrConnection = std::shared_ptr; class Connection : public std::enable_shared_from_this { private: uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找 //uint64_t _timer_id; //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器ID int _sockfd; // 连接关联的文件描述符 bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false EventLoop *_loop; // 连接所关联的一个EventLoop ConnStatu _statu; // 连接状态 Socket _socket; // 套接字操作管理 Channel _channel; // 连接的事件管理 Buffer _in_buffer; // 输入缓冲区---存放从socket中读取到的数据 Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据 Any _context; // 请求的接收处理上下文 /*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*/ /*换句话说,这几个回调都是组件使用者使用的*/ using ConnectedCallback = std::function; using MessageCallback = std::function; using ClosedCallback = std::function; using AnyEventCallback = std::function; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; /*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*/ /*就应该从管理的地方移除掉自己的信息*/ ClosedCallback _server_closed_callback; private: /*五个channel的事件回调函数*/ //描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback void HandleRead() { //1. 接收socket的数据,放到缓冲区 char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, 65535); if (ret 0) { //shared_from_this--从当前对象自身获取自身的shared_ptr管理对象 return _message_callback(shared_from_this(), &_in_buffer); } } //描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送 void HandleWrite() { //_out_buffer中保存的数据就是要发送的数据 ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize()); if (ret 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release();//这时候就是实际的关闭释放操作了。 } _out_buffer.MoveReadOffset(ret);//千万不要忘了,将读偏移向后移动 if (_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite();// 没有数据待发送了,关闭写事件监控 //如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放 if (_statu == DISCONNECTING) { return Release(); } } return; } //描述符触发挂断事件 void HandleClose() { /*一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接*/ if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } //描述符触发出错事件 void HandleError() { return HandleClose(); } //描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调用组件使用者的任意事件回调 void HandleEvent() { if (_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if (_event_callback) { _event_callback(shared_from_this()); } } //连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数) void EstablishedInLoop() { // 1. 修改连接状态; 2. 启动读事件监控; 3. 调用回调函数 assert(_statu == CONNECTING);//当前的状态必须一定是上层的半连接状态 _statu = CONNECTED;//当前函数执行完毕,则连接进入已完成连接状态 // 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁 _channel.EnableRead(); if (_connected_callback) _connected_callback(shared_from_this()); } //这个接口才是实际的释放接口 void ReleaseInLoop() { //1. 修改连接状态,将其置为DISCONNECTED _statu = DISCONNECTED; //2. 移除连接的事件监控 _channel.Remove(); //3. 关闭描述符 _socket.Close(); //4. 如果当前定时器队列中还有定时销毁任务,则取消任务 if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop(); //5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数 if (_closed_callback) _closed_callback(shared_from_this()); //移除服务器内部管理的连接信息 if (_server_closed_callback) _server_closed_callback(shared_from_this()); } //这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控 void SendInLoop(Buffer &buf) { if (_statu == DISCONNECTED) return ; _out_buffer.WriteBufferAndPush(buf); if (_channel.WriteAble() == false) { _channel.EnableWrite(); } } //这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShutdownInLoop() { _statu = DISCONNECTING;// 设置连接为半关闭状态 if (_in_buffer.ReadAbleSize() > 0) { if (_message_callback) _message_callback(shared_from_this(), &_in_buffer); } //要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭 if (_out_buffer.ReadAbleSize() > 0) { if (_channel.WriteAble() == false) { _channel.EnableWrite(); } } if (_out_buffer.ReadAbleSize() == 0) { Release(); } } //启动非活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { //1. 将判断标志 _enable_inactive_release 置为true _enable_inactive_release = true; //2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可 if (_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } //3. 如果不存在定时销毁任务,则新增 _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if (_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; } public: Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) { _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); _channel.SetReadCallback(std::bind(&Connection::HandleRead, this)); _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError, this)); } ~Connection() { logger->debug("RELEASE CONNECTION:%p", this); } //获取管理的文件描述符 int Fd() { return _sockfd; } //获取连接ID int Id() { return _conn_id; } //是否处于CONNECTED状态 bool Connected() { return (_statu == CONNECTED); } //设置上下文--连接建立完成时进行调用 void SetContext(const Any &context) { _context = context; } //获取上下文,返回的是指针 Any *GetContext() { return &_context; } void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; } //连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); } //发送数据,将数据放到发送缓冲区,启动写事件监控 void Send(const char *data, size_t len) { //外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行 //因此有可能执行的时候,data指向的空间有可能已经被释放了。 Buffer buf; buf.WriteAndPush(data, len); _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf))); } //提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理 void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); } void Release() { _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); } //启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } //取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); } //切换协议---重置上下文以及阶段性回调处理函数 -- 而是这个接口必须在EventLoop线程中立即执行 //防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。 void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event)); } };
13、Acceptor模块
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。
• Acceptor模块内部包含有⼀个Socket对象:实现监听套接字的操作
• Acceptor模块内部包含有⼀个Channel对象:实现监听套接字IO事件就绪的处理
具体处理流程如下:
- 实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
- 为新连接构建⼀个Connection对象出来。
意义:
当获取了一个新建连接的描述符之后,需要为这个通信连接,封装一个Connection对象,设置各种不同回调
注意:
因为Acceptor模块本身并不知道一个连接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行
class Acceptor { using AcceptCallback = std::function; public: //不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动 //否则有可能造成启动监控后,立即有事件,回调函数还没设置:新连接得不到处理 Acceptor(EventLoop* loop, int port) :_socket(CreateServer(port)),_loop(loop),_channel(loop,_socket.Fd()) {} void SetAcceptCallback(const AcceptCallback& cb) { _accept_callback = cb; } void Listen() { _channel.SetReadCallback(std::bind(&Acceptor::HandleRead,this)); _channel.EnableRead(); } ~Acceptor() { if(_socket.Fd() > 0) { _socket.Close(); } } private: //监听套接字的读事件回调处理函数---获取新连接,调用_accept_callback函数进行新连接处理 void HandleRead() { int newfd = _socket.Accept(); if(newfd
14、LoopThread模块
目标:将EventLoop模块与线程整合起来
EventLoop模块与线程是一一对应的。
EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,而后边当运行一个操作的时候判断当前是否
运行在eventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示
在同一个线程,不同就表示当前运行线程不是EventLoop线程
含义:EventLoop模块在实例化对象的时候,必须在线程内部
EventLoop实例化对象时会设置自己的thread_id
如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置
存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的
因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象
构造一个新的模块:LoopThread
这个模块的功能:将EventLoop与thread整合到一起
思想:
1、创建线程
2、在线程中实例化EventLoop对象
功能:可以向外部返回实例化的EventLoop
class LoopThread { public: //创建线程,设定线程入口函数 LoopThread() :_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry,this)) {} //返回当前线程关联的EventLoop对象指针 EventLoop* GetLoop() { EventLoop* loop = nullptr; { std::unique_lock lock(_mutex);//加锁 _cond.wait(lock,[&](){return _loop != nullptr;});//loop为空就一直阻塞 loop = _loop; } return loop; } private: //实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能 void ThreadEntry() { EventLoop loop; { std::unique_lock lock(_mutex);//加锁 _loop = &loop; _cond.notify_all(); } loop.Start(); } private: //用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop std::mutex _mutex;//互斥锁 std::condition_variable _cond;//条件变量 std::thread _thread;//EventLoop对应的线程 EventLoop* _loop;//EventLoop指针变量,这个对象需要在线程内实例化 };
15、LoopThreadPool模块
设计一个线程池:
LoopThreadPool模块:对所有的LoopThread进行管理及分配
功能:
1、线程数量可配置
注意事项:在服务器中,主从Reactor模型是主线程只负责连接获取,从属线程负责新连接的事件监控及处理
因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及辅助获取连接,也负责连接的处理
2、对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
3、提供线程分配的功能
当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理
假设有多个从属线程,则采用轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)
class LoopThreadPool { public: LoopThreadPool(EventLoop* baseloop):_thread_count(0),_next_loop_idx(0),_baseloop(baseloop){} //设置线程数量 void SetThreadCount(int count){_thread_count = count;} void Create()//创建所有的从属线程 { if(_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for(int i = 0; i GetLoop(); } } } EventLoop* NextLoop() { if(_thread_count == 0) return _baseloop; _next_loop_idx = (_next_loop_idx + 1) % _thread_count; return _loops[_next_loop_idx]; } private: int _thread_count;//线程数量 int _next_loop_idx;//索引 EventLoop* _baseloop;//住EvnetLoop,运行在主线程,从属线程数量为0,则所有操作都在baseloop中进行 std::vector _threads;//保存所有的LoopThread对象 std::vector _loops;//从属线程数量大于0则从_loops进行线程EventLoop分配 };
16、TcpServer模块
对前边所有子模块的整合模块,是提供给用户用于搭建一个高性能服务器的模块
让组件使用者可以更加轻便的完成一个服务器的搭建
管理:
1、Acceptor对象,创建一个监听字
2、EventLoop对象,baseloop对象,实现对监听套接字的事件监控
3、std::unordered_map _conns,实现对所有新建连接的管理
4、LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理
功能:
1、设置从属线程池的数量
2、启动服务器
3、设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接
4、是否启动非活跃连接超时销毁功能
5、添加定时任务功能
流程:
1、在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)
2、将Acceptor挂到baseloop上进行事件监控
3、一旦Acceptor挂到baseloop上进行事件监控
4、对新连接,创建一个Connection进行管理
5、对连接对应的ConnEction设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)
6、启动Connection的非活跃连接的超时销毁规则
7、将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的EventLoop中进行事件监控
8、一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,
读取完毕后调用TcpServer设置的消息回调
class TcpServer { using ConnectedCallback = std::function; using MessageCallback = std::function; using ClosedCallback = std::function; using AnyEventCallback = std::function; using Functor = std::function; public: TcpServer(int port) :_next_id(0),_port(port),_enable_inactive_release(false) ,_acceptor(&_baseloop,_port),_pool(&_baseloop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1)); _acceptor.Listen();//将监听套接字挂到baseloop上开始监听事件 } void SetThreadCount(int count) { _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; } void Start() { _pool.Create(); //创建线程池的从属线程 _baseloop.Start(); } void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } void RunAfter(const Functor& task, int delay)//用于添加定时任务 { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay)); } private: void RunAfterInLoop(const Functor& task, int delay) { _next_id++; _baseloop.TimerAdd(_next_id,delay,task); } void NewConnection(int fd)//为新连接构造一个Connection进行管理 { _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(),_next_id, fd)); conn->SetMessageCallback (_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1)); if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃销毁功能 conn->Established(); _conns.insert(std::make_pair(_next_id, conn)); } void RemoveConnectionInLoop(const PtrConnection& conn) { int id = conn->Id(); auto it = _conns.find(id); if(it != _conns.end()) { _conns.erase(it); } } void RemoveConnection(const PtrConnection& conn)//从管理Connection的_conns中移除连接信息 { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn)); } private: uint64_t _next_id;//自动增长的连接ID int _port; int _timeout; //非活跃练级的统计事件---多长时间不通信是非活跃连接 bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志 Acceptor _acceptor; //这是监听套接字的管理对象 EventLoop _baseloop;//主线程的eventloop对象,负责监听事件的处理 LoopThreadPool _pool;//从属EventLoop线程池 std::unordered_map _conns; // 保存管理所有连接对应的shared_ptr对象 ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; ClosedCallback _server_closed_callback; }; //这里是声明与定义的分离函数,因为他们使用了不同的类 void Channel::Remove(){ _loop->RemoveEvent(this);}//移除监控 void Channel::Update(){ _loop->UpdateEvent(this);} void TimerWheel::TimerAdd(u_int64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务 { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id,delay,cb)); } void TimerWheel::TimerRefresh(u_int64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id)); } void TimerWheel::TimerCancel(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id)); } class NetWork { public: NetWork() { logger->debug("SIGPIPE INIT"); signal(SIGPIPE, SIG_IGN); } }; static NetWork nw;
17、echoServer回显服务器
#pragma once #include "../server.hpp" class EchoServer { public: EchoServer(int port):_server(port) { _server.SetThreadCount(2); _server.EnableInactiveRelease(10); _server.SetClosedCallback(std::bind(&EchoServer::OnClosed,this,std::placeholders::_1)); _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected,this,std::placeholders::_1)); _server.SetMessageCallback(std::bind(&EchoServer::OnMessage,this,std::placeholders::_1,std::placeholders::_2)); } void Start() { _server.Start(); } private: void OnClosed(const PtrConnection& conn) { logger->debug("CLOSED CONNECTION:%p",conn.get()); } void OnConnected(const PtrConnection& conn) { logger->debug("NEW CONNECTION:%p",conn.get()); } void OnMessage(const PtrConnection& conn, Buffer *buf) { conn->Send(buf->ReadPosition(),buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); } private: TcpServer _server; };
五、HTTP协议模块代码实现
Http协议模块:
用于对高并发服务器模块进行协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建。
而Http协议支持模块的实现,可以细分如下模块
Util模块:
这个模块是一个根据模块,主要提供HTTP协议模块所用到的一些工具函数,比如url编码,文件读写…等
HttpRequest模块:
这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息。
HttpResponse模块:
这个模块是HTTP响应数据模块,用于业务处理后设置并保存HTTP响应数据的各项元素信息,最终会被按照HTTP协议响应格式
组织成为响应信息发送给客户端。
HttpContext模块:
这个模块是一个HTTP请求接收的上下文模块,主要是为了防止再一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。
HttpServer模块:
这个模块是最终给组件使用者提供的HTTP服务器模块了,用于以简单的接口实现HTTP服务器的搭建。
HttpServer模块内容包含一个TcpServer对象:TcpServer对象实现服务器的搭建
HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建立成功设置上下文接口,数据处理接口
HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表:组件使用者向HttpServer设置那些请求
应该使用那些函数进行处理,等TcpServer收到对应的请求就会使用对应的函数进行处理。
1、Util模块代码实现
#pragma once #include "../server.hpp" #include #include class Util { public: //字符串分割函数,将src字符串按照sep字符进行分割,得到的各个子串放到arry中,最终返回子串的数量 static size_t Split(const std::string &src, const std::string &sep, std::vector *arry) { int idx = 0; while(idx push_back(src.substr(idx)); return arry->size(); } if(pos == idx) { idx = pos + sep.size(); continue;//当字串为空,没必要添加 } arry->push_back(src.substr(idx, pos - idx)); idx = pos + sep.size(); } return arry->size(); } //读取文件所有内容 static bool ReadFile(const std::string& filename, std::string* buf) { std::ifstream ifs(filename, std::ios::binary); if(ifs.is_open() == false) { logger->error("OPEN %s FILE FAILED!!", filename.c_str()); return false; } size_t fsize = 0; ifs.seekg(0,ifs.end); fsize = ifs.tellg(); ifs.seekg(0,ifs.beg); buf->resize(fsize); ifs.read(&(*buf)[0],fsize); if(ifs.good() == false) { logger->error("READ %s FILE FAILED!!",filename.c_str()); ifs.close(); return false; } ifs.close(); return true; } //向文件写入数据 static bool WriteFile(const std::string &filename, const std::string &buf) { std::ofstream ofs(filename,std::ios::binary); if(ofs.is_open() == false) { logger->error("OPEN %s FILE FAILED!!",filename.c_str()); return false; } ofs.write(buf.c_str(), buf.size()); if(ofs.good() == false) { logger->error("WRITE %s FILE FAILED!", filename.c_str()); ofs.close(); return false; } ofs.close(); return true; } //URL编码,避免URL中子源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义 //编码格式:将特殊字符的ascii值,转换为两个16禁止字符,前缀% //不编码字符:RFC3986文档规定 . - _ ~以及字母和数字属于绝对不编码字符 //W3C标准中规定param中的空格必须被编码为+ //RFC2396规定URI中的保留字符需要转换为%HH格式 static std::string UrlEncode(const std::string url, bool convert_space_to_plus) { std::string res; for(auto& c : url) { if(c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)) { res += c; continue; } if(c == ' ' && convert_space_to_plus) { res += '+'; continue; } //剩下的字符都是需要编码成为%HH格式 char tmp[4] = {0}; snprintf(tmp, 4, "%%%02X" ,c); res += tmp; } return res; } static char HEXTOI(char c) { if(c >= '0' && c = 'a' && c = 'A' && c 2b %2b->2 second; } return "Unknow"; } //根据文件后缀名获取文件mime static std::string ExtMime(const std::string &filename) { std::unordered_map _mime_msg = { {".aac", "audio/aac"}, {".abw", "application/x-abiword"}, {".arc", "application/x-freearc"}, {".avi", "video/x-msvideo"}, {".azw", "application/vnd.amazon.ebook"}, {".bin", "application/octet-stream"}, {".bmp", "image/bmp"}, {".bz", "application/x-bzip"}, {".bz2", "application/x-bzip2"}, {".csh", "application/x-csh"}, {".css", "text/css"}, {".csv", "text/csv"}, {".doc", "application/msword"}, {".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}, {".eot", "application/vnd.ms-fontobject"}, {".epub", "application/epub+zip"}, {".gif", "image/gif"}, {".htm", "text/html"}, {".html", "text/html"}, {".ico", "image/vnd.microsoft.icon"}, {".ics", "text/calendar"}, {".jar", "application/java-archive"}, {".jpeg", "image/jpeg"}, {".jpg", "image/jpeg"}, {".js", "text/javascript"}, {".json", "application/json"}, {".jsonld", "application/ld+json"}, {".mid", "audio/midi"}, {".midi", "audio/x-midi"}, {".mjs", "text/javascript"}, {".mp3", "audio/mpeg"}, {".mpeg", "video/mpeg"}, {".mpkg", "application/vnd.apple.installer+xml"}, {".odp", "application/vnd.oasis.opendocument.presentation"}, {".ods", "application/vnd.oasis.opendocument.spreadsheet"}, {".odt", "application/vnd.oasis.opendocument.text"}, {".oga", "audio/ogg"}, {".ogv", "video/ogg"}, {".ogx", "application/ogg"}, {".otf", "font/otf"}, {".png", "image/png"}, {".pdf", "application/pdf"}, {".ppt", "application/vnd.ms-powerpoint"}, {".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"}, {".rar", "application/x-rar-compressed"}, {".rtf", "application/rtf"}, {".sh", "application/x-sh"}, {".svg", "image/svg+xml"}, {".swf", "application/x-shockwave-flash"}, {".tar", "application/x-tar"}, {".tif", "image/tiff"}, {".tiff", "image/tiff"}, {".ttf", "font/ttf"}, {".txt", "text/plain"}, {".vsd", "application/vnd.visio"}, {".wav", "audio/wav"}, {".weba", "audio/webm"}, {".webm", "video/webm"}, {".webp", "image/webp"}, {".woff", "font/woff"}, {".woff2", "font/woff2"}, {".xhtml", "application/xhtml+xml"}, {".xls", "application/vnd.ms-excel"}, {".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}, {".xml", "application/xml"}, {".xul", "application/vnd.mozilla.xul+xml"}, {".zip", "application/zip"}, {".3gp", "video/3gpp"}, {".3g2", "video/3gpp2"}, {".7z", "application/x-7z-compressed"} }; size_t pos = filename.find_last_of('.'); if(pos != std::string::npos) { std::string ext = filename.substr(pos); auto it = _mime_msg.find(ext); if(it == _mime_msg.end()) { return "application/octet-stream"; } return it->second; } return "application/octet-stream"; } //判断一个文件是否是一个目录 static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), & st); if(ret
2、HttpRequest模块
http请求信息模块:存储HTTP请求信息要素,提供简单的功能性接口
请求信息要素:
请求行:请求方法,URL,协议版本
URL:资源路径,查询字符串
GET /search?word=C++&en=utf8 HTTP/1.1
请求头部:key: val\r\nkey:val\r\n…
Content-Length:0\r\n
正文
要素:请求方法,资源路径,查询字符串,头部字段,正文,协议版本
功能性接口:
1、将成员变量设置为公有成员,便于直接访问
2、提供查询字符串,以及头部字段的单个查询和获取,插入功能
3、获取正文长度
4、判断长连接&段连接 Connection:close / keep-alive
class HttpRequest { public: std::string _method; //请求方法 std::string _path; //资源路径 std::string _version; //协议版本 std::string _body; //请求正文 std::smatch _matches; //资源路径的正则提取数据 std::unordered_map _headers; //头部字段 std::unordered_map _params; //查询字符串 public: HttpRequest():_version("HTTP/1.1") {} void ReSet() { _method.clear(); _path.clear(); _version = "HTTP/1.1"; _body.clear(); std::smatch match; _matches.swap(match); _headers.clear(); _params.clear(); } //插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } //判断是否存在指定头部字段 bool HasHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } //获取指定头部字段的值 std::string GetHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } //插入查询字符串 void SetParam(const std::string &key, const std::string &val) { _params.insert(std::make_pair(key, val)); } //判断是否有某个指定的查询字符串 bool HasParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return false; } return true; } //获取指定的查询字符串 std::string GetParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return ""; } return it->second; } //获取正文长度 size_t ContentLength() const { // Content-Length: 1234\r\n bool ret = HasHeader("Content-Length"); if (ret == false) { return 0; } std::string clen = GetHeader("Content-Length"); return std::stol(clen); } //判断是否是短链接 bool Close() const { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; } };
3、HttpReponse模块
功能:存储HTTP响应信息要素,提供简单的功能性接口
响应信息要素:
1、响应状态码
2、头部字段
3、响应正文
4、重定向信息(是否进行了重定向的标志,重定向的路径)
功能性接口:w欸蓝便于成员的访问,设置为公有成员
1、头部字段的新增
2、正文的设置
3、重定向的设置
4、长短连接的判断
class HttpResponse { public: int _statu; bool _redirect_flag; std::string _body; std::string _redirect_url; std::unordered_map _headers; public: HttpResponse():_redirect_flag(false), _statu(200) {} HttpResponse(int statu):_redirect_flag(false), _statu(statu) {} void ReSet() { _statu = 200; _redirect_flag = false; _body.clear(); _redirect_url.clear(); _headers.clear(); } //插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } //判断是否存在指定头部字段 bool HasHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } //获取指定头部字段的值 std::string GetHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } void SetContent(const std::string &body, const std::string &type = "text/html") { _body = body; SetHeader("Content-Type", type); } void SetRedirect(const std::string &url, int statu = 302) { _statu = statu; _redirect_flag = true; _redirect_url = url; } //判断是否是短链接 bool Close() { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; } };
4、HttpContext模块
意义:
有可能出现接收的数据并不是一条完整的HTTP请求数据,也就是请求的处理需要在多次收到数据后才能处理完成,因此在每次处理的时候,就需要将处理进度记录起来,以便于下次从当前进度继续向下处理
接收状态:
接收请求行,当前处于接收并处理请求行的阶段
接收请求头部,,表示请求头部的接收还没有完毕
接收正文,表示还有正文没有接收完毕
接收数据完毕,这是一个接收完毕,可以对请求进行处理的阶段
接收处理请求出错
typedef enum { RECV_HTTP_ERROR, RECV_HTTP_LINE, RECV_HTTP_HEAD, RECV_HTTP_BODY, RECV_HTTP_OVER }HttpRecvStatu; #define MAX_LINE 8192 class HttpContext { private: int _resp_statu; //响应状态码 HttpRecvStatu _recv_statu; //当前接收及解析的阶段状态 HttpRequest _request; //已经解析得到的请求信息 private: bool ParseHttpLine(const std::string &line) { std::smatch matches; std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\?(.*))? (HTTP/1\.[01])(?:\n|\r\n)?", std::regex::icase); bool ret = std::regex_match(line, matches, e); if (ret == false) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400;//BAD REQUEST return false; } //0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1 //1 : GET //2 : /bitejiuyeke/login //3 : user=xiaoming&pass=123123 //4 : HTTP/1.1 //请求方法的获取 _request._method = matches[1]; std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper); //资源路径的获取,需要进行URL解码操作,但是不需要+转空格 _request._path = Util::UrlDecode(matches[2], false); //协议版本的获取 _request._version = matches[4]; //查询字符串的获取与处理 std::vector query_string_arry; std::string query_string = matches[3]; //查询字符串的格式 key=val&key=val....., 先以 & 符号进行分割,得到各个字串 Util::Split(query_string, "&", &query_string_arry); //针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码 for (auto &str : query_string_arry) { size_t pos = str.find("="); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400;//BAD REQUEST return false; } std::string key = Util::UrlDecode(str.substr(0, pos), true); std::string val = Util::UrlDecode(str.substr(pos + 1), true); _request.SetParam(key, val); } return true; } bool RecvHttpLine(Buffer *buf) { if (_recv_statu != RECV_HTTP_LINE) return false; //1. 获取一行数据,带有末尾的换行 std::string line = buf->GetLineAndPop(); //2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if (line.size() == 0) { //缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414;//URI TOO LONG return false; } //缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414;//URI TOO LONG return false; } bool ret = ParseHttpLine(line); if (ret == false) { return false; } //首行处理完毕,进入头部获取阶段 _recv_statu = RECV_HTTP_HEAD; return true; } bool RecvHttpHead(Buffer *buf) { if (_recv_statu != RECV_HTTP_HEAD) return false; //一行一行取出数据,直到遇到空行为止, 头部的格式 key: val\r\nkey: val\r\n.... while(1){ std::string line = buf->GetLineAndPop(); //2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if (line.size() == 0) { //缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414;//URI TOO LONG return false; } //缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414;//URI TOO LONG return false; } if (line == "\n" || line == "\r\n") { break; } bool ret = ParseHttpHead(line); if (ret == false) { return false; } } //头部处理完毕,进入正文获取阶段 _recv_statu = RECV_HTTP_BODY; return true; } bool ParseHttpHead(std::string &line) { //key: val\r\nkey: val\r\n.... if (line.back() == '\n') line.pop_back();//末尾是换行则去掉换行字符 if (line.back() == '\r') line.pop_back();//末尾是回车则去掉回车字符 size_t pos = line.find(": "); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400;// return false; } std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2); _request.SetHeader(key, val); return true; } bool RecvHttpBody(Buffer *buf) { if (_recv_statu != RECV_HTTP_BODY) return false; //1. 获取正文长度 size_t content_length = _request.ContentLength(); if (content_length == 0) { //没有正文,则请求接收解析完毕 _recv_statu = RECV_HTTP_OVER; return true; } //2. 当前已经接收了多少正文,其实就是往 _request._body 中放了多少数据了 size_t real_len = content_length - _request._body.size();//实际还需要接收的正文长度 //3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文 // 3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据 if (buf->ReadAbleSize() >= real_len) { _request._body.append(buf->ReadPosition(), real_len); buf->MoveReadOffset(real_len); _recv_statu = RECV_HTTP_OVER; return true; } // 3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来 _request._body.append(buf->ReadPosition(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); return true; } public: HttpContext():_resp_statu(200), _recv_statu(RECV_HTTP_LINE) {} void ReSet() { _resp_statu = 200; _recv_statu = RECV_HTTP_LINE; _request.ReSet(); } int RespStatu() { return _resp_statu; } HttpRecvStatu RecvStatu() { return _recv_statu; } HttpRequest &Request() { return _request; } //接收并解析HTTP请求 void RecvHttpRequest(Buffer *buf) { //不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据 switch(_recv_statu) { case RECV_HTTP_LINE: RecvHttpLine(buf); case RECV_HTTP_HEAD: RecvHttpHead(buf); case RECV_HTTP_BODY: RecvHttpBody(buf); } return; } };
5、HttpServer模块
功能:对于HTTP协议支持所有模块的整合
意义:让HTTP服务器的搭建变得更加简便
设计一张请求路由表:
表中记录了针对哪个请求,应该使用哪个函数来进行业务处理的映射关系
当服务器收到了一个请求,就在请求路由表中,查找有没有对应请求的处理函数,如果有,则执行对应的处理函数即可
什么请求,怎么处理,由用户来设定,服务器收到了请求只需要执行函数即可。
好处:用户只需要实现业务处理函数,然后将请求与处理函数的映射关系,添加到服务器中,而服务器只需要接收数据,解析数据,查找
路由表映射关系,执行业务处理函数
要素:
1、GET请求的路由映射表
2、POST请求的路由映射表
3、PUT请求的路由映射表
4、DELETE请求的路由映射表 —路由映射表记录对应请求方法的处理函数映射关系
5、高性能TCP服务器 — 进行连接的IO操作
6、静态资源相对根目录 — 实现静态资源的处理
接口:
服务器处理流程:
1、从socket接收数据,放到接收缓冲区
2、调用OnMessage回调函数进行业务处理
3、对请求进行解析,得到一个HttpRequest结构,包含了所有的请求要素
4、进行请求的路由查找 – 找到对应请求的处理方法
1.静态资源请求—一些实体文件资源的请求
将静态资源文件的数据读取处理,填充到HttpResponse结构中
2.功能性请求—在请求路由映射表中查找处理函数,找到了则执行函数
具体的业务处理,并运行HttpResponse结构的数据填充
5、对静态资源请求/功能性请求进行处理完毕后,得到了一个填充了响应信息的HttpResponse对象,组织http格式响应,进行发送
功能:
1、添加请求-处理函数映射信息(GET/POST/PUT/DELETE)
2、设置静态资源根目录
3、设置线程池中线程数量
4、设置是否设置超时连接释放
5、启动服务器
6、OnConnected —用于给TcpServer设置协议上下文
7、OnMessage — 用于进行缓冲区数据解析处理
8、请求路由查找:静态资源请求查找和处理,功能性请求的查找和处理
9、组织响应进行回复
class HttpServer { private: using Handler = std::function; using Handlers = std::vector; Handlers _get_route; Handlers _post_route; Handlers _put_route; Handlers _delete_route; std::string _basedir; //静态资源根目录 TcpServer _server; private: void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) { //1. 组织一个错误展示页面 std::string body; body += ""; body += ""; body += ""; body += ""; body += ""; body += "
"; body += std::to_string(rsp->_statu); body += " "; body += Util::StatuDesc(rsp->_statu); body += "
"; body += ""; body += ""; //2. 将页面数据,当作响应正文,放入rsp中 rsp->SetContent(body, "text/html"); } //将HttpResponse中的要素按照http协议格式进行组织,发送 void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) { //1. 先完善头部字段 if (req.Close() == true) { rsp.SetHeader("Connection", "close"); }else { rsp.SetHeader("Connection", "keep-alive"); } if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) { rsp.SetHeader("Content-Length", std::to_string(rsp._body.size())); } if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) { rsp.SetHeader("Content-Type", "application/octet-stream"); } if (rsp._redirect_flag == true) { rsp.SetHeader("Location", rsp._redirect_url); } //2. 将rsp中的要素,按照http协议格式进行组织 std::stringstream rsp_str; rsp_str get(); //2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象 // 1. 如果缓冲区的数据解析出错,就直接回复出错响应 // 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理 context->RecvHttpRequest(buffer); HttpRequest &req = context->Request(); HttpResponse rsp(context->RespStatu()); if (context->RespStatu() >= 400) { //进行错误响应,关闭连接 ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中 WriteReponse(conn, req, rsp);//组织响应发送给客户端 context->ReSet(); buffer->MoveReadOffset(buffer->ReadAbleSize());//出错了就把缓冲区数据清空 conn->Shutdown();//关闭连接 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { //当前请求还没有接收完整,则退出,等新数据到来再重新继续处理 return; } //3. 请求路由 + 业务处理 Route(req, &rsp); //4. 对HttpResponse进行组织发送 WriteReponse(conn, req, rsp); //5. 重置上下文 context->ReSet(); //6. 根据长短连接判断是否关闭连接或者继续处理 if (rsp.Close() == true) conn->Shutdown();//短链接则直接关闭 } return; } public: HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port) { _server.EnableInactiveRelease(timeout); _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void SetBaseDir(const std::string &path) { assert(Util::IsDirectory(path) == true); _basedir = path; } /*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/ void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); } void SetThreadCount(int count) { _server.SetThreadCount(count); } void Listen() { _server.Start(); } };6、HttpServer服务器测试
#include "http.hpp" #define WWWROOT "./wwwroot/" std::string RequestStr(const HttpRequest &req) { std::stringstream ss; ss
-
-
-
- ⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应
-
-
-
还没有评论,来说两句吧...