skynet 中定时器的设计

距离上一次阅读 skynet 的源码已经有 4 年之久了,那是我第一次阅读如此优雅的代码,以至于我从 Java 直接转到了 skynet 开发。 前几个月有朋友问我如何实现 timer 的,我只简单的回答了下用了分级来实现的。 至于是如何分级的,我今天下午重新阅读了代码把思路整理了下。 说实话,通过看代码来推断设计思路远不如先看设计思路再阅读代码来的清晰。

首先需要说明的是 skynet 的时间精度是厘秒,即 1s = 100 cs。 以及下面会混用定时器引擎和添加进来的定时器任务,我都统称为定时器,请根据上下文进行分辨。 并且此文中队列和链表是一样的,链表是队列的具体实现方式。

0x01 数据结构定义

下面先解释 timer 的分级结构,然后再介绍如何添加定时器,以及定时器的 update 。

以下是 timer 的结构体:

#define TIME_NEAR_SHIFT 8
#define TIME_NEAR (1 << TIME_NEAR_SHIFT)
#define TIME_LEVEL_SHIFT 6
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
#define TIME_NEAR_MASK (TIME_NEAR-1)
#define TIME_LEVEL_MASK (TIME_LEVEL-1)

struct timer {
        struct link_list near[TIME_NEAR];
        struct link_list t[4][TIME_LEVEL];
        uint32_t time;
};

0x02 添加定时器到分级队列

skynet 的分级原理是将即将执行的定时器放在 timer.near 中,将执行时间较远的定时器放在 timer.t 的分级链表中,分级的含义是将较近的定时器放在索引为 timer.t[0],更远的放在 timer.t[1],又更远的放在 timer.t[2] 依次类推。 随着时间的推移将高分级上的定时器下放到低分级的定时器,一直到 timer.near 队列。

基本原理就是这样。具体代码如下:

static void
add_node(struct timer *T,struct timer_node *node) {
    uint32_t time=node->expire;
    uint32_t current_time=T->time;

    if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
        link(&T->near[time&TIME_NEAR_MASK],node);
    } else {
        int i;
        uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
        for (i=0;i<3;i++) {
            if ((time|(mask-1))==(current_time|(mask-1))) {
                break;
            }
            mask <<= TIME_LEVEL_SHIFT;
        }

        link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
    }
}

下面解释下 TIME_NEAR_SHIFT 8 TIME_LEVEL_SHIFT 6 的含义。 skynet 将定时器的当前时间用 timer.time 表示,其数值是 32 位,而 near 和每个 level 的队列分区刚好是 32 位(8+4*6),不管时间是任何值都可以对应到队列中去 。 添加的定时器按照其 expire 分配到对应的队列中去,具体的是如果定时器 expire 与当前时间差值在 8 位内则放入到 near 队列数据中,差值在 14 位内则放到第一级队列数组中,依次类推。具体放在哪一个队列则具体是对应段的 mask (TIME_NEAR_MASKTIME_LEVEL_MASK) 进行位与来得到。

从这里我们可以看出,时间越近其分区精度越高,最高的是 near ,每个时间刻度都对应一个队列。 而后面的分级队列则是前面一级的倍数。比如,我们知道 t[0] 包含 TIME_LEVEL(64)个链表,每一个链表所容纳的时间范围是 8 位,也就是 near 的容量。t[1] 也包含 TIME_LEVEL 个链表,每个链表则容纳的时间范围是 14 位也就是 t[0] 的容量。

0x03 按时间更新定时器分级队列

随着时间的推移,在后面分级队列上的定时器需要下放到前面的分级队列上。进行调整的时机是 time 的最低有效位超过 8 时,按照最低有效位的位置查找对应的队列,并将其重新按照当前时间添加到分级队列中。 得到的效果便是,每经过 64 次调整一级队列,就会调整一次二次队列。每经过 64 次调整二级队列,就会调整一次三级队列。依次类推。

具体代码如下:

static void
timer_shift(struct timer *T) {
    int mask = TIME_NEAR;
    uint32_t ct = ++T->time;
    if (ct == 0) {
        move_list(T, 3, 0);
    } else {
        uint32_t time = ct >> TIME_NEAR_SHIFT;
        int i=0;

        while ((ct & (mask-1))==0) {
            int idx=time & TIME_LEVEL_MASK;
            if (idx!=0) {
                move_list(T, i, idx);
                break;              
            }
            mask <<= TIME_LEVEL_SHIFT;
            time >>= TIME_LEVEL_SHIFT;
            ++i;
        }
    }
}

上面 move_list 的功能很简单,就是将整个链表中的节点,再次调用 add_node 添加到分级队列中。这样时间靠近的定时器就会被放到 near 链表中,而其它的会放到一级、二级等队列中。 这里还有一点需要注意的是由于 time 的大小是 uint32_t,因而会出现回绕的情况,也就是 0xffffffff + 1 = 0,而由于这个回绕的情况存在,回绕之后的所有定时器都存储在 t[3][0] 的队列中,你从前面的论述中看看是否这么回事。

顺带说一句,在 C 中将更高精度数值赋值给低精度数值是合法的,且不需要转型,只是会引起高位截断。我之所以感到疑惑是因为函数的返回值是 long int,而代码中接收返回值的是 int 类型。

Leave a Reply

Your email address will not be published. Required fields are marked *