C++无锁编程-单链表

无锁编程单链表的实现,主要特性如下:

  • 只能从尾部追加元素
  • 不支持删除元素
  • 支持Iterator遍历元素
  • 提供了多线程情况下的测试用例

适用的场景如下:

  • 搜索引擎,实时索引中的倒排链表
  • 只添加元素,不删除元素的多线程场景

代码如下:

#include 
#include 
#include 
#include 
template
class LockFreeSignleLinkedList {
public:
    struct Node {
        union Data {
            DataType *ptr;
            uint64_t value;
        } data;
        std::atomic next = nullptr;
    };

    class Iterator {
    public:
        Iterator operator ++(int) {
            Iterator it;
            it.ptr = ptr->next;
            return it;
        }

        Iterator& operator ++() {
            ptr = ptr->next;
            return *this;
        }

        bool operator ==(Iterator& iter) {
            return (ptr == iter.ptr);
        }

        Iterator& operator =(const Iterator& iter) {
            ptr = iter->ptr;
            return *this;
        }

        DataType& operator *() {
            if (std::is_class::value) {
               return *(ptr->data.ptr);
            } 
            uint64_t* data_ptr = &(ptr->data.value);
            DataType* ptr = reinterpret_cast(data_ptr);
            return *ptr;
        }

        friend class LockFreeSignleLinkedList;
    private:
       Node* ptr = nullptr;
    };

    LockFreeSignleLinkedList() {
        head = new Node;
        tail = head;
    }

    void push_back(const DataType& data) {
        push_back(&data);
    }

    void push_back(const DataType* ptr) {
        Node *tmp = new Node;
        if (std::is_class::value) {
            tmp->data.ptr = const_cast(ptr);
        } else {
            DataType* tmp_ptr = reinterpret_cast(&(tmp->data.value));
            *tmp_ptr = *ptr;
        }
        for(;;) {
            ajustTail();
            Node *real_tail = tail.load(std::memory_order_relaxed);
            auto& next = real_tail->next;
            Node *real_next = next.load(std::memory_order_relaxed);
            if (nullptr != real_next) {
                continue;
            }
            if (!next.compare_exchange_weak(real_next, tmp, std::memory_order_relaxed)) {
                continue;
            } else {
                break;
            }
        }
        cnt.fetch_add(1, std::memory_order_relaxed);
        ajustTail();
    }

    Iterator begin() {
        Iterator it;
        it.ptr = head->next;
        return it;
    }

    Iterator end() {
        static Iterator blank_it;
        return blank_it;
    }

    size_t size() { return cnt.load(std::memory_order_relaxed); }

private:
    void ajustTail() {
        for(;;) {
            Node *real_tail = tail.load(std::memory_order_relaxed);
            auto& next = real_tail->next;
            Node *real_next = next.load(std::memory_order_relaxed);
            if (nullptr == real_next) {
                return;
            }
            if (!tail.compare_exchange_weak(real_tail, real_next, std::memory_order_relaxed)) {
                continue;
            }
        } // end for
    }

    Node* head = nullptr;
    std::atomic tail = nullptr;
    std::atomic cnt = 0;
};

int main()
{
    LockFreeSignleLinkedList l;
    l.push_back(9);
    l.push_back(2);
    l.push_back(4);
    l.push_back(5);
    std::cout << "list:" << std::endl;
    for (const int32_t n : l) {
        std::cout << n << std::endl;
    }
    std::cout << "retry to list:" << std::endl;
    for (auto it = l.begin(); it != l.end(); ++it ) {
        std::cout << *it << std::endl;
    }

    LockFreeSignleLinkedList l2;
    volatile bool run = false;
    auto fun = [&]() {
        for (;;) {
            if (!run) {
                continue;
            } else {
                break;
            }
        }
        std::cout<< "start pid:" << std::this_thread::get_id() << std::endl;
        for (int32_t i = 0; i < 900000; ++i) {
            l2.push_back(i);
        }    
        std::cout<< "end pid:" << std::this_thread::get_id() << std::endl;
    };

    std::thread t1([&](){fun();});
    std::thread t2([&](){fun();});
    std::thread t3([&](){fun();});
    std::thread t4([&](){fun();});
    std::atomic_thread_fence(std::memory_order_seq_cst);
    run = true;
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    size_t s1 = 0;
    for (auto it = l2.begin(); it != l2.end(); ++it ) {
       ++s1;
    }
    std::cout << "s1:" << s1 << std::endl;
    std::cout << "s2:" << l2.size() << std::endl;

    return 0;
}

C++软件架构选型

基础服务 (开源)

项目 推荐☆☆☆☆☆ 推荐☆☆☆☆ 推荐☆☆☆
虚拟化 podman docker
虚拟机集群 k8s
分布式存储 ceph

中间件

项目 推荐☆☆☆☆☆ 推荐☆☆☆☆ 推荐☆☆☆
名字服务/服务发现 consul
分布式协调一致性 etcd zookeeper
消息队列 kafka rabitmq
监控 prometheus open-falcon Zabbix
监控展示 grafana
日志存储 druid elastic-search Opentsdb
分布式任务调度 Elastic-Job
kv缓存服务 redis
kv数据库 leveldb mongodb
向量检索引擎 faiss milvus proxima

组件

项目 推荐☆☆☆☆☆ 推荐☆☆☆☆ 推荐☆☆☆
RPC框架 BPRC thrift
IDL protobuffer thrift json
基础库 boost poco
log glog Spdlog
malloc jemalloc tcmalloc
json fastjson jsoncpp
xml tinyxml2
yaml yaml-cpp
kv缓存 rockdb

C/C++编程索引

本博客中关于C/C++编程内容的索引

语言特性

编译器

经典实例

无锁编程

其他

C++实现一个空的日志类

在C++项目中,一些场景下,可能会用到一个空的日志类,覆盖项目中的日志实现,让项目不打印日志,比如:

  • 屏蔽某个类库中的日志(因为库的日志打印机制设计的不够好)
  • 单例测试中代码中,屏蔽项目日志

应对以上需求,本文实现了一个不打印日志的类,有如下特点:

  • 覆盖项目中日志的宏定义,比如LOG_INFO等,不打印日志
  • 重载operator<<操作符的实现

实现中有哪一些陷阱呢?主要在std::endl上面。

在重载operator<<操作符的时候,使用LOG_INFO<<"log"<<std::endl; 时,会提示,std::endl是未知类型(unresolved overloaded function type)

关于std::endl的本质,是一个函数模板,本文不做详细阐述,只给出实现方法。

代码如下,使用c++11标准实现:


#ifdef DISABLE_PRJ_LOG

#undef LOG_DEBUG
#undef LOG_INFO
#undef LOG_WARN
#undef LOG_ERROR
#undef LOG_FATAL

#include 
#include 

class BlankLogObject {
public:
    static BlankLogObject& Ins() {
        static BlankLogObject obj;
        return obj;
    }
    BlankLogObject& operator<<(void* in) { return *this; }
    BlankLogObject& operator<<(char in) { return *this; }
    BlankLogObject& operator<<(const char* in) { return *this; }
    BlankLogObject& operator<<(const std::string& in) { return *this; }
    BlankLogObject& operator<<(int16_t in) { return *this; }
    BlankLogObject& operator<<(int32_t in) { return *this; }
    BlankLogObject& operator<<(uint32_t in) { return *this; }
    BlankLogObject& operator<<(int64_t in) { return *this; }
    BlankLogObject& operator<<(uint64_t in) { return *this; }
    BlankLogObject& operator<<(float in) { return *this; }
    BlankLogObject& operator<<(double in) { return *this; }
    BlankLogObject& operator<<(BlankLogObject& in) { return *this; }
    BlankLogObject& operator<<(std::ostream& in) { return *this; }
    BlankLogObject& operator<<(std::ostream (*in)(std::ostream&)) { return *this; }

    typedef std::basic_ostream > endl_type;
    BlankLogObject& operator<<(endl_type& (*in)(endl_type&))  { return *this; }
};
}

#define LOG_DEBUG rec::BlankLogObject::Ins()
#define LOG_INFO  rec::BlankLogObject::Ins()
#define LOG_WARN  rec::BlankLogObject::Ins()
#define LOG_ERROR rec::BlankLogObject::Ins()
#define LOG_FATAL rec::BlankLogObject::Ins()
#endif
int main() {
    LOG_INFO << "skf" << 123 << 0.1 << std::endl;
    return 0;
}

关于gcc中string的cow机制

梗概

已知 C++ 11禁止使用COW,本文进行验证。

测试代码

#include 
#include 
#include

int main() {
    std::string origin("cow\n");
    std::string copy( origin);

    printf(origin.c_str());
    printf(copy.c_str());
    printf("origin\taddress is %x\n",(int64_t)origin.c_str());
    printf("copy\taddress is %x\n",(int64_t)copy.c_str());
    copy[0] = 'w';
    printf(origin.c_str());
    printf(copy.c_str());
    printf("origin\taddress is %x\n",(int64_t)origin.c_str());
    printf("copy\taddress is %x\n",(int64_t)copy.c_str());
    return 0;
}

使用gcc 6.3.0测试

$ g++ -o cow cow.cpp --std=c++98

$ ./cow
cow
cow
origin  address is 8c69cc40
copy    address is 8c69cc20
cow
wow
origin  address is 8c69cc40
copy    address is 8c69cc20

$ g++ -o cow cow.cpp --std=c++11
$ ./cow
cow
cow
origin  address is 28240270
copy    address is 28240250
cow
wow
origin  address is 28240270
copy    address is 28240250

使用gcc 4.8.5测试

$ g++ -o cow cow.cpp --std=c++98
$ ./cow
cow
cow
origin  address is aa9028
copy    address is aa9028
cow
wow
origin  address is aa9028
copy    address is aa9058

$ g++ -o cow cow.cpp --std=c++11
$ ./cow
cow
cow
origin  address is 23a0028
copy    address is 23a0028
cow
wow
origin  address is 23a0028
copy    address is 23a0058

结论

从gcc的实际的实现看,cow 并非是c++11之后禁止的,而是从gcc的某个版本开始禁止cow机制,与c++11无必然关系。

Google protobuf使用技巧和经验总结

技巧 & 经验

性能优化

把repeated message结构尽可能摊平为基础类型的repeated字段

基础类型的repeated字段,包含 repeated int32, int64, float,double,bool等,但不包含string、bytes、message

比如:

message Item {
    int32 id = 1;
    int32 score = 2;
}

message R {
    repeated Item items = 1;
}

改为下面的设计,会提升序列化和反序列效率

message R {
    repeated int32 item_id = 1;
    repeated int32 item_score = 2;
}

原理是非string的基础类型的repeated字段,在申请内存时pb会申请连续线性大块内存,效率高;而message 的repeated字段,会按对象逐个去申请空间。

这种设计还有一个好处就是不会触发调用析构函数,如果采取 repeated Item 这种结构,会触发大量的析构函数,浪费 CPU 与 时间。(by chengyuxuan_yutian@163.com)

善用arena管理内存

  • arena对基础类型,比如int32, int64, float,double,bool等管理效率优化明显
  • arena不会管理string类型的内存申请(更高的版本已经支持,待验证)。

用固定长度repeated uint32 替换字符串

字符串是一种不定长的数据结构,内存管理方式成本较高。对于定长的字符串,通过转换成repeated uint32类型,可以获得更高效的管理。

除此之外,repeated uint32 也支持由arena管理。

善用Any类型

假设3个网络服务的调用关系如下:
A->B->C。
其中存在某些pb结构仅会由B透传给C,而B不需要解析,则可以把这些pb放入定义为any类型的字段中。

善用浅拷贝机制(set_allocated_xxx/release)

CopyFrom是深拷贝,若要实现浅拷贝则可以通过 set_allocated_xxx/release 两个函数进行控制

结合arena使用浅拷贝机制(unsafe_arena_set_allocated)

set_allocated_xxx的风险在于,pb析构的时候会把元素也析构掉,无法重复利用。且在一些特殊场景,在无法控制pb析构而不能使用release函数。这些场景可能是pb的析构工作由框架控制,旧的代码封装层次太深等。

这种情况可以使用unsafe_arena_set_allocated_xxx 避开这个问题。

使用陷阱

不要有交叉依赖

举例一C++系统中,模块A依赖 PB;模块B依赖PB,而模块C依赖A和B。则编译模块C时一定要同时编译A、B、PB。

利用protobuf一些特性来规避陷阱

良好的可扩展性 & 保留未定义字段

良好的可扩展性使得protobuf更好地向后兼容。上游更新了proto,新增字段,下游虽然没有更新proto文件,但是新增的字段依然可以保留,来自上游的字段可以透传给下游。拼接下游请求的结构pb时,尽可能使用CopyFrom,避免把字段逐个set。

使用编号定位存储的字段

为了更好地向后兼容,应该避免修改proto文件中现有字段的名字、类型。需要修改时,通过追加新字段(字段编号增加),弃用旧字段的方式。

故障相关

protobuf被广泛使用,饱经业界考验,如果遇到问题,绝大多数还是自身软件设计的问题。遇到问题,首先不应该怀疑protobuf,应该把视角集中到去发现自身的系统设计缺陷中。

一次内存泄露的故障排查

现象:

公司里一个c++网络服务中, PV较低时没有内存泄露;而PV较高,cpu idle降到30%以下,开始内存泄露,直到OOM。

排查过程:

用了 tcmalloc和gperf,逐步定位是protobuf 申请 repeated字段的空间,没有及时释放。repeated字段约1k~1w的规模。然后逐步缩小范围。

结果:

竟然是释放内存,都放到了一个线程中。当流量大时,单个线程计算能力成为瓶颈,内存释放变慢,表现为内存泄漏。

总结

protobuf释放的代价较大,不要全部的protobuf只放在一个线程操作。场景距离,lru cache的过期元素剔除。

https://www.cnblogs.com/zgwu/p/10403939.html

brpc框架使用经验汇总

brpc自适应限流的一个bug

  • brpc 会不断尝试调小 max_concurrency, 然后重新计算
  • 因为上面的原因,当qps比较低时,max_concurrency会被计算的非常小
  • 一旦面临流量的突增,即便在机器的负载范围内,但因为超过max_concurrency,而返回限流的错误
  • 紧接上一条,即便客户端重试发送别的机器,因为下游服务整体max_concurrency,依然会接收限流的错误。

结论 brpc的自适应限流,只适合大流量,且流量稳定的场景。不适合小流量的服务。

C++无锁编程之自旋锁(spinlock)的实现

此实例通过c++11实现。

#pragma once

#include 
#include 

class Spinlock {
public:
    Spinlock() : flag(ATOMIC_FLAG_INIT), wait_count(2000) {}

    void lock() {
        int64_t i = 0;
        while(flag.test_and_set(std::memory_order_acquire)) {
            __asm__ ("pause");
            if (++i > wait_count) {
                sched_yield();
                i = 0;
            }
        }
    }

    bool try_lock() {
        if (flag.test_and_set(std::memory_order_acquire)) {
            return false;
        }
        return true;
    }

    void unlock() {
        flag.clear(std::memory_order_release);
    }

private :
    std::atomic_flag flag;
    int32_t wait_count;
};

C++无锁编程之AsyncParallelTask框架

简介
AsyncParallelTask框架,是为了适用于Rank3.0的拆包合包业务场景而设计和实现的异步任务调度框架,且具有良好的通用性和可移植性。

Rank拆包合包的业务场景,即Rank3.0接受到请求后,拆包往下游预估服务分发请求,在接收到返回后合并打分结果,最终把结果返回给上游。

使用AsyncParallelTask框架的优点
拆包合并由框架控制,免去了自己控制拆包后多个子任务的状态管理。
无锁化,没有锁竞争,性能高。
提供超时管理机制,有助于增强系统稳定性。
拥有友好的API,使用简单。
AsyncParallelTask框架可适用的场景举例
需要拆包合包的预估服务,比如Rank模块
搜索引擎的merger模块
其他需要拆包合包的业务场景
设计
设计思想
使用异步IO方式,不会引起线程阻塞,且通过超时控制来避免长尾任务。
通过使用原子变量和原子操作(atomic)来控制计数、状态变化。
支持多线程,且逻辑实现不使用任何种类的锁,使用lockfree数据结构和线程间通信机制来保证线程安全。
通过使用C++11标准的新特性,比如仿函数、参数模板等,来对外提供简洁和更加友好的API。
类域设计
AsyncParallelTask框架,总共包含控制器AsyncTaskController、定时器TimerController、异步并行任务类AsyncParallelTask、分发子任务类AsyncParallelSubTask等4部分组成。

控制器AsyncTaskController
AsyncTaskController是AsyncParallelTask的调度器,提供调度AsyncParallelTask的接口。内部包含线程池、定时器。

当AsyncTaskController调度AsyncParallelTask的运行时,首先把AsyncParallelTask放入线程池中调度,然后启动对AsyncParallelTask的超时监控。

定时器TimerController
TimerController的实现原理
TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

使用最小堆来管理延时任务和周期任务
使用1个timerfd配合epoll来实现定时任务的激活
使用1个eventfd配合epoll来实现定时任务的添加
使用一个LockFree的栈,实现生产者消费者模型。由外部多线程写,TimerController内部1个线程来读。
TimerController内部流程图

异步任务基类AsyncTask
任何继承AsyncTask的类都可供AsyncTaskController进行调度。

AsyncTask中定了一个基本的接口和AsyncTask的状态的转换。

部分代码举例:

class AsyncTask {
public:
enum Status {
UNSCHEDULED,
PROCESSING,
WAIT_CALLBACK,
CALLBACK,
TIMEOUT,
EXCEPTION,
FINISHED
};

AsyncTask() :
    id(0),
    parent_id(0),
    timeout_threshold(0),
    status(UNSCHEDULED) {}
virtual ~AsyncTask() {}
virtual Status process() = 0;
virtual Status timeout() { return TIMEOUT; }
virtual void destroy() {}
virtual void reset() {
    id = 0;
    parent_id = 0;
    timeout_threshold = 0;
    status = UNSCHEDULED;
}

virtual void callback() {}
virtual void callbackExcepiton() {}
virtual void callbackTimeout() {}

…….
private:
int64_t id;
int64_t parent_id;
int32_t timeout_threshold; // millisecond;
std::atomic status;
};
AsyncTask的状态转换图
AsyncTask约束了异步任务的7种状态,和8中状态转换。其中TIMEOUT和EXCEPITON是等效的,为了方便区分两种异常而设置两个状态。

并行任务AsyncParallelTask
AsyncParallelTask内部流程图

并行子任务AsyncParallelSubTask
拆包后分发操作主要在AsyncParallelSubTask中执行。需要创建AsyncParallelSubTask的子类,实现其中分发操作和合并结果的操作。

使用举例
初始化AsyncTaskController
在进程Init中执行,全局单例。设置分发的线程池。

static ThreadPool thread_pool(config.getWorkerThreadNum()); 
auto& task_controller = Singleton::GetInstance();
task_controller.setThreadPool(&thread_pool);

定义AsyncParallelSubTask的子类PredictAsyncParallelSubTask 
主要实现process和mergeResult两个函数,具体参考

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.h

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.cpp

class PredictAsyncParallelSubTask : public AsyncParallelSubTask {
public:
PredictAsyncParallelSubTask() :
alg_info(nullptr),
context(nullptr),
split_info({0}) {}

virtual ~PredictAsyncParallelSubTask() {}

virtual Status process() {
    if (nullptr == context) {
        throw std::runtime_error("context is nullptr");
    }
    if (nullptr == alg_info) {
        throw std::runtime_error("alg_info is nullptr");
    }
    PredictService::asyncRequestZeusServer(this, *context, *alg_info, split_info);
    return WAIT_CALLBACK;
}

virtual void mergeResult();

virtual void reset() {
    AsyncParallelSubTask::reset();
    alg_info = nullptr;
    split_info = {0};
    context = nullptr;
    temp_res.Clear();
}

void collectResult(const zeus::proto::ZeusResponse& res) {
    auto& zeus_res = const_cast(res);
    temp_res.mutable_item()->Swap(zeus_res.mutable_item());
    temp_res.mutable_model_response()->Swap(zeus_res.mutable_model_response());
}

void setAlgInfo(AlgInfo* alg_info) { this->alg_info = alg_info;};
void setRankContext(RankContext *context) { this->context = context;}
void setSplitInfo(SplitInfo& split_info) { this->split_info = split_info;}

private:
void praseZeusToScoreItem(TargetCode code, double score, ScoreItem *score_item);

AlgInfo* alg_info;
RankContext *context;
SplitInfo split_info;
zeus::proto::ZeusResponse temp_res;

};
创建AsyncParallelTask
具体参考

class PredictRankTask : public AsyncTask {
public:
……
private:
AsyncParallelTask parallel_task;
……
};

……
for (int32_t partition_id = 0; partition_id < partition_count; ++partition_id) {
int64_t total_count = req_item_size;
int64_t offset = split_count * partition_id;
int64_t end = offset + split_count;
end = end > total_count ? total_count : end;
SplitInfo split_info({total_count,
split_count,
partition_count,
partition_id,
offset,
end});

            auto sub_task = std::make_shared();
            sub_task->setAlgInfo(const_cast(&alg_info));
            sub_task->setSplitInfo(split_info);
            sub_task->setRankContext(&getContext());
            parallel_task.addSubTask((std::shared_ptr)sub_task);
        }

……
auto task = this;
parallel_task.setAllDoneCallback([=]() {
task->response();
task->setStatusCallback();
});

    parallel_task.setIncomplateCallback([=]() {
            task->response(Error::E_INCOMPLATE, "some predict server is error!");
            task->setStatusCallback();
            });

    parallel_task.setAllFailCallback([=]() {
            task->response(Error::E_PREDICT_ALL_FAILED, "all predict server is error!");
            task->setStatusCallback();
            });

    int32_t timeout = PredictService::getMaxTimeout(scene_id, sub_alg);
    parallel_task.setTimeoutCallback(timeout, [=]() {
            task->response(Error::E_PREDICT_ALL_TIMEOUT, "all predict server timeout!");
            task->setTimeout();
            });

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

执行调度
task_controller.scheduleImmediately会在当前线程分发拆包到线程池。而task_controller.schedule则会在线程池中选择一个线程分发。

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

编码
源码地址:

测试
压力测试
测试机器为 2017H2-A1-1, 32 core机器

QPS cpu num of items body length session latency P99 session latency AVG
client latency AVG

bandwidth mem remark
300 56% 1W 200KB 43 35 40 3.4 Gb/s 1%
1600 62% 2k 40KB 31 21.6 24 3.64Gb/s 1.1%
稳定性测试
测试方法:
CPU 60%的压力下,持续测试24小时。 

测试结果:
Rank服务可稳定提供服务。无内存泄露。

极限测试
测试过程:

缓慢把CPU压力从30%提升到90%~100%之间,并维持10分钟,然后把cpu压力降低至60%。整个过程中观察Rank稳定性,有无内存泄露。

测试结果:

CPU压力达到90%以上时,Rank内存增长,超时错误日志变多,定时器失准,返回大量超时、错误。 

Cpu压力降低至60%之后,Rank服务恢复正常,内存使用率变小,无内存泄露,超时错误日志不再新的产出。

符合预期。

打分一致性测试
测试方法:
使用rank-diff工具,从passby环境,复制两份流量请求新旧rank生产环境,分别记录打分结果。最后通过python脚本统计打分结果差异。

测试结果:
1000qps,新旧rank打分一致,差异率小于99.9%,满足需求。

产生差异的数据,分为两种。1)为打分近似值,差别0.01以下。 2)打分无效取默认值0.001.

有锁Rank和无锁Rank性能对比
2k条广告时,1600qps,有锁和无锁Rank压力测试性能对比
测试机器  CPU 32 cores,其中QPS、带宽都是相同的。

有锁
无锁
remark 
QPS 1600
相同
CPU 54.1% 63%
session latency AVG 15 21.7
session latency P99 21 31
bandwidth 3.64Gb/s 3.64Gb/s 相同
req body length 40 KB 40 KB 相同
Context Switch

一种压测工具使用的控制流量大小的算法-时间轮算法

压力测试工具是软件测试过程中的必用工具,而压力测试工具如何控制流量大小呢?

最常见的是计算每个请求之间的时间间隔,然后通过sleep方法来让两次请求产生间隔。这种方法有2个缺点,sleep时会让线程挂起,所以需要比较多的线程数;其二,当流量非常大的情况,比如qps达到10万以上时,会收到系统时钟精度和线程上下文切换的挑战。

本文设计了另外一种方法,底层原理为时间轮算法,利用二进制的与操作来实现。按照概率控制流量大小。但概率的计算并不依赖随机数,而是通过设置一个概率控制变量的方法,让流量的发送更加均衡。

代码如下:

class Transformer {
    public:
        Transformer() :
            send_num(0),
            qps(0),
            benchmark(0),
            counter(0),
            thread_pool(10) {}

        void run();
        void stop() { tc.stop(); }

    private:

        void sendOne();
        void transform();

        int32_t send_num;
        int32_t qps;
        int32_t benchmark;
        std::atomic counter;

        ThreadPool thread_pool;
        TimerController tc;
};
void Transformer::run() {
    qps = FLAGS_qps;
    if (qps <= 0 || qps > 1e5) {
        return;
    }

    int32_t query = (qps << 16) / 1000;
    send_num = query >> 16;
    query &= 0xFFFF;
    for (int i = 0; i < 16; ++i) {
        benchmark <<= 1;
        if (query & 1) {
            ++benchmark;
        }
        query >>= 1;
    }
    tc.cycleProcess(1, [this]() { this->transform(); });
}

void Transformer::transform() {
    uint32_t cur_c = counter.fetch_add(1, std::memory_order_relaxed);
    cur_c &= 0xFFFF;
    if (cur_c <= 0) {
        return;
    }
    int32_t delta = 0;
    for (int i = 0,bit = 1; i < 16; ++i, bit <<= 1) {
        if ((cur_c & bit) == 0) {
            continue;
        }
        if ((benchmark & bit) == 0) {
            break;
        } else {
            delta = 1;
            break;
        }
    }

    int32_t cur_send_num = send_num + delta;
    if (cur_send_num <= 0) {
        return;
    }
    for (int i = 0; i< cur_send_num; ++i) {
        thread_pool.enqueue([this]() { this->sendOne(); });
    }
}

C++-双缓冲(DoubleBuffer)的设计与实现

源码如下:

#pragma once

#include 

template
class DoubleBuffer {
    public:
        DoubleBuffer() : cur_index(0) {}

        T& getWorkingBuffer(std::memory_order order = std::memory_order_seq_cst) {
            return buffers[cur_index.load(order)];
        }

        T& getBackupBuffer(std::memory_order order = std::memory_order_seq_cst) {
            return buffers[1 ^ cur_index.load(order)];
        }

        void switchBuffer(std::memory_order order = std::memory_order_seq_cst) {
            cur_index.fetch_xor(1, order);
        }

    private:
        T buffers[2];
        std::atomic cur_index;
};

C++无锁编程-无锁栈的设计与实现

无锁栈实现原理

本文是用过C++11标准下的原子变量atomic的CAS操作compare_exchange_weak,来实现无锁栈(Lock-free Stack)。

  • 通过compare_exchange_weak方法,来实现栈顶元素添加操作的原子性。
  • 栈内元素都会由Node数据结构封装,可以规避CAS操作的ABA问题。

无锁栈实现编码

#ifndef LOCK_FREE_STACK_HPP
#define LOCK_FREE_STACK_HPP

#include 

template
class LockFreeStack {
public:
    struct Node {
        Node() : data(nullptr), pre(nullptr) {}
        void reset() { data = nullptr; pre = nullptr;}

        T *data;
        Node* pre;
    };

    LockFreeStack() : count(0), back(nullptr) {}
    ~LockFreeStack() { clear(); }

    void push(const T* data) {
        Node* node = alloc();
        node->data = const_cast(data);
        for (;;) {
            node->pre = back;
            if (back.compare_exchange_weak(node->pre, node, std::memory_order_seq_cst) ) {
                break;
            } else {
                continue;
            }
        }
        ++count;
    }

    T* pop() {
        for (;;) {
            Node* back_node = back;
            if (nullptr == back_node) { return nullptr; }
            Node* second_node = back_node->pre;
            if(!back.compare_exchange_weak(back_node, second_node, std::memory_order_seq_cst)) {
                continue;
            } else {
                --count;
                T* data = back_node->data;
                recycle(back_node);
                return data;
            }
        }
    }
    bool empty() { return (bool)count; }
    int64_t size() { return count; }
    void clear() { while (nullptr != pop()) {}; }

private:
    Node *alloc() { return (new Node()); }
    void recycle(Node *node) { delete node; }

    std::atomic count;
    std::atomic back;
};
#endif

使用C++11的特性来设计和实现API友好的高精度定时器TimerController

为什么设计和实现TimerController?

最新的TimerController代码保存在Github上面:https://github.com/zuocheng-liu/StemCell ,包含timer_controller.h 和 timer_controller.cpp两个文件,欢迎审阅!

因为软件设计中面临了一些实际问题

尤其在使用C++开发网络应用时常遇到下面的问题:

一、软件设计中,不会缺少通过使用定时器的来实现的场景,比如超时控制、定时任务、周期任务。

二、C/C++标准库中只有最原始的定时接口。没有提供功能完备的库来满足上面提到复杂场景。

三、第三方库中的定时器,往往存在一些问题,比如:

  • libevent、libev、libue 不是线程安全的,在多线程系统中,为了保证线程安全需要额外再进行封装。
  • redis的异步库libae对延时时间的处理是不准确的。

以上问题会让开发者开发新系统时带来一些困扰,但C++11新特性的出现,带来了解决上面问题的新思路。

C++11的新特性让定时器的实现更简单友好

TimerController接口更友好

接口参数支持C++11的lamaba表达式,让定时器的接口对开发人员更加友好。

代码更精简

TimerController的代码总计300~400行,而且功能完备,代码健壮。
C++11的新特性的使用,让代码更简洁,增强代码的可读性、可维护性。

保证线程安全

线程安全,是绕不开的问题。第三方库libevent等,在多线程环境中使用总是危险的。TimerController在设计之初就保证多线程环境下运行的安全性。

没有第三方依赖。

TimerController,没有依赖任何第三方库,完全依靠C/C++标准库和C++11的新特性来实现。

TimerController 接口设计

class TimerController {
    bool init(); // 初始化资源,并启动定时器
    void stop(); // 停止定时器,所有定时任务失效

    // 定时运行任务,参数delay_time单位是毫秒。后面参数是lamba表达式
    template<class F, class... Args>
    void delayProcess(uint32_t delay_time, F&& f, Args&&... args);

    // 周期运行任务,参数interval单位是毫秒,后面参数是lamba表达式。
    template<class F, class... Args>
    void  cycleProcess(uint32_t interval, F&& f, Args&&... args);
}

用一个实例来讲解TimerController的使用方法:

#include <iostream>
#include "timer_controller.h"
using namespace std;
using namespace StemCell;
int main() {
    srand((unsigned)time(NULL));
    try {
        TimerController tc;
        tc.init(); // 初始化 TimerController
        tc.cycleProcess(500, [=]() { cout << "cycle 0.5 sec" << endl; });
        for (int i = 0; i < 80; ++i) {
            // 随机产生80个延时任务,并延时执行
            auto seed = rand() % 8000;
            tc.delayProcess(seed + 1, [=]() { cout << "delay:" << seed << endl; });
        }
        sleep(8);  // 主线程睡眠8秒,让延时任务得以执行
        tc.stop(); // 停止 TimerController
        cout << "tc stoped!" << endl;
    } catch (exception& e) {
        cout << "error:" << e.what();
    }
    return 0;
}

TimerController 实现原理

TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

  • 使用最小堆来管理延时任务和周期任务
  • 使用1个timerfd配合epoll来实现定时任务的激活
  • 使用1个eventfd配合epoll来实现定时任务的添加
  • 使用一个线程安全的队列,实现生产者消费者模型。TimerController使用场景为多线程写,TimerController内部1个线程来读。

TimerController 高级用法

高延时的任务的处理

TimerController内部只有1个线程在执行定时任务。当高延时的任务增多时,可能会影响到任务运行的调度时间,高延时的任务需要在放入新的线程中运行。示例如下:

TimerController tc;
tc.init(); // 初始化 TimerController
// 把任务放入新线程或线程池中
tc.delayProcess(50, []() { std::thread([]() { do_long_time_task();}) });

TimerController保持全局单例

为了系统简洁,TimerController全局单例即可。
auto& tc = Singleton< TimerController >::GetInstance();

其他

如何避免CPU负载高时,定时器失准的问题?

TimerController 有待改进的点

  • 无锁化,目前使用了自旋锁在保证task队列的线程间互斥,后续可使用无锁队列替代有锁队列。
  • TimerController精度目前只有1毫秒,主要因为博主做网络开发都是毫秒级的,后续可以让TimerController支持更小的精度。
  • TimerController 使用了epoll、timerfd、eventfd等,只能在linux平台上面使用

源码地址

具体实现在 timer_controller.h 和 timer_controller.cpp两个文件里面。

使用docker搭建C/C++ 开发环境

优点和意义

  • 环境隔离、资源共享、节省机器资源
  • 轻量虚拟机,启动和运行迅速

使用 Dockerfile 构造镜像

Dockerfile 内容

FROM centos:centos7.4.1708

MAINTAINER Zuocheng Liu 

RUN yum -y --nogpgcheck install gcc gcc-c++ kernel-devel make cmake  libstdc++-devel libstdc++-static glibc-devel glibc-headers \
&& yum -y --nogpgcheck install openssl-devel gperftools-libs \
&& yum -y --nogpgcheck install psmisc openssh-server sudo epel-release \
&& yum -y --nogpgcheck install vim git ctags \
&& mkdir /var/run/sshd \
&& echo "root:123456" | chpasswd \
&& sed -ri 's/^#PermitRootLogin\s+.*/PermitRootLogin yes/' /etc/ssh/sshd_config \
&& sed -ri 's/UsePAM yes/#UsePAM yes/g' /etc/ssh/sshd_config \
&& ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key \
&& ssh-keygen -t dsa -f /etc/ssh/ssh_host_dsa_key

EXPOSE 22

CMD ["/usr/sbin/sshd", "-D"]

创建镜像

docker build -t cpp_dev:last .

创建容器

初始化容器, 宿主机调用脚本

  • 挂载容器
  • 调用容器内初始化脚本mount_in_docker.sh
#!/bin/bash

NEWUSER=$1
PORT=$2
PATH=$(dirname $(readlink -f "$0"))
echo "add User ${NEWUSER} sshd Port ${PORT}"
if [ ! -d /data/rtrs/${NEWUSER} ]; then
    /usr/bin/mkdir /data/rtrs/${NEWUSER}
fi
/usr/bin/cp /data/liuzuocheng/.profile /data/${NEWUSER}/.profile
/usr/bin/docker run -itd --name ${NEWUSER} --net=host -v /data/${NEWUSER}:/home/${NEWUSER} -v /data:/data rtrs/dev_cpp:centos7.4.1708
/usr/bin/docker exec -i ${NEWUSER} sh ${PATH}/mount_in_docker.sh ${NEWUSER} ${PORT} ${UID}

镜像内调用脚本

  • 添加用户
  • 启动sshd服务
NEWUSER=$1
PORT=$2
NEWUID=$3
echo "add User ${NEWUSER} sshd Port ${PORT}"
/usr/sbin/useradd -r -u ${NEWUID} -d /home/${NEWUSER} ${NEWUSER}
echo "${NEWUSER}:123456" | /usr/sbin/chpasswd
echo "Port ${PORT}" >> /etc/ssh/sshd_config
/usr/sbin/sshd

其他问题汇总

宿主机docker的目录迁移

docker的默认目录在/var/lib/docker 下面, 往往/var的磁盘比较小,建议把docker的目录改为大磁盘

把用户加入sudoer列表,sudo 执行命令无需密码

修改/etc/sudoers文件, 添加一行 user-name ALL=(ALL) NOPASSWD: ALL

更多问题可参考《Docker使用经验》

现实过程中使用docker搭建C/C++ 开发环境的工程意义

  • 新人入职代码培训,降低学习成本
  • 推动公司平台化建设

使用C语言实现的最简单的HTTP服务器

此段代码的特点

  • 功能简单,无论请求是什么,应答只返回<h1>Hello!</h1>
  • 代码量小,总计68行。
  • 可编译、可运行。

如何编译运行?

编译: gcc -o hello_server hello_server.c

运行: ./hello_server

请求: curl http://localhost:8888/any

源文件 hello_server.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>

#define PORT 8888
#define BUFFER_SIZE 4096
#define RESPONSE_HEADER "HTTP/1.1 200 OK\r\nConnection: close\r\nAccept-Ranges: bytes\r\nContent-Type: text/html\r\n\r\n"
#define RESPONSE_BODY "<h1>Hello!</h1>"

int handle(int conn){
    int len = 0;
    char buffer[BUFFER_SIZE];
    char *pos = buffer;
    bzero(buffer, BUFFER_SIZE);
    len = recv(conn, buffer, BUFFER_SIZE, 0);
    if (len <= 0 ) {
        printf ("recv error");
        return -1;
    } else {
        printf("Debug request:\n--------------\n%s\n\n",buffer);
    }

    send(conn, RESPONSE_HEADER RESPONSE_BODY, sizeof(RESPONSE_HEADER RESPONSE_BODY), 0);
    close(conn);//关闭连接
}

int main(int argc,char *argv[]){
    int port = PORT;
    struct sockaddr_in client_sockaddr;     
    struct sockaddr_in server_sockaddr;
    int listenfd = socket(AF_INET,SOCK_STREAM,0);
    int opt = 1; 
    int conn;
    socklen_t length = sizeof(struct sockaddr_in);
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(port);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if(bind(listenfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1){
        printf("bind error!\n");
        return -1;   
    }  

    if(listen(listenfd, 10) < 0) {
        printf("listen failed!\n");
        return -1;   
    }

    while(1){
        conn = accept(listenfd, (struct sockaddr*)&client_sockaddr, &length);
        if(conn < 0){
            printf("connect error!\n");
            continue;
        }
        if (handle(conn) < 0) {
            printf("connect error!\n");
            close(conn);
            continue;
        }  
    }
    return 0;
}

后记

为什么要写这篇博文?

原因是,在使用公司里的自动化平台部署c++服务时,拿这个简单的示例来测试平台是否有问题。俗称趟一遍坑儿。

在网上也搜索了很多不少博文,发现里面的代码有几个问题,第一个问题就是编译不过,第二个问题则是有的代码应答必须要有文件,这对我的测试也造成了些许麻烦。

所以就自己参考别人的列子,在自己的博客里写一个简单的吧。以后再去趟别的自动化部署系统的坑的时候,顺手就能拿来用。

使用Thrift的网络框架搭建一般性网络应用

Idea的提出

Thrift 存在的一些问题:

  • 相比于protobuf,Thrift的序列化和反序列化性能表现欠佳,大概比protobuf慢10倍。
  • 相比于其他RPC框架,Thrift拥有优秀的底层通信框架。(作者简单比较过thrift和grpc1.0的通信框架,grpc的设计实在太过简单。)

由此提出猜想和假设:

  • 将 Thrift 的底层通信框架抛离出Thrift框架,利用其来构建一般性的网络应用。
  • 组合 Thrift 的底层通信框架 和 protobuf序列化协议,使之成为一个新的RPC框架。

从实现难度和工作量上的考虑,本文尝试实现第一个假设,“将 Thrift 的底层通信框架抛离出Thrift框架,利用其来构建一般性的网络应用”。第二个假设希望日后,作者在时间和精力富余的时候再进行试验。

使用Thrift的网络框架搭建一般性网络应用的优点

  • 快速搭建网络应用,节省时间成本
  • 当Thrift协议序列化和反序列化成为系统性能瓶颈时,可对其进行替换,同时又能保留Thrift的网络框架,减少对上下游系统的影响

如何操作

有两种方法:

  • 在IDL文本中,将自定义协议的结构体存为一个thrift的string变量。
  • 构建自定义的Processor类

下面对这两种方法做详细介绍:

在IDL文本中,将自定义协议的结构体存为一个thrift的string变量

举例:

namespace cpp com.thrift.test

struct Parameter{
    1: required string bin_data;
}

service DemoService{
    i32 demoMethod(1:string param1, 2:Parameter param2);
}

将新的协议序列化后的数据放入bin_data中,这种方法缺点是,自己定义的协议,还要被thrift的序列化反序列协议包裹,不能完全消除thrift序列化和反序列化的代价。

第一种方法太过简单和粗糙,因此经过挖掘thrift代码后,探索出了更精细的方法。

构建自定义的Processor类

Thrift 底层通信模块的四大基类,TServer、TProcotol、TProcessor、TTransport,其中TProcessor::process是负责处理具体业务逻辑入口。

class TProcessor {
 public:
  virtual ~TProcessor() {}

  virtual bool process(boost::shared_ptr<protocol::TProtocol> in, 
                       boost::shared_ptr<protocol::TProtocol> out) = 0;

  bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
    return process(io, io);
  }

 protected:
  TProcessor() {}
};

因此,只要自定义实现TProcessor的基类,重写process方法,就能自定义自己的网络应用。

下面是一个Hello world应用的简单实现:

首先实现一个HelloWorldProcessor 类。’

class HelloWordProcessor : public apache::thrift::TProcessor {
public:
  virtual bool process(boost::shared_ptr<apache::thrift::protocol::tprotocol> in, boost::shared_ptr</apache::thrift::protocol::tprotocol><apache::thrift::protocol::tprotocol> out) {
    out->writeBinary("Hello world!");
    out->getTransport()->flush();
    out->getTransport()->close();
    GlobalOutput.printf("send bytes %s", "Hello world!");
    return true;
  }
};

然后构建main函数,本实例使用TSimpleServer模型

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::processor;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
int main(int argc, char **argv) {
boost::shared_ptr<tprotocolfactory> protocolFactory(new TBinaryProtocolFactory());
boost::shared_ptr<tprocessor> processor(new UwsgiProcessor());
boost::shared_ptr<tservertransport> serverTransport(new TServerSocket(9090));
boost::shared_ptr<ttransportfactory> transportFactory(new TBufferedTransportFactory());
TSimpleServer server(processor,
  serverTransport,
  transportFactory,
  protocolFactory);
printf("Starting the server...\n");
server.serve();
printf("done.\n");
return 0;
}

最后编译、链接和运行。

简单实现一个socket客户端,发送请求,就能得到HelloWord。

性能测试

待完善

Thrift 底层通信框架的优化和调优

待完善

本文小结

作者写本文,正是在工作中遇到了一些真实的问题,比如thrift序列化反序列化慢,thrift底层那么优秀的通信框架如何更高的加以利用呢?因此带着工作中的一些问题,开始阅读thrift的源代码。

除了本文中的一些实例,作者还做了一个小的代码库,里面就用到了本文中的方法,单独使用了thrift了网络框架,Github地址如下:https://github.com/zuocheng-liu/GI

STL容器 vector list 性能对比(附带测试方法和测试数据)

最近在重构公司的一个C++模块,逻辑里有排序、过滤等操作。开发过程中,遇到下面的一些问题:

问题和猜想

  • 单链表的归并排序线性表的快速排序的时间复杂度都是 O(nlog(n)),但在实际情况中哪个更快呢?需要测试一下。
  • 猜想的发散,stl vector 和 list(双向链表)的迭代器遍历、顺序插入、clear操作, 时间复杂度好像都是相同的,但谁更快呢?
  • 看stl代码去研究vector和list的实现,并不能得到我们最终想要的结果,测试数据会更直观。

测试总结

先写测试总结,测试方法、测试结果都在下面。

  • 迭代器遍历,list 比vector 稍块,使用for_each 比使用for更快。
  • 顺序插入, vector比list 约快3倍
  • clear操作,vector 几乎不耗时,list 要耗费好多时间,vector比list至少快1千倍以上
  • 排序, vector 大约比list 快2倍。

结合项目,对比vector和list的性能

  • 在我重构前,模块使用的是vector存储数据,排序使用的是快排(大于16小于堆),过滤则是用临时数据vector转存。
  • 在我重构后,使用list存储,排序使用list的归并排序,过滤则是直接使用list的erase操作。
  • 重构前,耗时4000微秒,重构后1500微秒,时间减少60%。
  • list除了删除操作和遍历操作,排序和插入都比vector慢,但是我使用list后,却提升了性能。原因可能有2个: 1.单个元素数据结构大,vector移动这些元素代价较大,而list移动这些元素反而代价很小;2.去掉了中间临时存储,数据的转存代价也比较大。
  • 使用list后,模块的可扩展性也变得更好了。

这次项目最大的收获就是,如果有过滤操作,优选使用list。vector的缺点如下,1.vector快速排序需要移动元素,如果元素占据空间大,移动代价也非常大。2.vector过滤需要借助中间临时存储,直接erase的代价更大。

测试方法

  • 随机生成10万数据量,由vector和list结构的变量分别存储
  • 对vector 和 list 的数据分别做如下, 1)迭代器遍历 2)顺序插入 3) clear操作 4)排序
  • 记录每种操作消耗的时间
  • 多次测试,记录典型的数据结果

测试结果

直接列出测试结果,单位是微秒

vector for_each : 3263
list for_each : 2556
vector traverse : 4783
vector num traverse : 666
list traverse : 3078
vector push_back : 8136
list push_back : 24581
list delete : 7428
vector push_back : 7329
list push_back : 22968
vector clear : 0
list clear : 13079
vector sort : 89512
list sort : 146529

(附)测试程序

Github 托管地址,保持更新:https://github.com/zuocheng-liu/code-samples/blob/master/linux-c/stl/vector_list_performance_test.cpp

#include<stdio.h>
#include<stdlib.h>
#include<sys/time.h>
#include <list>
#include <vector>
#include <iostream>

#define MAX_NUM 100000

using namespace std;

timeval tv;
uint64_t timer;


bool NumCmp(uint32_t a, uint32_t b) { return (a > b); }

void startTimer() {
    gettimeofday(&tv,NULL);
    timer = 1000000 * tv.tv_sec + tv.tv_usec;
}

uint64_t stopTimer() {
    gettimeofday(&tv,NULL);
    timer = 1000000 * tv.tv_sec + tv.tv_usec - timer;
    return timer;
}

int main() {
    vector<uint32_t> v;
    vector<uint32_t> v2;
    list<uint32_t> l;
    list<uint32_t> l2;
    for (int i = 0; i< MAX_NUM; ++i) {
        srand((int)time(0)  * i * i + 1);
        int num = rand();
        v.push_back(num);
        l.push_back(num);
    }

    // compare oper traverse
    startTimer();   
    for (vector<uint32_t>::iterator iter = v.begin(); iter != v.end(); ++ iter) {
        *iter;
    }
    cout<<"vector\t traverse\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (int i = 0 ; i < MAX_NUM; ++ i) {
        //v[i];
    }
    cout<<"vector\t num traverse\t :\t"<< stopTimer() << endl;  


    startTimer();   
    for (list<uint32_t>::iterator iter = l.begin(); iter != l.end(); ++ iter) {
    }
    cout<<"list\t traverse\t :\t"<< stopTimer() << endl;  


    // compare oper push_back   
    startTimer();   
    for (vector<uint32_t>::iterator iter = v.begin(); iter != v.end(); ++ iter) {
        v2.push_back(*iter);
    }
    cout<<"vector\t push_back\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (list<uint32_t>::iterator iter = l.begin(); iter != l.end(); ++ iter) {
        l2.push_back(*iter);
    }
    cout<<"list\t push_back\t :\t"<< stopTimer() << endl;  

    // compare oper delete
    startTimer();
    v2.clear();
    for (vector<uint32_t>::iterator iter = v2.begin(); iter != v2.end(); ++ iter) {
    //    v2.erase(iter);
    }
    //cout<<"vector\t delete\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (list<uint32_t>::iterator iter = l2.begin(); iter != l2.end(); ++ iter) {
        iter = l2.erase(iter);
    }
    cout<<"list\t delete\t :\t"<< stopTimer() << endl;  

    // compare oper push_back   
    startTimer();   
    for (vector<uint32_t>::iterator iter = v.begin(); iter != v.end(); ++ iter) {
        v2.push_back(*iter);
    }
    cout<<"vector\t push_back\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (list<uint32_t>::iterator iter = l.begin(); iter != l.end(); ++ iter) {
        l2.push_back(*iter);
    }
    cout<<"list\t push_back\t :\t"<< stopTimer() << endl;  


    // compare oper clear
    startTimer();   
    v2.clear();
    cout<<"vector\t clear      \t:\t"<< stopTimer() << endl;  

    startTimer();   
    l2.clear();
    cout<<"list\t clear     \t :\t"<< stopTimer() << endl;  

    // compare oper sort
    startTimer();   
    std::sort(v.begin(), v.end(), NumCmp);
    cout<<"vector\t sort    \t :\t"<< stopTimer() << endl;  

    startTimer();   
    l.sort(NumCmp);
    cout<<"list\t sort    \t :\t"<< stopTimer() << endl;  

    return 0;
}

C/C++ 中如何写“空语句”

最近我的同事和一些网友都说C/C++中“空语句”(就是单独一个分号的语句)具有延时的作用,可以用来写延时代码。其实这是一种错误的理解。

首先,有人认为空语句经编译后,生成汇编代码是“NOP”指令,NOP指令是空操作指令,执行一个指令周期时间,所以认为C/C++中的“空语句”还有延时的功能,其实这是错误的,“空语句”是不会生成任何有效的指令代码的,是不具有延时做用的。

有人说如下代码是具有延时做用,实际上下边的延时功能主要是加法运算和条件判断运算指令起到了延时的作用。

define DELAY asm(“nop”);

Google C++ 代码风格示例

They say a good example is worth 100 pages of API documentation, a million directives, or a thousand words.

最近项目中决定使用Google的C++代码规范,然后写了两个代码示例文件(.h 和 .cc),这两个文件中的代码都是用Google的C++代码规范。

写示例代码的好处

  • 减少查看文档时间,降低学习成本
  • 帮助程序员快速了解Google的代码风格的概貌
  • 用于参考和模仿,帮助快速上手

代码托管地址

下面地址保持持续更新:

https://github.com/zuocheng-liu/code-samples/tree/master/code_style

仓促之作,错误和疏忽不可避免,但保证后续不断更新。

初版代码示例

建议点击上面的链接,获取最新版。编辑器的原因,缩进的显示可能有问题。

// Copyright (c) 2016, Zuocheng.net
//
// Author: Zuocheng Liu
//
// Lisence: GPL
//
// File: code_samples/code_style/google_code_style.h 
// (文件名要全部小写,可以包含下划线(_)或短线(-))
// C++文件以.cc 结尾,头文件以.h 结尾。
//
// Google C++ code Style 代码示例 (简体中文版)
//
// *注释  使用//或/* */,但要统一。
// *注释  每一个文件版权许可及作者信息后,对文件内容进行注释说明
// -------------------------------------------------------------最多不能超过80行
#ifndef CODE_SAMPLES_CODE_STYLE_GOOGLE_CODE_STYLE_
#define CODE_SAMPLES_CODE_STYLE_GOOGLE_CODE_STYLE_
// 命名空间的名称是全小写的,其命名基于项目名称和目录结构
// 不要声明命名空间std 下的任何内容
namespace code_samples {
namespace code_style {

// **变量命名**
// 尽可能给出描述性名称,不要节约空间
// 变量名一律小写,单词间以下划线相连
// 少用全用变量,可以以g_与局部变量区分
int g_my_exciting_global_variable;

// 只有在描述数据时用struct ,其他情况都用class
typedef struct CodeStyle {
    uint32_t type;
} CodeStyle;

// **类注释** 
// 每个类的定义要附着描述类的功能和用法的注释
//
// GoogleCodeStyle , 通过代码示例,展现google c++ code style
//
// (按需注明synchronization assumptions,是否线程安全)
class GoogleCodeStyle {
public :

  // ** 声明次序 **
  // - typedefs 和enums;
  // - 常量;
  // - 构造函数;
  // - 析构函数;
  // - 成员函数,含静态成员函数;
  // - 数据成员,含静态数据成员。

  // **类型命名**
  // 每个单词以大写字母开头,不包含下划线
  // 所有类型命名——类、结构体、类型定义(typedef)、枚举——使用相同约定
  typedef enum StyleType {
      // 枚举值应全部大写,单词间以下划线相连
      GOOGLE_CODE_STYLE = 0,
      K_AND_R,
      POCO
  } StyleType;

  // 常量命名,在名称前加k
  const int kDaysInAWeek = 7;   

  GoogleCodeStyle() {}
  ~GoogleCodeStyle() {}
  // **函数注释**
  // 函数声明处注释描述函数功能,定义处描述函数实现.
  // - inputs(输入)及outputs(输出);
  // - 对类成员函数而言:函数调用期间对象是否需要保持引用参数,是否会释放这些参数;
  // - 如果函数分配了空间,需要由调用者释放;
  // - 参数是否可以为NULL;
  // - 是否存在函数使用的性能隐忧(performance implications);
  // - 如果函数是可重入的(re-entrant),其同步前提(synchronization assumptions)
  // 举例 :
  // 初始化函数
  ~Init() {}

  // 普通函数名以大写字母开头,每个单词首字母大写,没有下划线
  // 
  uint32_t MyExcitingMethod(CodeStype &code_style, char *output);
  // 存取函数要与存取的变量名匹配
  inline int num_entries() const { return num_entries_; }
  inline void set_num_entries(int num_entries) { num_entries_ = num_entries; }

  // 类的成员变量以下划线结尾
  int num_completed_connections_;

protected :

private :
  // **类数据成员**
  // 每个类数据成员(也叫实例变量或成员变量)应注释说明用途。
  // 如果变量可以接受NULL 或-1, 等警戒值(sentinel values),须说明之
  int num_entries_;

}; // class GoogleCodeStyle

}} // namespace code_samples::code_style 

#endif //CODE_SAMPLES_CODE_STYLE_GOOGLE_CODE_STYLE

线程安全的单例模式-以C++代码为例

本文描述3种场景下的单例模式:

  • 进程体内无线程的单例模式
  • 进程体内多线程单例模式
  • 在单个线程体中的单例模式

本文所写单例模式代码都使用懒汉模式。

进程体内单例

注意问题:

  • 如果进程体中运行多个线程,则需要考虑多线程同步访问的问题。
  • 如果进程体重没有运行多个线程,则不需要考虑多线程同步访问。
  • 使用线程同步锁保证多进程同步

使用场景举例 :

  • 日志类、文件读写类
  • 资源管理类

代码示例:

进程体内没有运行多线程的单例模式,无需考虑线程同步与互斥

class Singleton {
  public:
    static Singleton* getInstance() {
        if (NULL == instance) {
          instance = new SingletonInside();
        }
        return instance;
    }
  private:
    SingletonInside(){}
    ~SingletonInside() {}
    static Singleton* instance;
};

Singleton::instance = NULL;    

进程体内运行多线程单例模式,使用系统mutex保证线程安全

class Singleton {
  public:
    static Singleton* getInstance() {
        pthread_once(&g_once_control, InitOnce);
        pthread_mutex_lock(&mutex);  // lock
        if (NULL == instance) {
          instance = new SingletonInside();
        }
        pthread_mutex_unlock(&mutex); // unlock
        return instance;
    }
  private:
    SingletonInside() {

    }
    ~SingletonInside() {
       pthread_mutex_destroy(&mutex);   // destroy lock
    }
    static void InitOnce(void) {
      pthread_mutex_init(&mutex,NULL);  // init lock
    }
    Singleton* instance;
    static pthread_once_t g_once_control;
    static pthread_mutex_t mutex;
};
Singleton::instance = NULL;
pthread_once_t Singleton::g_once_control = PTHREAD_ONCE_INIT;

单个线程体中的单例

某些资源在单个线程体内需要保持单例,即每个线程体内都保持唯一。每个线程体内,对象相互隔离,则无需考虑线程安全问题。

此种单例模式的实例需要调用系统提供的 TLS 接口,放于线程局部存储中。

使用场景举例:

  • 多路复用Socket封装类
  • 网络上下文环境类
  • 线程安全的资源

代码示例

class Singleton {
  public:
    static Singleton* getInstance() {
       pthread_once(&g_once_control, InitOnce);
       Singleton* instance = (Singleton*)pthread_getspecific(g_thread_data_key);

        if (NULL == instance) {
          instance = new SingletonInside();
          pthread_setspecific(g_thread_data_key, (void*)Singleton)
        }
        return instance;
    }
  private:
    SingletonInside() {}
    ~SingletonInside() {
       pthread_key_delete(g_thread_data_key);
    }
    static void InitOnce(void) {
      pthread_key_create(&g_thread_data_key, NULL);
    }

    static pthread_once_t g_once_control;
    static pthread_key_t g_thread_data_key;
};

pthread_once_t Singleton::g_once_control = PTHREAD_ONCE_INIT;
pthread_key_t Singleton::g_thread_data_key;

如果使用 Poco库 TreadLocal ,代码还会简洁很多,性能上也会好很多

    class Singleton {
      public:
        static Singleton* getInstance() {
            if (NULL == instance) {
              instance = new SingletonInside();
            }
            return instance;
        }
      private:
        SingletonInside() {}
        ~SingletonInside() {
           delete instance;
        }
        static Poco::ThreadLocal<Singleton> instance;
    };
Poco::ThreadLocal<singleton> Singleton::instance = NULL;

总结

  • 无论程序是多进程运行还是多线程运行,代码都要尽量兼容线程安全

多线程编程的陷阱

依据《Java 并 发编程实践》/《Java Concurrency in Practice》一书,一个线程安全的 class 应当满足三个条件:

  • 从多个线程访问时,其表现出正确的行为
  • 无论操作系统如何调度这些线程,无论这些线程的执行顺序 如何交织
  • 调用端代码无需额外的同步或其他协调动作

不安全的多线程编程容易造成哪些后果

  • 内存泄露
  • 资源、数据访问冲突、数据不一致
  • 进程空间用尽
  • 计算资源耗尽

* 错误处理不当导致整个进程异常结束

数据访问

  • 冲突
  • 不一致

多线程间共享的标志变量, 可能被编译器优化,存储于cpu寄存器,一个线程将其值改变(内存值),在另一个线程的cpu寄存器对应的值却未同步。

解决方法: 1. 添加 volatile 关键字修饰变量。 并且加互斥锁。 2.不使用共享的标志变量。

内存空间使用的安全性

  • 防止内存泄露,小心使用堆空间

  • 小心使用栈空间

逻辑的安全性

  • 参数检查一定要严格,任何一个不合法的参数能让线程异常,然后整个app异常

  • 容错

内存泄露原因

软件的稳定性

  • 如果一个线程体挂掉

  • 悬空指针

释放时设置为NULL 声明定义时设置为NULL

线程空间限制

  • 所有线程受限于进程的空间
  • 系统会限制线程的栈空间(默认是10M)

在大规模软件设计中,线程数量的设定和栈空间的使用必须要小心。

高效使用栈空间技巧

  • 如果使用大的内存空间,则到堆里去申请
  • 不使用递归算法,而使用循环,也要避免使用递归算法的库
  • 更加细致地拆分逻辑或模块,让函数体量更小
  • 复杂数据类型(类或者结构体)做为函数参数时,尽可能传递它们的指针或引用,以节省栈空间

多线程下的设计模式

资源竞争与互斥

  • 单例模式

循环中不使用wait

多线程下的全局变量、资源

线程安全

语言特性 valatile

内存泄露

和多进程编程相同的陷阱

运维的安全性

如果程序在线程在启动时固定,则在可动态扩展的容器中,可能会有因为线程数过多而不能充分使用cpu资源。在性能压力测试时,常常会引起在cpu资源使用率较低时,系统则已经到达瓶颈。

spawn-fcgi 源码分析

梗概

本文内容对Spawn-fcgi源码进行解读,简要说明其原理,并具体说明其实现方式。

Spawn-fcgi 源码虽然只有600多行,但是初次阅读起来依然需要花很多时间。为了节省读者的学习成本,提高学习Spawn-fcgi 的效果,作者对Spawn-fcgi的源码做了裁剪,保留最核心的功能和原有的代码结构,且能编译后正常运行。最后代码只有200多行。

源码地址在这里

必备知识

要阅读Spawn-fcgi,读者至少需要掌握以下几个方面的知识或技能:

  • 了解CGI和FastCgi的概念,了解其使用场景
  • 基础的Linux C 环境编程,会使用常见的库函数比如getopt、exec 等
  • 基础的Linux C 的多进程编程,熟悉fork、waitpid、setsid等函数
  • 基础的Linux C 网络编程,熟悉建立tcp连接、select非阻塞方式通信,多路复用I/O等

裁剪后Spawn-fcgi的执行过程

  • 创建服务器socket
  • fork进程,子进程初始化会有两个主要操作:
  1. 把socket的文件描述符,复制到FCGI_LISTENSOCK_FILENO
  2. 会执行execl 函数,运行cgi程序,并让cgi程序拥有子进程的上下文环境

    运行cgi程序后,使用FCGI_LISTENSOCK_FILENO这个描述符,来与webserver进行通信。

如何编译

gcc -o spawn-fcgi spawn-fcgi.c

一行命令即可

如何调用

./spawn-fcgi -f cgi -p 9001 -F 256

裁剪后也仅支持接收这三个参数

作者对于Spawn-fcgi的思考

  • fastcgi 协议规定,fcgi管理器中把网络描述符定为FCGI_LISTENSOCK_FILENO,为了一致CGI程序中复用FCGI_LISTENSOCK_FILENO的套接字,总感觉不是很完美。
  • Spawn-fcgi太简单,不需要单独做一个软件,完全可以集成到cgi程序中
  • Spawn-fcgi使用的是多进程,如果集成到cgi程序中,可以自由选择多进程模型、多线程模型
  • 缺少进程守护监控,spawn-fcgi如果一个进程挂掉,不会被重启。
  • Spawn-fcgi 网络多路复用调用的是select,但现在最常用的是epoll

Spawn-fcgi 补充知识

与原版相比,裁剪后Spawn-fcgi的失去了哪些功能

  • 使用linux套接字文件建立tcp连接功能
  • 对IPv6的支持
  • 对root、group 用户的检查
  • 对windows、Solaris等编译环境的支持
  • 通过进程pid文件获取cgi进程
  • 去除对autoconf等编译工具的依赖

C函数和C++函数相互调用

实例解说

这个例子,示例了两点

  • C 如何调用C++对象里的函数
  • C++ 如何调用C的函数

共两个文件,test.c 和 main.cpp,代码解释如下:

在main.cpp (C++ 代码) 里定义了一个类MyMath,类里有个成员函数sum ;如何让C能调用这个c++的函数MyMath::sum呢?

即在main.cpp 中添加extern C后,声明定义一个C的函数call_MyMath_sum。在test.c 中先声明这个函数,然后通过调用call_MyMath_sum,达到调用C++ MyMath::sum的作用。

在test.c 中,定义了一个sum 的函数。如何让C++能调用这个c的函数sum呢? 这么做的,在main.cpp中 extend C 后声明它,然后在main函数中直接调用就可以了。

代码有点绕,C和C++调来调去的,不过仔细看就容易明白。

起关键作用的就是 extent C 这个关键语句,它的作用是告诉C++编译器,把后面的语句当作C语言进行处理。

代码如下

C语言中的函数,其中调用了C++中的call_MyMath_sum:

test.c

int call_MyMath_sum (int, int); // 此函数定义在main.cpp中
int sum(int a , int b) {
    return call_MyMath_sum(a,b);
}

C++语言中的函数:

main.cpp

# include <iostream>
using namespace std;
extern C { 
    int sum(int , int);  // 声明sum函数,已经在test.c 中定义过
} 

class MyMath { 
  public : 
    static int sum(int , int); 
};

int MyMath::sum(int a, int b) { 
    return (a + b); 
} 

extern C int call_MyMath_sum (int a , int b) {  // 定义call_MyMath_sum , 使其可以被c的代码调用
    return (MyMath::sum(a,b)); 
} 
int main(void) { 
    cout< <sum(5,6); return 0;  // 此sum是 在test.c中定义的
} 

如何编译:

# Makefile 
main.o:
    g++ -c -o main.o main.cpp  # 注意是g++
test.o:
    gcc -c -o test.o test.c  # 注意是gcc
main: main.o test.o
    g++ -o main main.o test.o # 最后链接用的是g++
all: main
clean:
    rm -f test.o main.o

执行 make 得到可执行文件main