C++无锁编程之AsyncParallelTask框架

简介
AsyncParallelTask框架,是为了适用于Rank3.0的拆包合包业务场景而设计和实现的异步任务调度框架,且具有良好的通用性和可移植性。

Rank拆包合包的业务场景,即Rank3.0接受到请求后,拆包往下游预估服务分发请求,在接收到返回后合并打分结果,最终把结果返回给上游。

使用AsyncParallelTask框架的优点
拆包合并由框架控制,免去了自己控制拆包后多个子任务的状态管理。
无锁化,没有锁竞争,性能高。
提供超时管理机制,有助于增强系统稳定性。
拥有友好的API,使用简单。
AsyncParallelTask框架可适用的场景举例
需要拆包合包的预估服务,比如Rank模块
搜索引擎的merger模块
其他需要拆包合包的业务场景
设计
设计思想
使用异步IO方式,不会引起线程阻塞,且通过超时控制来避免长尾任务。
通过使用原子变量和原子操作(atomic)来控制计数、状态变化。
支持多线程,且逻辑实现不使用任何种类的锁,使用lockfree数据结构和线程间通信机制来保证线程安全。
通过使用C++11标准的新特性,比如仿函数、参数模板等,来对外提供简洁和更加友好的API。
类域设计
AsyncParallelTask框架,总共包含控制器AsyncTaskController、定时器TimerController、异步并行任务类AsyncParallelTask、分发子任务类AsyncParallelSubTask等4部分组成。

控制器AsyncTaskController
AsyncTaskController是AsyncParallelTask的调度器,提供调度AsyncParallelTask的接口。内部包含线程池、定时器。

当AsyncTaskController调度AsyncParallelTask的运行时,首先把AsyncParallelTask放入线程池中调度,然后启动对AsyncParallelTask的超时监控。

定时器TimerController
TimerController的实现原理
TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

使用最小堆来管理延时任务和周期任务
使用1个timerfd配合epoll来实现定时任务的激活
使用1个eventfd配合epoll来实现定时任务的添加
使用一个LockFree的栈,实现生产者消费者模型。由外部多线程写,TimerController内部1个线程来读。
TimerController内部流程图

异步任务基类AsyncTask
任何继承AsyncTask的类都可供AsyncTaskController进行调度。

AsyncTask中定了一个基本的接口和AsyncTask的状态的转换。

部分代码举例:

class AsyncTask {
public:
enum Status {
UNSCHEDULED,
PROCESSING,
WAIT_CALLBACK,
CALLBACK,
TIMEOUT,
EXCEPTION,
FINISHED
};

AsyncTask() :
    id(0),
    parent_id(0),
    timeout_threshold(0),
    status(UNSCHEDULED) {}
virtual ~AsyncTask() {}
virtual Status process() = 0;
virtual Status timeout() { return TIMEOUT; }
virtual void destroy() {}
virtual void reset() {
    id = 0;
    parent_id = 0;
    timeout_threshold = 0;
    status = UNSCHEDULED;
}

virtual void callback() {}
virtual void callbackExcepiton() {}
virtual void callbackTimeout() {}

…….
private:
int64_t id;
int64_t parent_id;
int32_t timeout_threshold; // millisecond;
std::atomic status;
};
AsyncTask的状态转换图
AsyncTask约束了异步任务的7种状态,和8中状态转换。其中TIMEOUT和EXCEPITON是等效的,为了方便区分两种异常而设置两个状态。

并行任务AsyncParallelTask
AsyncParallelTask内部流程图

并行子任务AsyncParallelSubTask
拆包后分发操作主要在AsyncParallelSubTask中执行。需要创建AsyncParallelSubTask的子类,实现其中分发操作和合并结果的操作。

使用举例
初始化AsyncTaskController
在进程Init中执行,全局单例。设置分发的线程池。

static ThreadPool thread_pool(config.getWorkerThreadNum()); 
auto& task_controller = Singleton::GetInstance();
task_controller.setThreadPool(&thread_pool);

定义AsyncParallelSubTask的子类PredictAsyncParallelSubTask 
主要实现process和mergeResult两个函数,具体参考

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.h

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.cpp

class PredictAsyncParallelSubTask : public AsyncParallelSubTask {
public:
PredictAsyncParallelSubTask() :
alg_info(nullptr),
context(nullptr),
split_info({0}) {}

virtual ~PredictAsyncParallelSubTask() {}

virtual Status process() {
    if (nullptr == context) {
        throw std::runtime_error("context is nullptr");
    }
    if (nullptr == alg_info) {
        throw std::runtime_error("alg_info is nullptr");
    }
    PredictService::asyncRequestZeusServer(this, *context, *alg_info, split_info);
    return WAIT_CALLBACK;
}

virtual void mergeResult();

virtual void reset() {
    AsyncParallelSubTask::reset();
    alg_info = nullptr;
    split_info = {0};
    context = nullptr;
    temp_res.Clear();
}

void collectResult(const zeus::proto::ZeusResponse& res) {
    auto& zeus_res = const_cast(res);
    temp_res.mutable_item()->Swap(zeus_res.mutable_item());
    temp_res.mutable_model_response()->Swap(zeus_res.mutable_model_response());
}

void setAlgInfo(AlgInfo* alg_info) { this->alg_info = alg_info;};
void setRankContext(RankContext *context) { this->context = context;}
void setSplitInfo(SplitInfo& split_info) { this->split_info = split_info;}

private:
void praseZeusToScoreItem(TargetCode code, double score, ScoreItem *score_item);

AlgInfo* alg_info;
RankContext *context;
SplitInfo split_info;
zeus::proto::ZeusResponse temp_res;

};
创建AsyncParallelTask
具体参考

class PredictRankTask : public AsyncTask {
public:
……
private:
AsyncParallelTask parallel_task;
……
};

……
for (int32_t partition_id = 0; partition_id < partition_count; ++partition_id) {
int64_t total_count = req_item_size;
int64_t offset = split_count * partition_id;
int64_t end = offset + split_count;
end = end > total_count ? total_count : end;
SplitInfo split_info({total_count,
split_count,
partition_count,
partition_id,
offset,
end});

            auto sub_task = std::make_shared();
            sub_task->setAlgInfo(const_cast(&alg_info));
            sub_task->setSplitInfo(split_info);
            sub_task->setRankContext(&getContext());
            parallel_task.addSubTask((std::shared_ptr)sub_task);
        }

……
auto task = this;
parallel_task.setAllDoneCallback([=]() {
task->response();
task->setStatusCallback();
});

    parallel_task.setIncomplateCallback([=]() {
            task->response(Error::E_INCOMPLATE, "some predict server is error!");
            task->setStatusCallback();
            });

    parallel_task.setAllFailCallback([=]() {
            task->response(Error::E_PREDICT_ALL_FAILED, "all predict server is error!");
            task->setStatusCallback();
            });

    int32_t timeout = PredictService::getMaxTimeout(scene_id, sub_alg);
    parallel_task.setTimeoutCallback(timeout, [=]() {
            task->response(Error::E_PREDICT_ALL_TIMEOUT, "all predict server timeout!");
            task->setTimeout();
            });

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

执行调度
task_controller.scheduleImmediately会在当前线程分发拆包到线程池。而task_controller.schedule则会在线程池中选择一个线程分发。

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

编码
源码地址:

测试
压力测试
测试机器为 2017H2-A1-1, 32 core机器

QPS cpu num of items body length session latency P99 session latency AVG
client latency AVG

bandwidth mem remark
300 56% 1W 200KB 43 35 40 3.4 Gb/s 1%
1600 62% 2k 40KB 31 21.6 24 3.64Gb/s 1.1%
稳定性测试
测试方法:
CPU 60%的压力下,持续测试24小时。 

测试结果:
Rank服务可稳定提供服务。无内存泄露。

极限测试
测试过程:

缓慢把CPU压力从30%提升到90%~100%之间,并维持10分钟,然后把cpu压力降低至60%。整个过程中观察Rank稳定性,有无内存泄露。

测试结果:

CPU压力达到90%以上时,Rank内存增长,超时错误日志变多,定时器失准,返回大量超时、错误。 

Cpu压力降低至60%之后,Rank服务恢复正常,内存使用率变小,无内存泄露,超时错误日志不再新的产出。

符合预期。

打分一致性测试
测试方法:
使用rank-diff工具,从passby环境,复制两份流量请求新旧rank生产环境,分别记录打分结果。最后通过python脚本统计打分结果差异。

测试结果:
1000qps,新旧rank打分一致,差异率小于99.9%,满足需求。

产生差异的数据,分为两种。1)为打分近似值,差别0.01以下。 2)打分无效取默认值0.001.

有锁Rank和无锁Rank性能对比
2k条广告时,1600qps,有锁和无锁Rank压力测试性能对比
测试机器  CPU 32 cores,其中QPS、带宽都是相同的。

有锁
无锁
remark 
QPS 1600
相同
CPU 54.1% 63%
session latency AVG 15 21.7
session latency P99 21 31
bandwidth 3.64Gb/s 3.64Gb/s 相同
req body length 40 KB 40 KB 相同
Context Switch

AE-Memcached 优化记录

优化背景和目的

  • 学习Memcached 代码
  • 将 Memcached 的代码成为自己的技术积累
  • 优化Memcache 代码,提高自己系统分析能力

源代码托管于Github上:

https://github.com/zuocheng-liu/ae-memcached

性能优化

网络模型的优化

  • 网络IO多路复用 + 单线程

  • 将 Redis 异步库 移植至 Memcached

优化动态申请内存机制

  • 使用预分配,减小系统调用 malloc、realloc、free的次数,主要出现在新建/关闭链接时,会有较多的系统调用

部分小的函数使用宏代替

优化Memcache协议命令的解析

  • 调整各个命令的解析顺序,把get 和 set 命令放到最前面

软件架构优化

软件架构优化,保证关键代码性能不变

使用宏加强代码复用

  • 重构verbose日志
  • 重构网络库
  • 重构slab

命令模式重构 Memcache 协议

  • 创建command_service类,统一管理命令的解析、处理

更深层次的抽象

将 stats 、 settings 、 logger 和全局资源进行抽象

解耦

  • 将各个模块接口化,减少模块间耦合,尤其是 slab item memcached之间的耦合
  • 依赖注入原则,增强各个模块的复用,其中mem_cache模块 settings等可以形成框架。
  • logger
  • command service

线程安全的单例模式-以C++代码为例

本文描述3种场景下的单例模式:

  • 进程体内无线程的单例模式
  • 进程体内多线程单例模式
  • 在单个线程体中的单例模式

本文所写单例模式代码都使用懒汉模式。

进程体内单例

注意问题:

  • 如果进程体中运行多个线程,则需要考虑多线程同步访问的问题。
  • 如果进程体重没有运行多个线程,则不需要考虑多线程同步访问。
  • 使用线程同步锁保证多进程同步

使用场景举例 :

  • 日志类、文件读写类
  • 资源管理类

代码示例:

进程体内没有运行多线程的单例模式,无需考虑线程同步与互斥

class Singleton {
  public:
    static Singleton* getInstance() {
        if (NULL == instance) {
          instance = new SingletonInside();
        }
        return instance;
    }
  private:
    SingletonInside(){}
    ~SingletonInside() {}
    static Singleton* instance;
};

Singleton::instance = NULL;    

进程体内运行多线程单例模式,使用系统mutex保证线程安全

class Singleton {
  public:
    static Singleton* getInstance() {
        pthread_once(&g_once_control, InitOnce);
        pthread_mutex_lock(&mutex);  // lock
        if (NULL == instance) {
          instance = new SingletonInside();
        }
        pthread_mutex_unlock(&mutex); // unlock
        return instance;
    }
  private:
    SingletonInside() {

    }
    ~SingletonInside() {
       pthread_mutex_destroy(&mutex);   // destroy lock
    }
    static void InitOnce(void) {
      pthread_mutex_init(&mutex,NULL);  // init lock
    }
    Singleton* instance;
    static pthread_once_t g_once_control;
    static pthread_mutex_t mutex;
};
Singleton::instance = NULL;
pthread_once_t Singleton::g_once_control = PTHREAD_ONCE_INIT;

单个线程体中的单例

某些资源在单个线程体内需要保持单例,即每个线程体内都保持唯一。每个线程体内,对象相互隔离,则无需考虑线程安全问题。

此种单例模式的实例需要调用系统提供的 TLS 接口,放于线程局部存储中。

使用场景举例:

  • 多路复用Socket封装类
  • 网络上下文环境类
  • 线程安全的资源

代码示例

class Singleton {
  public:
    static Singleton* getInstance() {
       pthread_once(&g_once_control, InitOnce);
       Singleton* instance = (Singleton*)pthread_getspecific(g_thread_data_key);

        if (NULL == instance) {
          instance = new SingletonInside();
          pthread_setspecific(g_thread_data_key, (void*)Singleton)
        }
        return instance;
    }
  private:
    SingletonInside() {}
    ~SingletonInside() {
       pthread_key_delete(g_thread_data_key);
    }
    static void InitOnce(void) {
      pthread_key_create(&g_thread_data_key, NULL);
    }

    static pthread_once_t g_once_control;
    static pthread_key_t g_thread_data_key;
};

pthread_once_t Singleton::g_once_control = PTHREAD_ONCE_INIT;
pthread_key_t Singleton::g_thread_data_key;

如果使用 Poco库 TreadLocal ,代码还会简洁很多,性能上也会好很多

    class Singleton {
      public:
        static Singleton* getInstance() {
            if (NULL == instance) {
              instance = new SingletonInside();
            }
            return instance;
        }
      private:
        SingletonInside() {}
        ~SingletonInside() {
           delete instance;
        }
        static Poco::ThreadLocal<Singleton> instance;
    };
Poco::ThreadLocal<singleton> Singleton::instance = NULL;

总结

  • 无论程序是多进程运行还是多线程运行,代码都要尽量兼容线程安全

工程师手记-PNI

PNI 简介

PNI,是PHP Native Interface的简写。它是PHP的一个扩展,可以通过它,让PHP直接调用C语言写的函数。

想法源自在百度做项目时,不时地会面临同样一个问题,PHP该如何直接快速地与其他语言交互呢?

PHP 扩展的不足

用C写PHP的扩展是常规的方法。不过使用这种方法总是要面临诸多问题:

  • 作为开发者,要去学习PHP-API、Zend-API,这是不小的学习成本,而且用C做开发测试的效率都不高。
  • 写好了PHP扩展,怎么部署呢,还要找运维工程师去讨论、去争取后才能上线。

来来回回项目进度就被拖慢了。

有如此技术痛点,就找个通用的法子吧。

调研过程

JNI 和 Python

Java 可以通过JNI调用C/C++,Python也有相应的包,比如ctypes

HHVM

稍微去翻了一下HHVM的扩展,发现HHVM有 native interface, 但稍微看了一下,却发现那只是HHVM的native interface,PHP无法使用。

这个项目,模仿JNI,就取名PNI吧

c 对动态链接库的动态调用

c 加载动态链接库及调用函数有现成的方法。

使用dlfcn.h 库中的dlopen、dlsym、dlclose三个函数就够了。

设计和实现过程

接口设计

最开始的方案,就是模仿JNI。因此在最初的实现里,PNI对动态链接库的查询,调用函数的方法都直接模仿了JNI的接口实现。测试使用时,却感觉PNI的接口非常不友好。

和Java、Python 不同,PHP是弱类型语言。JNI 可以在给函数传递参数时,参数的数据类型是已知的,但是PHP传的都是zval,类型并不可知。我把数据类型的控制交给了开发者,在PNI的扩展里,通过struct zval 中 type 字段判断,此种方案让PNI很难于被使用。

于是改进方案,在PNI里添加PNIInteger、PNIDouble、PNIChar和其他几个可以标明参数数据类型的类。

参数压栈

PHP 的函数调用后,动过dlfcn.h库,可以找到函数地址,但是如何调用函数呢?调用函数又如何传参呢?我们无法知道所调用函数的参数列表是什么样子。

由于没有找到比较好的方案,所以就借助于汇编,使用汇编模式C语言的参数压栈方法。

于是写了很多种C语言的函数,主要是参数列表不一样,总结了GCC编译器在x86_64架构CPU下参数压栈的几个特点:

  • 8、16、32位和64位整形的参数,按顺序传到64位的寄存器,当多于6个整形参数时,剩下的整形参数都压栈

  • 32位和64位浮点,都传到64位的浮点计算器,浮点参数多于6个时,剩下的浮点参数都无效

无论C函数的参数是什么样的类型,PNI都将其按64位的整形或浮点处理。

问题解决了

系统兼容性

因为参数压栈的问题,目前PNI只支持GCC编译器和x86_64架构的CPU。其他架构和编译器都没有来得及去实现。

总结

吭哧吭哧,PNI最初版本调试测试成功。

后记

如何让PNI被更多的人知道并使用,怎么做呢?(还在思考中)

  • 开源,传到GitHub
  • 融入社区
  • 在问答网站上多回答相关问题

其他方法呢?再想一下。

软件设计之缓存使用

本文主要讨论分布式环境下,缓存如何在软件设计作用、原理、实现方式及注意问题。

缓存的作用

  • 减小原始数据访问压力
  • 提高资源利用率

缓存的原理

局部性原理

缓存的实现方式

查询算法

  • 散列算法,Hash 、 MD5 等
  • B数、二叉树、有序二分查找等

存储

  • 只将访问量最高的部分数据放入缓存
  • 将数据放到比原始IO速率更高的存储介质中

缓存资源回收

  • RUL 算法
  • 定时清理
  • 设置资源有效时间

缓存的存储介质

  • CPU 寄存器
  • 内存
  • 本地文件
  • 分布式系统(Memcache 、 Redis)
  • 数据库缓存数据表

缓存设计注意的问题

缓存的一致性,Cache coherence

  • 避免数据脏读
  • 多级缓存的一致性协议

系统的鲁棒性

  • 在缓存系统停止服务,但仍能保证整体系统正常运行。因此在使用缓存之前,检查提供缓存系统的有效性。

单机缓存

  • 单机缓存是指,将系统资源存放于每个单台服务器上,而不是集中存储与分布式缓存系统中。 缺陷是,如果原数据发生更改,为保证一致性,则必须调用每一台服务器清理或更新缓存。

软件设计之浮点运算

工程项目中,需要注意浮点数的点

  • 不能有 == 比较
  • 上下游注意nan值检查

计算机对浮点的表示和运算的特点

二进制表示

  • 部分十进制小数转化为二进制小数后,是无限循环小数,比如 0.05 转换为二进制 0.000011001100110011001100110011…… 如果截断,实际计算机存储的是 0.049999999999

IEEE 754 标准下的浮点

  • 小数位数是精确的
  • 有效位数是有限的

计算机做浮点运算的陷阱和缺陷

  • 丢失精度,近似比较不准确

正确的比较方法 : if (a - b < 0.0000001 && a-b > -0.0000001) {}

  • 四舍五入,导致误差累积

软件设计时如何保证高精度的计算

使用高精度计算库,比如:

  • Java 使用BigInteger 、BigDecimal 类
  • PHP 使用bcmath扩展提供的方法
  • C/C++ 自行编写高精度算法和数据结构

附录

优秀文档总结:https://gywbd.github.io/posts/2015/9/floating-point-number.html