龙空技术网

海量高并发定时器设计

Linux特训营 1172

前言:

而今大家对“nginx和libevent”大约比较关心,小伙伴们都需要剖析一些“nginx和libevent”的相关知识。那么小编在网络上收集了一些关于“nginx和libevent””的相关知识,希望各位老铁们能喜欢,各位老铁们一起来了解一下吧!

目录

1. 最小堆

2. 红黑树

3. 时间轮

4. 时间轮应用

5. 高并发定时器实现

最小堆

在libevent中定时器的实现是通过基于最小堆的优先级队列来实现的,对于这两个数据结构比较陌生的可以去翻算法导论的6.5节中,主要的源码都在min_heap.c中,下面是C++的实现。

数据结构

typedef struct min_heap{  struct event** p;  unsigned n, a;} min_heap_t;

在这个数据结构中 p也就是整个优先级队列,而这个优先级队列的每个节点都是一个struct *event.n表示这个队列的元素个数。a表示这个队列的大小。

#ifndef VOS_TIMER_H_

#define VOS_TIMER_H_

#define RELATIVE_TIMER 1

#define ABSOLUTE_TIMER 2

namespace vos

{

struct event;

typedef struct min_heap

{

struct event** p;

unsigned n, a;

} min_heap_t;

class Timer

{

public:

Timer();

virtual ~Timer();

/**************************************

* input: interval: 每次执行的时间隔间, 单位是毫秒。

* fun arg : 回调函数以及参数。

* flag : 绝对定时器还是相对定时器,如果是相对定时器

* exe_num : 只有在相对定时器才有效,表示执行的次数。最少为1次

* return: 生成定时器的ID

**************************************/

unsigned int timer_add(int interval, void (*fun)(void*), void *arg, int flag = ABSOLUTE_TIMER,

int exe_num = 0);

/***************************************

* description:

* 去掉已经加入的定时器,比如产生定时器的母体已经消亡了,在消亡之间要将其删除。

* 相对定时器在任务完成后会Timer会自己释放掉。

***************************************/

bool timer_remove(unsigned int timer_id);

/***************************************

* description: Timer属于被动对象,没有自己的执行线程,属于被调用者。这样主要是为了避免产生线程同步。

* 定时器的循环处理函数,由定时器的拥有者进行循环调用。它的最小时间间隔决定了定时器的精度。

***************************************/

int timer_process();

private:

struct min_heap _min_heap;

unsigned int _timer_id;

};

} /* namespace vos */

#endif /* VOS_TIMER_H_ */

#include "vos_timer.h"

#ifndef __WINDOWS

#include <sys/time.h>

#else

#include <windows.h>

#endif

#include <time.h>

#include <stdio.h>

#include <stdlib.h>

#define evutil_timercmp(tvp, uvp, cmp) \

(((tvp)->tv_sec == (uvp)->tv_sec) ? \

((tvp)->tv_usec cmp (uvp)->tv_usec) : \

((tvp)->tv_sec cmp (uvp)->tv_sec))

#define evutil_timersub(tvp, uvp, vvp) \

do { \

(vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \

(vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \

if ((vvp)->tv_usec < 0) { \

(vvp)->tv_sec--; \

(vvp)->tv_usec += 1000000; \

} \

} while (0)

#define evutil_timeradd(tvp, uvp, vvp) \

do { \

(vvp)->tv_sec = (tvp)->tv_sec + (uvp)->tv_sec; \

(vvp)->tv_usec = (tvp)->tv_usec + (uvp)->tv_usec; \

if ((vvp)->tv_usec >= 1000000) { \

(vvp)->tv_sec++; \

(vvp)->tv_usec -= 1000000; \

} \

} while (0)

#ifdef __WINDOWS

int gettimeofday(struct timeval* tv, void * attr)

{

union

{

long long ns100;

FILETIME ft;

}now;

GetSystemTimeAsFileTime (&now.ft);

tv->tv_usec = (long) ((now.ns100 / 10LL) % 1000000LL);

tv->tv_sec = (long) ((now.ns100 - 116444736000000000LL) / 10000000LL);

return (0);

}

#endif

namespace vos

{

struct event

{

unsigned int min_heap_idx; /* for managing timeouts */

unsigned int timer_id;

struct timeval ev_interval;

struct timeval ev_timeout;

int ev_exe_num;

void (*ev_callback)(void *arg);

void *ev_arg;

int ev_res; /* result passed to event callback */

int ev_flags;

};

/***构造函数 ***************/

static inline void min_heap_ctor(min_heap_t* s);

/***析构函数 ***************/

static inline void min_heap_dtor(min_heap_t* s);

/***初始化函数 ***************/

static inline void min_heap_elem_init(struct event* e);

/****比较函数***************/

static inline int min_heap_elem_greater(struct event *a, struct event *b);

static inline int min_heap_empty(min_heap_t* s);

static inline unsigned min_heap_size(min_heap_t* s);

/****返回栈顶元素************/

static inline struct event* min_heap_top(min_heap_t* s);

/****调整定时器的大小*********/

static inline int min_heap_reserve(min_heap_t* s, unsigned n);

/****放入数据*************/

static inline int min_heap_push(min_heap_t* s, struct event* e);

/****取得最后上面的数据******/

static inline struct event* min_heap_pop(min_heap_t* s);

/****消除一个定时器元素*******/

static inline int min_heap_erase(min_heap_t* s, struct event* e);

/****向上调整 ************/

static inline void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, struct event* e);

/****向下调整************/

static inline void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e);

static inline void gettime(struct timeval *tm);

Timer::Timer() :

_timer_id(0)

{

min_heap_ctor(&_min_heap);

}

Timer::~Timer()

{

for (int i = 0; i < _min_heap.n; i++)

{

free(_min_heap.p[i]);

}

min_heap_dtor(&_min_heap);

}

unsigned int Timer::timer_add(int interval, void(*fun)(void*), void *arg,

int flag /* = ABSOLUTE_TIMER */, int exe_num /* = 0 */)

{

struct event * ev = (struct event*) malloc(sizeof(struct event));

min_heap_elem_init(ev);

if (NULL == ev)

return NULL;

struct timeval now;

gettime(&now);

ev->ev_interval.tv_sec = interval / 1000;

ev->ev_interval.tv_usec = (interval % 1000) * 1000;

evutil_timeradd(&now, &(ev->ev_interval), &(ev->ev_timeout));

ev->ev_flags = flag;

ev->ev_callback = fun;

ev->ev_arg = arg;

ev->ev_exe_num = exe_num;

ev->timer_id = _timer_id++;

min_heap_push(&_min_heap, ev);

return ev->timer_id;

}

bool Timer::timer_remove(unsigned int timer_id)

{

for (int i = 0; i < _min_heap.n; i++)

{

if (timer_id == _min_heap.p[i]->timer_id)

{

struct event * e = _min_heap.p[i];

min_heap_erase(&_min_heap, _min_heap.p[i]);

free(e);

return true;

}

}

return false;

}

int Timer::timer_process()

{

struct event *event;

struct timeval now;

while ((event = min_heap_top(&_min_heap)) != NULL)

{

gettime(&now);

if (evutil_timercmp(&now, &(event->ev_timeout), < ))

break;

min_heap_pop(&_min_heap);

event->ev_callback(event->ev_arg);

if (event->ev_flags == ABSOLUTE_TIMER

|| (event->ev_flags == RELATIVE_TIMER && --event->ev_exe_num > 0))

{

evutil_timeradd(&(event->ev_timeout), &(event->ev_interval), &(event->ev_timeout));

min_heap_push(&_min_heap, event);

}

else

{

free(event);

}

}

return 0;

}

void gettime(struct timeval *tm)

{

gettimeofday(tm, NULL);

}

int min_heap_elem_greater(struct event *a, struct event *b)

{

return evutil_timercmp(&a->ev_timeout, &b->ev_timeout, >);

}

void min_heap_ctor(min_heap_t* s)

{

s->p = 0;

s->n = 0;

s->a = 0;

}

void min_heap_dtor(min_heap_t* s)

{

if (s->p)

free(s->p);

}

void min_heap_elem_init(struct event* e)

{

e->min_heap_idx = -1;

}

int min_heap_empty(min_heap_t* s)

{

return 0u == s->n;

}

unsigned min_heap_size(min_heap_t* s)

{

return s->n;

}

struct event* min_heap_top(min_heap_t* s)

{

return s->n ? *s->p : 0;

}

int min_heap_push(min_heap_t* s, struct event* e)

{

if (min_heap_reserve(s, s->n + 1))

return -1;

min_heap_shift_up_(s, s->n++, e);

return 0;

}

struct event* min_heap_pop(min_heap_t* s)

{

if (s->n)

{

struct event* e = *s->p;

min_heap_shift_down_(s, 0u, s->p[--s->n]);

e->min_heap_idx = -1;

return e;

}

return 0;

}

int min_heap_erase(min_heap_t* s, struct event* e)

{

if (((unsigned int) -1) != e->min_heap_idx)

{

struct event *last = s->p[--s->n];

unsigned parent = (e->min_heap_idx - 1) / 2;

/* we replace e with the last element in the heap. We might need to

shift it upward if it is less than its parent, or downward if it is

greater than one or both its children. Since the children are known

to be less than the parent, it can't need to shift both up and

down. */

if (e->min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))

min_heap_shift_up_(s, e->min_heap_idx, last);

else

min_heap_shift_down_(s, e->min_heap_idx, last);

e->min_heap_idx = -1;

return 0;

}

return -1;

}

int min_heap_reserve(min_heap_t* s, unsigned n)

{

if (s->a < n)

{

struct event** p;

unsigned a = s->a ? s->a * 2 : 8;

if (a < n)

a = n;

if (!(p = (struct event**) realloc(s->p, a * sizeof *p)))

return -1;

s->p = p;

s->a = a;

}

return 0;

}

void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, struct event* e)

{

unsigned parent = (hole_index - 1) / 2;

while (hole_index && min_heap_elem_greater(s->p[parent], e))

{

(s->p[hole_index] = s->p[parent])->min_heap_idx = hole_index;

hole_index = parent;

parent = (hole_index - 1) / 2;

}

(s->p[hole_index] = e)->min_heap_idx = hole_index;

}

void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e)

{

unsigned min_child = 2 * (hole_index + 1);

while (min_child <= s->n)

{

min_child -= min_child == s->n

|| min_heap_elem_greater(s->p[min_child], s->p[min_child - 1]);

if (!(min_heap_elem_greater(e, s->p[min_child])))

break;

(s->p[hole_index] = s->p[min_child])->min_heap_idx = hole_index;

hole_index = min_child;

min_child = 2 * (hole_index + 1);

}

min_heap_shift_up_(s, hole_index, e);

}

} /* namespace vos */

红黑树

R-B Tree,全称是Red-Black Tree,又称为“红黑树”,它一种特殊的二叉查找树。红黑树的每个节点上都有存储位表示节点的颜色,可以是红(Red)或黑(Black)。 红黑树的特性:

(1)每个节点或者是黑色,或者是红色。

(2)根节点是黑色。

(3)每个叶子节点(NIL)是黑色。 [注意:这里叶子节点,是指为空(NIL或NULL)的叶子节点!] (4)如果一个节点是红色的,则它的子节点必须是黑色的。

(5)从一个节点到该节点的子孙节点的所有路径上包含相同数目的黑节点。

注意: (1) 特性

(2)中的叶子节点,是指为空(NIL或null)的节点。

(3) 特性

(4),确保没有一条路径会比其他路径长出两倍。因而,红黑树是相对是接近平衡的二叉树。

时间轮

定时器的实现,需要具备以下几个行为,这也是在后面评判各种定时器实现的一个基本模型 [1]:StartTimer(Interval, TimerId, ExpiryAction)注册一个时间间隔为 Interval 后执行 ExpiryAction 的定时器实例,其中,返回 TimerId 以区分在定时器系统中的其他定时器实例。时间复杂度取决于数据结构。StopTimer(TimerId)根据 TimerId 找到注册的定时器实例并执行 Stop。时间复杂度为O(1)。

PerTickBookkeeping()在一个 Tick 内,定时器系统需要执行的动作,它最主要的行为,就是检查定时器系统中,是否有定时器实例已经到期。时间复杂度取决于数据结构。

高并发定时器实现

由于linux还不是一个实时的操作系统,因此如果需要更高精度,或者更精确的定时的话,可能就需要打一些实时的补丁,或者用商用版的实时linux,.

这里内的定时器最小间隔也就是1个tick.

这里还有一个要注意的,我这里的分析并没有分析内核新的hrt 定时器.这个定时器是Monta Vista加入到内核的一个高精度的定时器的实现.

先来看几个相关的数据结构.

///这个是一个最主要的数据结构,表示一个完整的定时器级联表

struct tvec_base {

///自旋锁

spinlock_t lock;

///表示由本地cpu正在处理的定时器链表

struct timer_list *running_timer;

///这个表示当前的定时器级联表中最快要超时的定时器的jiffer

unsigned long timer_jiffies;

///下面表示了5级的定时器级联表.

struct tvec_root tv1;

struct tvec tv2;

struct tvec tv3;

struct tvec tv4;

struct tvec tv5;

} ____cacheline_aligned;

下面来看tvec和tvec_root的结构:

struct tvec {

struct list_head vec[TVN_SIZE];

};

struct tvec_root {

struct list_head vec[TVR_SIZE];

};

可以看到这两个结构也就是hash链表.每次通过超时jiffies来计算slot,然后插入到链表.这里链表是FIFO的.这里除了tv5外其他几个都是简单的与TVR_MASK按位与计算.

struct timer_list {

struct list_head entry;

///超时节拍数

unsigned long expires;

///定时器将要执行的回调函数

void (*function)(unsigned long);

///传递给回调函数的参数

unsigned long data;

///从属于那个base

struct tvec_base *base;

};

//定义了一个per cpu变量.这里要知道定时器的注册和触发执行一定是在相同的cpu上的.

struct tvec_base boot_tvec_bases;

static DEFINE_PER_CPU(struct tvec_base *, tvec_bases) = &boot_tvec_bases;

内核注册定时器最终都会通过调用internal_add_timer来实现.具体的工作方式是这样的:

1 如果定时器在接下来的0~255个jiffies中到期,则将定时器添加到tv1.

2 如果定时器是在接下来的256*64个jiffies中到期,则将定时器添加到tv2.

3 如果定时器是在接下来的256*64*64个jiffies中到期,则将定时器添加到tv3.

4 如果定时器是在接下来的256*64*64*64个jiffies中到期,则将定时器添加到tv4.

5 如果更大的超时,则利用0xffffffff来计算hash,然后插入到tv5(这个只会出现在64的系统).

看下面的图就能比较清晰了:

总结‘’

高并发的处理:

① 配置多个Linux外部服务器,Nginx反向代理。

② 增加缓存服务器,将数据放入内存里面,增加读取速度。

③ 搭建Redis集群。

④ 做数据分离(建立历史表,用的技术是Quartz(每天定时定点的执行任务))。

⑤ 将逻辑处理涉及到多处数据库连接操作,优化成一个存储过程,只调用一次数据库,从而缩短数据库的访问时间。

标签: #nginx和libevent