Skip to content

Latest commit

 

History

History
206 lines (172 loc) · 8.4 KB

about-counter.md

File metadata and controls

206 lines (172 loc) · 8.4 KB

关于计数器

计数器是我们框架中一种非常重要的基础任务,计数器本质上是一个不占线程的信号量。
计数器主要用于工作流的控制,包括匿名计数器和命名计数器两种,可以实现非常复杂的业务逻辑。

计数器的创建

由于计数器也是一种任务,它的创建同样通过WFTaskFactory来完成,包括两种创建方法:

using counter_callback_t = std::function<void (WFTimerTask *)>;

class WFTaskFactory
{
    ...
    static WFCounterTask *create_counter_task(unsigned int target_value,
                                              counter_callback_t callback);
                                              
    static WFCounterTask *create_counter_task(const std::string& counter_name,
                                              unsigned int target_value,
                                              counter_callback_t callback);

    ...
};

每个计数器都包含一个target_value,当计数器的计数到达target_value,callback被调用。
以上两个接口分别产生匿名计数器和命名计数器,匿名计算器直接通过WFCounterTask的count方法来增加计数:

class WFCounterTask
{
public:
    virtual void count()
    {
        ...
    }
    ...
}

如果创建计数器时,传入一个counter_name,则产生一个命名计数器,可以通过count_by_name函数来增加计数。

用匿名计数器实现任务并行

并行抓取的示例中,我们通过创建一个ParallelWork来实现多个series并行。
通过ParallelWork和SeriesWork的组合,可以构建任意的串并连图,已经可以满足大多数应用场景需求。
而计数器的存在,可以让我们构建更复杂的任务依赖关系,比如实现一个全连接的神经网络。
以下简单的代码,可代替ParallelWork,实现一个并行的http抓取。

void http_callback(WFHttpTask *task)
{
    /* Save http page. */
    ...

    WFCounterTask *counter = (WFCounterTask *)task->user_data;
    counter->count();
}

std::mutex mutex;
std::condition_variable cond;
bool finished = false;

void counter_callback(WFCounterTask *counter)
{
    mutex.lock();
    finished = true;
    cond.notify_one();
    mutex.unlock();
}

int main(int argc, char *argv[])
{
    WFCounterTask *counter = create_counter_task(url_count, counter_callback);
    WFHttpTask *task;
    std::string url[url_count];

    /* init urls */
    ...

    for (int i = 0; i < url_count; i++)
    {
        task = create_http_task(url[i], http_callback);
        task->user_data = counter;
        task->start();
    }

    counter->start();
    std::unique_lock<std:mutex> lock(mutex);
    while (!finished)   
        cond.wait(lock);
    lock.unlock();
    return 0;
}

以上创建一个目标值为url_count的计数器,每个http任务完成之后,调用一次count。
注意,匿名计数器的count次数不可以超过目标值,否则counter可能已经callback销毁了,程序行为无定义。
counter->start()调用可以放在for循环之前。counter只要被创建,就可以调用其count接口,无论counter是否已经启动。
匿名计算器的count接口调用,也可以写成counter->WFCounterTask::count(); 在非常注重性能的应用下可以这么用。

Server与其它异步引擎结合使用

某些情况下,我们的server可能需要调用非本框架的异步客户端等待结果。简单的方法我们可以在process里同步等待,通过条件变量来唤醒。
这个做的缺点是我们占用了一个处理线程,把其它框架的异步客户端别为同步客户端。但通过counter的方法,我们可以不占线程的等待。 方法很简单:

void some_callback(void *context)
{
    protocol::HttpResponse *resp = get_resp_from_context(context);
    WFCounterTask *counter = get_counter_from_context(context);
    /* write data to resp. */
    ...
    counter->count();
}

void process(WFHttpTask *task)
{
    WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);

    SomeOtherAsyncClient client(some_callback, context);

    *series_of(task) << counter;
}

在这里,我们可以把server任务所在的series理解为一个协程,而目标值为1的counter,可以理解为一个条件变量。

命名计数器

对匿名计数器进行count操作时,直接访问了counter对象指针。这就必然要求在操作时,调用count的次数不超过目标值。
但想象这样一个应用场景,我们同时启动4个任务,只要其中有任意3个任务完成,工作流就可以继续进行。
我们可以用一个目标值为3的计数器,每个任务完成之后,count一次,这样只要任务3个任务完成,计数器就被callbavck。
但这样的问题是,当第四个任务完成,再调用counter->count()的时候,计算数已经是一个野指针了,程序崩溃。
这时候我们可以用命名计数器来解决这个问题。通过给计数器命名,并通过名字来计算器,例如以下实现:

void counter_callback(WFCounterTask *counter)
{
    WFRedisTask *next = WFTaskFactory::create_redis_task(...);
    series_of(counter).push_back(next);
}

int main(void)
{
    WFHttpTask *tasks[4];
    WFCounterTask *counter;

    counter = WFTaskFactory::create_counter_task("c1", 3, counter_callback);
    counter->start();

    for (i = 0; i < 4; i++)
    {
        tasks[i] = WFTaskFactory::create_http_task(..., [](WFHttpTask *task){
                                            WFTaskFactory::count_by_mame("c1"); });
        tasks[i]->start();
    }

    ...

}

这个示例中,调起4个并发的http任务,其中3个完成了,立刻启动一个redis任务。实际应用中,可能还需要加入数据传递的代码。
示例中创建命名为"c1"的计数器,在http回调里,使用WFTaskFactory::count_by_name()调用来进行计数。

class WFTaskFactory
{
    ...
    static void count_by_name(const std::string& counter_name);

    static void count_by_name(const std::string& counter_name, unsigned int n);
    ...
};

WFTaskFactory::count_by_name方法还可以传入一个整数n,表示这一次操作要增加的计数值,显然:
count_by_name("c1")等价于count_by_name("c1", 1)。
如果"c1"计数器不存在(未创建或已经完成),那么对"c1"的操作不产生任何效果,因此不会有匿名计数器野指针的问题。

命名计数器详细行为定义

调用WFTaskFactory::count_by_name(name, n)的时候:

  • 如果name不存在(未创建或已经完成),无任何行为。
  • 如果只有一个名字为name的计数器:
    • 如果该计数器剩余的值小于或等于n,计数完成,callback被调用,该计数器被销毁。结束。
    • 如果计数器剩余值大于n,则计数值加n。结束。
  • 如果存在多个同名为name的计数器:
    • 按照创建顺序,取第一个计数器,假设其剩余值为m:
      • 如果m值大于n,则计数加n。结束(剩余值为m-n)。
      • 如果m小于或等于n,计数完成,callback被调用,第一个计数器被销毁。置n=n-m。
        • 如果n为0,结束。
        • 如果n大于0,再取出下一个同名计数器,重复整个的操作。

虽然描述很复杂,但总结起来就一句话,按照创建顺序,依次访问所有名字为name的计数器,直到n为0。
也就是说,一次count_by_name(name, n)可以唤醒多个计数器。
用好计数器,特别实现非常复杂的业务逻辑。计数器在我们框架里,往往用于实现异步锁,或者用于任务之间的通道。形态上更像一种控制任务。

计数器的扩展WFContainerTask

计数器像一种信号量,每一个count操作,并不能附带操作数据,很多时候会带来一些不便。
大家如果把计数器想象成有向无环图上的一个节点,每个count是一条入边。那么,节点上可以有属性的,但入边则没有包含任何信息。
而WFContainerTask则是一种给入边加上属性的任务。在WFCounterTask.h里,有相关的定义:

template<tyename T>
class WFContainerTask : public WFCounterTask
{
public:
    void push(const T& value);
    void push(T&& value);
    ...
};

有需要的用户可以自行查阅相关代码。由于WFTaskFactory没有提供工厂函数,创造container任务需要自己调用new。