C++无锁编程之自旋锁(spinlock)的实现

此实例通过c++11实现。

#pragma once

#include 
#include 

class Spinlock {
public:
    Spinlock() : flag(ATOMIC_FLAG_INIT), wait_count(2000) {}

    void lock() {
        int64_t i = 0;
        while(flag.test_and_set(std::memory_order_acquire)) {
            __asm__ ("pause");
            if (++i > wait_count) {
                sched_yield();
                i = 0;
            }
        }
    }

    bool try_lock() {
        if (flag.test_and_set(std::memory_order_acquire)) {
            return false;
        }
        return true;
    }

    void unlock() {
        flag.clear(std::memory_order_release);
    }

private :
    std::atomic_flag flag;
    int32_t wait_count;
};

C++无锁编程之AsyncParallelTask框架

简介
AsyncParallelTask框架,是为了适用于Rank3.0的拆包合包业务场景而设计和实现的异步任务调度框架,且具有良好的通用性和可移植性。

Rank拆包合包的业务场景,即Rank3.0接受到请求后,拆包往下游预估服务分发请求,在接收到返回后合并打分结果,最终把结果返回给上游。

使用AsyncParallelTask框架的优点
拆包合并由框架控制,免去了自己控制拆包后多个子任务的状态管理。
无锁化,没有锁竞争,性能高。
提供超时管理机制,有助于增强系统稳定性。
拥有友好的API,使用简单。
AsyncParallelTask框架可适用的场景举例
需要拆包合包的预估服务,比如Rank模块
搜索引擎的merger模块
其他需要拆包合包的业务场景
设计
设计思想
使用异步IO方式,不会引起线程阻塞,且通过超时控制来避免长尾任务。
通过使用原子变量和原子操作(atomic)来控制计数、状态变化。
支持多线程,且逻辑实现不使用任何种类的锁,使用lockfree数据结构和线程间通信机制来保证线程安全。
通过使用C++11标准的新特性,比如仿函数、参数模板等,来对外提供简洁和更加友好的API。
类域设计
AsyncParallelTask框架,总共包含控制器AsyncTaskController、定时器TimerController、异步并行任务类AsyncParallelTask、分发子任务类AsyncParallelSubTask等4部分组成。

控制器AsyncTaskController
AsyncTaskController是AsyncParallelTask的调度器,提供调度AsyncParallelTask的接口。内部包含线程池、定时器。

当AsyncTaskController调度AsyncParallelTask的运行时,首先把AsyncParallelTask放入线程池中调度,然后启动对AsyncParallelTask的超时监控。

定时器TimerController
TimerController的实现原理
TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

使用最小堆来管理延时任务和周期任务
使用1个timerfd配合epoll来实现定时任务的激活
使用1个eventfd配合epoll来实现定时任务的添加
使用一个LockFree的栈,实现生产者消费者模型。由外部多线程写,TimerController内部1个线程来读。
TimerController内部流程图

异步任务基类AsyncTask
任何继承AsyncTask的类都可供AsyncTaskController进行调度。

AsyncTask中定了一个基本的接口和AsyncTask的状态的转换。

部分代码举例:

class AsyncTask {
public:
enum Status {
UNSCHEDULED,
PROCESSING,
WAIT_CALLBACK,
CALLBACK,
TIMEOUT,
EXCEPTION,
FINISHED
};

AsyncTask() :
    id(0),
    parent_id(0),
    timeout_threshold(0),
    status(UNSCHEDULED) {}
virtual ~AsyncTask() {}
virtual Status process() = 0;
virtual Status timeout() { return TIMEOUT; }
virtual void destroy() {}
virtual void reset() {
    id = 0;
    parent_id = 0;
    timeout_threshold = 0;
    status = UNSCHEDULED;
}

virtual void callback() {}
virtual void callbackExcepiton() {}
virtual void callbackTimeout() {}

…….
private:
int64_t id;
int64_t parent_id;
int32_t timeout_threshold; // millisecond;
std::atomic status;
};
AsyncTask的状态转换图
AsyncTask约束了异步任务的7种状态,和8中状态转换。其中TIMEOUT和EXCEPITON是等效的,为了方便区分两种异常而设置两个状态。

并行任务AsyncParallelTask
AsyncParallelTask内部流程图

并行子任务AsyncParallelSubTask
拆包后分发操作主要在AsyncParallelSubTask中执行。需要创建AsyncParallelSubTask的子类,实现其中分发操作和合并结果的操作。

使用举例
初始化AsyncTaskController
在进程Init中执行,全局单例。设置分发的线程池。

static ThreadPool thread_pool(config.getWorkerThreadNum()); 
auto& task_controller = Singleton::GetInstance();
task_controller.setThreadPool(&thread_pool);

定义AsyncParallelSubTask的子类PredictAsyncParallelSubTask 
主要实现process和mergeResult两个函数,具体参考

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.h

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.cpp

class PredictAsyncParallelSubTask : public AsyncParallelSubTask {
public:
PredictAsyncParallelSubTask() :
alg_info(nullptr),
context(nullptr),
split_info({0}) {}

virtual ~PredictAsyncParallelSubTask() {}

virtual Status process() {
    if (nullptr == context) {
        throw std::runtime_error("context is nullptr");
    }
    if (nullptr == alg_info) {
        throw std::runtime_error("alg_info is nullptr");
    }
    PredictService::asyncRequestZeusServer(this, *context, *alg_info, split_info);
    return WAIT_CALLBACK;
}

virtual void mergeResult();

virtual void reset() {
    AsyncParallelSubTask::reset();
    alg_info = nullptr;
    split_info = {0};
    context = nullptr;
    temp_res.Clear();
}

void collectResult(const zeus::proto::ZeusResponse& res) {
    auto& zeus_res = const_cast(res);
    temp_res.mutable_item()->Swap(zeus_res.mutable_item());
    temp_res.mutable_model_response()->Swap(zeus_res.mutable_model_response());
}

void setAlgInfo(AlgInfo* alg_info) { this->alg_info = alg_info;};
void setRankContext(RankContext *context) { this->context = context;}
void setSplitInfo(SplitInfo& split_info) { this->split_info = split_info;}

private:
void praseZeusToScoreItem(TargetCode code, double score, ScoreItem *score_item);

AlgInfo* alg_info;
RankContext *context;
SplitInfo split_info;
zeus::proto::ZeusResponse temp_res;

};
创建AsyncParallelTask
具体参考

class PredictRankTask : public AsyncTask {
public:
……
private:
AsyncParallelTask parallel_task;
……
};

……
for (int32_t partition_id = 0; partition_id < partition_count; ++partition_id) {
int64_t total_count = req_item_size;
int64_t offset = split_count * partition_id;
int64_t end = offset + split_count;
end = end > total_count ? total_count : end;
SplitInfo split_info({total_count,
split_count,
partition_count,
partition_id,
offset,
end});

            auto sub_task = std::make_shared();
            sub_task->setAlgInfo(const_cast(&alg_info));
            sub_task->setSplitInfo(split_info);
            sub_task->setRankContext(&getContext());
            parallel_task.addSubTask((std::shared_ptr)sub_task);
        }

……
auto task = this;
parallel_task.setAllDoneCallback([=]() {
task->response();
task->setStatusCallback();
});

    parallel_task.setIncomplateCallback([=]() {
            task->response(Error::E_INCOMPLATE, "some predict server is error!");
            task->setStatusCallback();
            });

    parallel_task.setAllFailCallback([=]() {
            task->response(Error::E_PREDICT_ALL_FAILED, "all predict server is error!");
            task->setStatusCallback();
            });

    int32_t timeout = PredictService::getMaxTimeout(scene_id, sub_alg);
    parallel_task.setTimeoutCallback(timeout, [=]() {
            task->response(Error::E_PREDICT_ALL_TIMEOUT, "all predict server timeout!");
            task->setTimeout();
            });

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

执行调度
task_controller.scheduleImmediately会在当前线程分发拆包到线程池。而task_controller.schedule则会在线程池中选择一个线程分发。

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

编码
源码地址:

测试
压力测试
测试机器为 2017H2-A1-1, 32 core机器

QPS cpu num of items body length session latency P99 session latency AVG
client latency AVG

bandwidth mem remark
300 56% 1W 200KB 43 35 40 3.4 Gb/s 1%
1600 62% 2k 40KB 31 21.6 24 3.64Gb/s 1.1%
稳定性测试
测试方法:
CPU 60%的压力下,持续测试24小时。 

测试结果:
Rank服务可稳定提供服务。无内存泄露。

极限测试
测试过程:

缓慢把CPU压力从30%提升到90%~100%之间,并维持10分钟,然后把cpu压力降低至60%。整个过程中观察Rank稳定性,有无内存泄露。

测试结果:

CPU压力达到90%以上时,Rank内存增长,超时错误日志变多,定时器失准,返回大量超时、错误。 

Cpu压力降低至60%之后,Rank服务恢复正常,内存使用率变小,无内存泄露,超时错误日志不再新的产出。

符合预期。

打分一致性测试
测试方法:
使用rank-diff工具,从passby环境,复制两份流量请求新旧rank生产环境,分别记录打分结果。最后通过python脚本统计打分结果差异。

测试结果:
1000qps,新旧rank打分一致,差异率小于99.9%,满足需求。

产生差异的数据,分为两种。1)为打分近似值,差别0.01以下。 2)打分无效取默认值0.001.

有锁Rank和无锁Rank性能对比
2k条广告时,1600qps,有锁和无锁Rank压力测试性能对比
测试机器  CPU 32 cores,其中QPS、带宽都是相同的。

有锁
无锁
remark 
QPS 1600
相同
CPU 54.1% 63%
session latency AVG 15 21.7
session latency P99 21 31
bandwidth 3.64Gb/s 3.64Gb/s 相同
req body length 40 KB 40 KB 相同
Context Switch

C++-双缓冲(DoubleBuffer)的设计与实现

源码如下:

#pragma once

#include 

template
class DoubleBuffer {
    public:
        DoubleBuffer() : cur_index(0) {}

        T& getWorkingBuffer(std::memory_order order = std::memory_order_seq_cst) {
            return buffers[cur_index.load(order)];
        }

        T& getBackupBuffer(std::memory_order order = std::memory_order_seq_cst) {
            return buffers[1 ^ cur_index.load(order)];
        }

        void switchBuffer(std::memory_order order = std::memory_order_seq_cst) {
            cur_index.fetch_xor(1, order);
        }

    private:
        T buffers[2];
        std::atomic cur_index;
};

使用C++11的特性来设计和实现API友好的高精度定时器TimerController

为什么设计和实现TimerController?

最新的TimerController代码保存在Github上面:https://github.com/zuocheng-liu/StemCell ,包含timer_controller.h 和 timer_controller.cpp两个文件,欢迎审阅!

因为软件设计中面临了一些实际问题

尤其在使用C++开发网络应用时常遇到下面的问题:

一、软件设计中,不会缺少通过使用定时器的来实现的场景,比如超时控制、定时任务、周期任务。

二、C/C++标准库中只有最原始的定时接口。没有提供功能完备的库来满足上面提到复杂场景。

三、第三方库中的定时器,往往存在一些问题,比如:

  • libevent、libev、libue 不是线程安全的,在多线程系统中,为了保证线程安全需要额外再进行封装。
  • redis的异步库libae对延时时间的处理是不准确的。

以上问题会让开发者开发新系统时带来一些困扰,但C++11新特性的出现,带来了解决上面问题的新思路。

C++11的新特性让定时器的实现更简单友好

TimerController接口更友好

接口参数支持C++11的lamaba表达式,让定时器的接口对开发人员更加友好。

代码更精简

TimerController的代码总计300~400行,而且功能完备,代码健壮。
C++11的新特性的使用,让代码更简洁,增强代码的可读性、可维护性。

保证线程安全

线程安全,是绕不开的问题。第三方库libevent等,在多线程环境中使用总是危险的。TimerController在设计之初就保证多线程环境下运行的安全性。

没有第三方依赖。

TimerController,没有依赖任何第三方库,完全依靠C/C++标准库和C++11的新特性来实现。

TimerController 接口设计

class TimerController {
    bool init(); // 初始化资源,并启动定时器
    void stop(); // 停止定时器,所有定时任务失效

    // 定时运行任务,参数delay_time单位是毫秒。后面参数是lamba表达式
    template<class F, class... Args>
    void delayProcess(uint32_t delay_time, F&& f, Args&&... args);

    // 周期运行任务,参数interval单位是毫秒,后面参数是lamba表达式。
    template<class F, class... Args>
    void  cycleProcess(uint32_t interval, F&& f, Args&&... args);
}

用一个实例来讲解TimerController的使用方法:

#include <iostream>
#include "timer_controller.h"
using namespace std;
using namespace StemCell;
int main() {
    srand((unsigned)time(NULL));
    try {
        TimerController tc;
        tc.init(); // 初始化 TimerController
        tc.cycleProcess(500, [=]() { cout << "cycle 0.5 sec" << endl; });
        for (int i = 0; i < 80; ++i) {
            // 随机产生80个延时任务,并延时执行
            auto seed = rand() % 8000;
            tc.delayProcess(seed + 1, [=]() { cout << "delay:" << seed << endl; });
        }
        sleep(8);  // 主线程睡眠8秒,让延时任务得以执行
        tc.stop(); // 停止 TimerController
        cout << "tc stoped!" << endl;
    } catch (exception& e) {
        cout << "error:" << e.what();
    }
    return 0;
}

TimerController 实现原理

TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

  • 使用最小堆来管理延时任务和周期任务
  • 使用1个timerfd配合epoll来实现定时任务的激活
  • 使用1个eventfd配合epoll来实现定时任务的添加
  • 使用一个线程安全的队列,实现生产者消费者模型。TimerController使用场景为多线程写,TimerController内部1个线程来读。

TimerController 高级用法

高延时的任务的处理

TimerController内部只有1个线程在执行定时任务。当高延时的任务增多时,可能会影响到任务运行的调度时间,高延时的任务需要在放入新的线程中运行。示例如下:

TimerController tc;
tc.init(); // 初始化 TimerController
// 把任务放入新线程或线程池中
tc.delayProcess(50, []() { std::thread([]() { do_long_time_task();}) });

TimerController保持全局单例

为了系统简洁,TimerController全局单例即可。
auto& tc = Singleton< TimerController >::GetInstance();

其他

如何避免CPU负载高时,定时器失准的问题?

TimerController 有待改进的点

  • 无锁化,目前使用了自旋锁在保证task队列的线程间互斥,后续可使用无锁队列替代有锁队列。
  • TimerController精度目前只有1毫秒,主要因为博主做网络开发都是毫秒级的,后续可以让TimerController支持更小的精度。
  • TimerController 使用了epoll、timerfd、eventfd等,只能在linux平台上面使用

源码地址

具体实现在 timer_controller.h 和 timer_controller.cpp两个文件里面。