C++无锁编程-单链表

无锁编程单链表的实现,主要特性如下:

  • 只能从尾部追加元素
  • 不支持删除元素
  • 支持Iterator遍历元素
  • 提供了多线程情况下的测试用例

适用的场景如下:

  • 搜索引擎,实时索引中的倒排链表
  • 只添加元素,不删除元素的多线程场景

代码如下:

#include 
#include 
#include 
#include 
template
class LockFreeSignleLinkedList {
public:
    struct Node {
        union Data {
            DataType *ptr;
            uint64_t value;
        } data;
        std::atomic next = nullptr;
    };

    class Iterator {
    public:
        Iterator operator ++(int) {
            Iterator it;
            it.ptr = ptr->next;
            return it;
        }

        Iterator& operator ++() {
            ptr = ptr->next;
            return *this;
        }

        bool operator ==(Iterator& iter) {
            return (ptr == iter.ptr);
        }

        Iterator& operator =(const Iterator& iter) {
            ptr = iter->ptr;
            return *this;
        }

        DataType& operator *() {
            if (std::is_class::value) {
               return *(ptr->data.ptr);
            } 
            uint64_t* data_ptr = &(ptr->data.value);
            DataType* ptr = reinterpret_cast(data_ptr);
            return *ptr;
        }

        friend class LockFreeSignleLinkedList;
    private:
       Node* ptr = nullptr;
    };

    LockFreeSignleLinkedList() {
        head = new Node;
        tail = head;
    }

    void push_back(const DataType& data) {
        push_back(&data);
    }

    void push_back(const DataType* ptr) {
        Node *tmp = new Node;
        if (std::is_class::value) {
            tmp->data.ptr = const_cast(ptr);
        } else {
            DataType* tmp_ptr = reinterpret_cast(&(tmp->data.value));
            *tmp_ptr = *ptr;
        }
        for(;;) {
            ajustTail();
            Node *real_tail = tail.load(std::memory_order_relaxed);
            auto& next = real_tail->next;
            Node *real_next = next.load(std::memory_order_relaxed);
            if (nullptr != real_next) {
                continue;
            }
            if (!next.compare_exchange_weak(real_next, tmp, std::memory_order_relaxed)) {
                continue;
            } else {
                break;
            }
        }
        cnt.fetch_add(1, std::memory_order_relaxed);
        ajustTail();
    }

    Iterator begin() {
        Iterator it;
        it.ptr = head->next;
        return it;
    }

    Iterator end() {
        static Iterator blank_it;
        return blank_it;
    }

    size_t size() { return cnt.load(std::memory_order_relaxed); }

private:
    void ajustTail() {
        for(;;) {
            Node *real_tail = tail.load(std::memory_order_relaxed);
            auto& next = real_tail->next;
            Node *real_next = next.load(std::memory_order_relaxed);
            if (nullptr == real_next) {
                return;
            }
            if (!tail.compare_exchange_weak(real_tail, real_next, std::memory_order_relaxed)) {
                continue;
            }
        } // end for
    }

    Node* head = nullptr;
    std::atomic tail = nullptr;
    std::atomic cnt = 0;
};

int main()
{
    LockFreeSignleLinkedList l;
    l.push_back(9);
    l.push_back(2);
    l.push_back(4);
    l.push_back(5);
    std::cout << "list:" << std::endl;
    for (const int32_t n : l) {
        std::cout << n << std::endl;
    }
    std::cout << "retry to list:" << std::endl;
    for (auto it = l.begin(); it != l.end(); ++it ) {
        std::cout << *it << std::endl;
    }

    LockFreeSignleLinkedList l2;
    volatile bool run = false;
    auto fun = [&]() {
        for (;;) {
            if (!run) {
                continue;
            } else {
                break;
            }
        }
        std::cout<< "start pid:" << std::this_thread::get_id() << std::endl;
        for (int32_t i = 0; i < 900000; ++i) {
            l2.push_back(i);
        }    
        std::cout<< "end pid:" << std::this_thread::get_id() << std::endl;
    };

    std::thread t1([&](){fun();});
    std::thread t2([&](){fun();});
    std::thread t3([&](){fun();});
    std::thread t4([&](){fun();});
    std::atomic_thread_fence(std::memory_order_seq_cst);
    run = true;
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    size_t s1 = 0;
    for (auto it = l2.begin(); it != l2.end(); ++it ) {
       ++s1;
    }
    std::cout << "s1:" << s1 << std::endl;
    std::cout << "s2:" << l2.size() << std::endl;

    return 0;
}

Linux下关于openssh和putty密钥的配置

openssh-key

生成密钥

ssh-keygen -t rsa -b 4096 -C "it@zuocheng.net"

密钥生效

$eval ssh-agent -s #注意这里是反引号
$ssh-add

把openssh-key 转为 puttyKey

使用puttygen.exe转换,load openssh-key,然后导出
可能会遇到"Putty key format too new"的问题,则在puttygen的选项卡 key->parameter for saving key files 中,把 "ppk file version" 改为V2

其他

Linux设置密钥登陆

C++软件架构选型

基础服务 (开源)

项目 推荐☆☆☆☆☆ 推荐☆☆☆☆ 推荐☆☆☆
虚拟化 podman docker
虚拟机集群 k8s
分布式存储 ceph

中间件

项目 推荐☆☆☆☆☆ 推荐☆☆☆☆ 推荐☆☆☆
名字服务/服务发现 consul
分布式协调一致性 etcd zookeeper
消息队列 kafka rabitmq
监控 prometheus open-falcon Zabbix
监控展示 grafana
日志存储 druid elastic-search Opentsdb
分布式任务调度 Elastic-Job
kv缓存服务 redis
kv数据库 leveldb mongodb
向量检索引擎 faiss milvus proxima

组件

项目 推荐☆☆☆☆☆ 推荐☆☆☆☆ 推荐☆☆☆
RPC框架 BPRC thrift
IDL protobuffer thrift json
基础库 boost poco
log glog Spdlog
malloc jemalloc tcmalloc
json fastjson jsoncpp
xml tinyxml2
yaml yaml-cpp
kv缓存 rockdb

murmur-hash\Md5\crc32等算法对用户信息散列均匀程度测试

问题的提出

  • 一致性hash,散列并不均匀
  • 集群中机器性能并不均衡。因素众多,比如型号不同、硬件不同;同型号下,机器性能表现也不同。
  • 当使用一致性hash作为负载均衡策略时,会让机器负载更加不均衡。

因此需要对一致性hash作为负载均衡策略时表现的的特性进行详细探索。最主要观察的指标则是散列均匀程度。

提前给出实验结果

  • murmur hash的散列均匀程度远优于crc32,和MD5相当。但murmur hash在计算消耗上低于MD5
  • 使用murmur hash,在100台物理节点,5000虚拟节点倍数的情况下,可以做到散列均匀和性能的平衡

brpc中如何实现一致性hash负载均衡

// code file: brpc/src/brpc/policy/consistent_hashing_load_balancer.cpp
int ConsistentHashingLoadBalancer::SelectServer(
    const SelectIn &in, SelectOut *out) {
    if (!in.has_request_code) {
        LOG(ERROR) << "Controller.set_request_code() is required";
        return EINVAL;
    }
    if (in.request_code > UINT_MAX) {
        LOG(ERROR) << "request_code must be 32-bit currently";
        return EINVAL;
    }
    butil::DoublyBufferedData >::ScopedPtr s;
    if (_db_hash_ring.Read(&s) != 0) {
        return ENOMEM;
    }
    if (s->empty()) {
        return ENODATA;
    }
    std::vector::const_iterator choice =
        std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
    if (choice == s->end()) {
        choice = s->begin();
    }
    for (size_t i = 0; i < s->size(); ++i) {
        if (((i + 1) == s->size() // always take last chance
             || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
            && Socket::Address(choice->server_sock.id, out->ptr) == 0
            && (*out->ptr)->IsAvailable()) {
            return 0;
        } else {
            if (++choice == s->end()) {
                choice = s->begin();
            }
        }
    }
    return EHOSTDOWN;
}
bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
    std::vector add_nodes;
    add_nodes.reserve(_num_replicas);
    if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
        return false;
    }
    std::sort(add_nodes.begin(), add_nodes.end());
    bool executed = false;
    const size_t ret = _db_hash_ring.ModifyWithForeground(
                        AddBatch, add_nodes, &executed);
    CHECK(ret == 0 || ret == _num_replicas) << ret;
    return ret != 0;
}

由代码看出hash环中的每台服务器会变为100个虚拟节点,假设有100台机器,则在hash环分布上会有10000个节点

不同散列算法对比

hash/曼哈顿距离/虚拟节点数 100*100 1000*100 3000*100 5000*100 10000*1000
murmur 30% 20% 10% 5% 5%
MD5 30% 20% 10% 5% 5%
crc32 150% 100% 50% 30% 30%

结论,murmur hash的散列均匀程度优于crc32、MD5等

MurMurHash算法分析

测试murmurhash 散列均匀程度的代码

int main () {
    int32_t rep = 100;
    int32_t base = 100;
    std::vector  nodes;
    nodes.resize(base);
    std::vector imeis;

    std::map node_info;
    std::vector rings;

    // init cs rings
    for (auto& node : nodes) {
        node.host = std::to_string(rand());
        for (int32_t i = 0; i < rep; ++i) {
            std::string host = node.host +  "-" + std::to_string(i);
            uint32_t hash = MurmurHash32(host.c_str(), host.size());
            node.hash = hash;
            node_info[hash] = &node;
            rings.push_back(hash);
        }
    }
    std::sort(rings.begin(), rings.end());
    // read imeis
    {
        std::ifstream if_stream("./imei_sample.txt");
        std::string imei;
        while(if_stream >> imei) {
            imeis.emplace_back(imei);
        }
        if_stream.close();
        cout << "imei size:" << imeis.size()
            << endl;
    }
    // search
    for (auto& imei : imeis) {
        uint32_t hash = MurmurHash32(imei.c_str(), imei.size());
        auto iter = std::lower_bound(rings.begin(), rings.end(), hash);
        if (iter == rings.end()) {
            iter = rings.begin();
        }
        node_info[*iter]->count += 1;
    }

    // print
    std::sort(nodes.begin(), nodes.end(), [](const Node& a, Node& b)->bool { return( a.count < b.count); });
    for (auto& node : nodes) {
        cout << "host:" << node.host
            << "\thash:" << node.hash
            << "\tcount:" << node.count
            << "\trate:" << (double)node.count/(double)imeis.size()
            << endl;
    }

    auto max = nodes.rbegin()->count;
    auto avg = imeis.size()/nodes.size();
    auto min = nodes.begin()->count;
    cout << "max:" << nodes.rbegin()->count << endl;
    cout << "avg:" << imeis.size()/nodes.size() << endl;
    cout << "min:" << nodes.begin()->count << endl;
    cout << "up:" << (double)(max-avg)/(double)avg << endl;
    cout << "down:" << (double)(avg-min)/(double)avg << endl;
    return 0;

测试结果

nodes_num chash_num_replicas 单核cpu使用率% 内存使用率% 100万次查询/s 波动率 hash ring时间 us hash ring sort时间 us
100 100 3 0.3 1.56 0.403038 14014 3564
100 1000 3 0.4 1.89783 0.0884258 156900 42973
100 5000 4.7 0.5 2.49725 0.0536913 880742 244460
100 10000 5 0.6 2.60658 0.0416814 1900755 519096
100 50000 10 1 3.24731 0.024373 11680936 2852591
100 100000 15 3 3.52997 0.0270811 25035142 5932167
100 500000 100 11 4.62568 0.0300247 148615862 32576504

结论

总量 235W imei
波动范围:-21~35%
+0.352182
-0.210649

./murmur_hash
imei size:2362768
host:278722862  hash:1925471051 count:18650 rate:0.00789328
host:783368690  hash:4002282954 count:19363 rate:0.00819505
host:1548233367 hash:3058897622 count:19635 rate:0.00831017
host:491705403  hash:611553964  count:19699 rate:0.00833726
host:1984210012 hash:1268219422 count:19909 rate:0.00842613
host:1303455736 hash:2326844163 count:20035 rate:0.00847946
host:855636226  hash:3637853163 count:20067 rate:0.008493
host:1059961393 hash:2408710735 count:20074 rate:0.00849597
host:1957747793 hash:1288415059 count:20084 rate:0.0085002
host:1477171087 hash:1988358724 count:20170 rate:0.0085366
host:1540383426 hash:411668640  count:20173 rate:0.00853787
host:610515434  hash:663691886  count:20526 rate:0.00868727
host:1973594324 hash:34305553   count:20670 rate:0.00874821
host:760313750  hash:3679971615 count:20685 rate:0.00875456
host:137806862  hash:1969944721 count:20771 rate:0.00879096
host:294702567  hash:2546435079 count:20788 rate:0.00879816
host:468703135  hash:646134042  count:20832 rate:0.00881678
host:943947739  hash:90910835   count:21160 rate:0.0089556
host:709393584  hash:574850828  count:21243 rate:0.00899073
host:1681692777 hash:3333659905 count:21308 rate:0.00901824
host:1102520059 hash:1769249379 count:21493 rate:0.00909653
host:1632621729 hash:1495248802 count:21503 rate:0.00910077
host:233665123  hash:1873607595 count:21784 rate:0.00921969
host:859484421  hash:3330505905 count:21797 rate:0.0092252
host:424238335  hash:4012193417 count:22014 rate:0.00931704
host:184803526  hash:2094014013 count:22097 rate:0.00935217
host:1911759956 hash:824332016  count:22155 rate:0.00937671
host:412776091  hash:2345249892 count:22194 rate:0.00939322
host:1125898167 hash:3699427959 count:22223 rate:0.00940549
host:135497281  hash:3267716888 count:22224 rate:0.00940592
host:572660336  hash:2471704921 count:22463 rate:0.00950707
host:596516649  hash:3577994617 count:22477 rate:0.00951299
host:719885386  hash:1271161395 count:22746 rate:0.00962684
host:1843993368 hash:209167012  count:22748 rate:0.00962769
host:1131176229 hash:253515204  count:22804 rate:0.00965139
host:805750846  hash:3215568319 count:22873 rate:0.00968059
host:35005211   hash:2142699485 count:22887 rate:0.00968652
host:1649760492 hash:1041784901 count:22967 rate:0.00972038
host:1967513926 hash:1618568325 count:22975 rate:0.00972376
host:2038664370 hash:3103071240 count:23099 rate:0.00977625
host:2044897763 hash:1674256958 count:23241 rate:0.00983634
host:1656478042 hash:1438521939 count:23273 rate:0.00984989
host:511702305  hash:3873703906 count:23349 rate:0.00988205
host:752392754  hash:3262287234 count:23352 rate:0.00988332
host:304089172  hash:1732284559 count:23392 rate:0.00990025
host:84353895   hash:2348091605 count:23450 rate:0.0099248
host:1469348094 hash:646865057  count:23522 rate:0.00995527
host:1726956429 hash:4181074938 count:23539 rate:0.00996247
host:945117276  hash:4238293984 count:23567 rate:0.00997432
host:1141616124 hash:1571084204 count:23687 rate:0.0100251
host:1937477084 hash:4082607906 count:23749 rate:0.0100513
host:1474612399 hash:3177345892 count:23788 rate:0.0100679
host:1129566413 hash:1470592129 count:23800 rate:0.0100729
host:1734575198 hash:2699928391 count:23809 rate:0.0100767
host:628175011  hash:2004400594 count:23829 rate:0.0100852
host:356426808  hash:1763444061 count:23858 rate:0.0100975
host:1424268980 hash:4064778616 count:23945 rate:0.0101343
host:1100661313 hash:277960042  count:23951 rate:0.0101368
host:336465782  hash:2967368358 count:24001 rate:0.010158
host:861021530  hash:204708028  count:24044 rate:0.0101762
host:1264095060 hash:2871390615 count:24102 rate:0.0102007
host:1780695788 hash:3482626452 count:24127 rate:0.0102113
host:635723058  hash:1724005082 count:24184 rate:0.0102355
host:42999170   hash:3780009203 count:24188 rate:0.0102371
host:608413784  hash:1746093087 count:24209 rate:0.010246
host:1315634022 hash:2863942689 count:24290 rate:0.0102803
host:846930886  hash:3078961593 count:24352 rate:0.0103066
host:1914544919 hash:3276410919 count:24523 rate:0.0103789
host:1801979802 hash:4146573372 count:24672 rate:0.010442
host:1365180540 hash:625462217  count:24727 rate:0.0104653
host:939819582  hash:2372438818 count:24831 rate:0.0105093
host:982906996  hash:247356574  count:24930 rate:0.0105512
host:521595368  hash:3895493184 count:24982 rate:0.0105732
host:2145174067 hash:1076998566 count:25050 rate:0.010602
host:2084420925 hash:1383371566 count:25103 rate:0.0106244
host:1433925857 hash:4101903831 count:25117 rate:0.0106303
host:1956297539 hash:1715097599 count:25379 rate:0.0107412
host:1374344043 hash:649457066  count:25383 rate:0.0107429
host:1101513929 hash:1467247780 count:25506 rate:0.010795
host:756898537  hash:1686538984 count:25587 rate:0.0108292
host:1189641421 hash:176009951  count:25675 rate:0.0108665
host:1159126505 hash:1773799926 count:25939 rate:0.0109782
host:1827336327 hash:1008242909 count:25984 rate:0.0109973
host:1714636915 hash:4167178899 count:26113 rate:0.0110519
host:749241873  hash:3575756704 count:26202 rate:0.0110895
host:1025202362 hash:2435165496 count:26208 rate:0.0110921
host:1585990364 hash:1528149985 count:26263 rate:0.0111154
host:2001100545 hash:3446016448 count:26377 rate:0.0111636
host:1998898814 hash:1116227003 count:26415 rate:0.0111797
host:1350490027 hash:1503077054 count:26472 rate:0.0112038
host:1804289383 hash:4001391486 count:26715 rate:0.0113067
host:1749698586 hash:1034046264 count:26729 rate:0.0113126
host:1411549676 hash:2524237029 count:26744 rate:0.0113189
host:149798315  hash:2620653860 count:26859 rate:0.0113676
host:1369133069 hash:474177003  count:26875 rate:0.0113744
host:2089018456 hash:2266701595 count:27028 rate:0.0114391
host:1889947178 hash:533689514  count:27869 rate:0.0117951
host:1653377373 hash:2085222316 count:28508 rate:0.0120655
host:2053999932 hash:399467675  count:30092 rate:0.0127359
host:1918502651 hash:941599913  count:31948 rate:0.0135214
max:31948
avg:23627
min:18650
up:0.352182
down:0.210649

复盘 CheckList

复盘步骤

  • 过程回溯
  • 问题描述 & 影响统计(正面、负面)
  • 优化方案 & 方案评估
  • 经验总结
  • 方案执行追踪

复盘检查点

下面是一些标准问题。如果在原因分析/改进措施中已经处理,请写上改进措施序号。对于没有处理的,说明为什么不需要处理。

例行检查点 答案示例
为什么会发生这个问题? 内部不够、以及新出的坏道告警处理方式双方没有对齐。
在哪个阶段出现的问题? design、coding、操作?
为什么测试阶段没有发现? 新上的告警灰度阶段发现的问题,测试阶段无法覆盖
系统为什么不能容错? 不需要处理
能不能更早发现问题? 双方建立告警对接机制
解决过程能否更快? 不需要处理,主要是流程优化。
怎么防止类似的事情发生? 加强内部宣导;进一步完善SOP;对于新告警,双方及时对齐处理方案
我们是否过度思考了? 方案太过长远,无法解决短期问题
方案三阶段 短期、中期、长期

vim常见插件汇总

Vim作为强大的文本编辑器,具有丰富的插件和功能。以下是一些常用的Vim插件:

NERDTree:在Vim中打开一个侧边栏,可以浏览和管理文件目录。

CtrlP:快速模糊查找文件和缓冲区中的内容。

Vundle:用于管理Vim插件的插件管理器。

YouCompleteMe:提供自动代码完成和语义分析功能,对于编码和改善程序员的工作流程非常有用。

Syntastic:对某些文件类型进行语法检查。

Fugitive:提供Git的集成,方便用户进行源代码版本控制。

Vim-airline:自定义状态栏,提高用户的工作效率和审美体验。

Vim-markdown:对Markdown文档进行语法高亮,方便编辑和查看。

Vimtex:LaTeX编辑器插件,提供自动完成以及LaTeX相关功能。

UltiSnips:提供代码段补全和自定义代码片段功能,方便编写重复的代码或快速输入命令。

这只是Vim插件的冰山一角,还有很多其他有用的插件可以扩展Vim的功能和增强用户体验。

C++网络服务迭代开发-模板

需求分析

需求相关内容

系统设计

系统升级方案

系统设计、更改相关内容

测试用例设计

No. case 预期
1
2
3
4 回归测试-点

发布方案

包含灰度、全量发布流程

人力评估

项目 人力
合计

编码开发

grerrit code review地址:

测试结果

功能测试

No. case 预期结果 测试通过(✅ or ❌)
1
2
3 回归测试

性能测试

qps cpu idle 内存 失败率 延时P99 是否通过(✅ or ❌)
实验 50%
基线 50%
实验 30%
基线 30%

稳定性测试

测试项目 qps 内存是否异常增长 是否产生core文件 是否通过(✅ or ❌)
cpu idle 保持50%以下,持续压力测试4小时

特征打分一致性验证

测试项目 是否通过(✅ or ❌)
预发环境验证打分一致性,bdsp任务结果是否通过

上线部署

预发验证

功能验证

性能验证

分类 项目 是否通过
性能 CPU
内存
延时
带宽
服务 失败率
超时率
错误日志
特征大小变化
数据打分 ctr 骤变
cvr 骤变,各类cost type
cvr2 骤变
ecpm
特征一致性监控 观察特征一致性报警

灰度上线并验证

功能验证

性能验证

分类 项目 是否通过
性能 CPU
内存
延时
带宽
服务 失败率
超时率
错误日志
特征大小变化
数据打分 ctr 骤变
cvr 骤变,各类cost type
cvr2 骤变
ecpm
特征一致性监控 观察特征一致性报警

主干上线并验证

功能验证

性能验证

分类 项目 是否通过
性能 CPU
内存
延时
带宽
服务 失败率
超时率
错误日志
特征大小变化
数据打分 ctr 骤变
cvr 骤变,各类cost type
cvr2 骤变
ecpm
特征一致性监控 观察特征一致性报警

一道关于字符串算法题

题目

输入a b c 三个字符串,判断c是否是a、b中的字符,并且分别按在a、b中的顺序出现在c里

#include 
using namespace std;

// case 0 , nomal
// case 1 , null
// case 2 , a、b重复
// case 3 , a 或 b有叠词
// case 4 , a、b 字符串逆序

bool foo(char* a, int sa, char* b, int sb,  char* c, int sc) {
    if (nullptr == a || nullptr == b || nullptr == c ) {
        return false;
    }
    if (sa < 0 || sb < 0 || sc < 0) {
        return false;
    }
    if ( sa + sb != sc) {
        return false;
    }
    if (*c != *a && *c != *b) {
        return false;
    }
    if (sa == 0 && sb == 0 && sc == 0) {
        return true;
    }
    if (*a == *b && sa > 0 && sb > 0) {
        bool check = false;
        check |= foo(a+1, sa-1, b, sb, c+1, sc-1);
        check |= foo(a, sa, b+1, sb-1, c+1, sc-1);
        return check;
    } else if (*a == *c && sa > 0) {
        return foo(a+1, sa-1, b, sb, c+1, sc -1);
    } else if (*b == *c && sb > 0) {
        return foo(a, sa, b+1, sb-1, c+1, sc-1);
    }
    return true;
}

int main() {
    cout << "0 foo expect 1:" << foo("",0, "",0, "",0) << endl;
    cout << "1 foo expect 1:" << foo("abc",3, "123",3, "a1b2c3",6) << endl;
    cout << "2 foo expect 0:" << foo("acb",3, "123",3, "a1b2c3",6) << endl;
    cout << "3 foo expect 0:" << foo("acb",3, "123",3, "a1b2b3",6) << endl;
    cout << "4 foo expect 0:" << foo("acc",3, "123",3, "a1b2c3",6) << endl;
    cout << "5 foo expect 1:" << foo("acc",3, "123",3, "acc123",6) << endl;
    cout << "6 foo expect 1:" << foo("acc",3, "1b23",4, "a1cbc23",7) << endl;
    cout << "6 foo expect 0:" << foo("acc",3, "1b23",4, "a1cbc2",6) << endl;
    return 0;
}
  • 题目来自今日头条

C/C++编程索引

本博客中关于C/C++编程内容的索引

语言特性

编译器

经典实例

无锁编程

其他

分支路径图调度框架在vivo效果广告业务的落地实践

大家好,我是vivo效果广告预估服务的架构师刘作程。非常荣幸能在2022 vivo 技术大会和大家见面,我今天带来的分享是《分支路径图调度框架在vivo效果广告业务的落地实践》。

在介绍分支路径图调度框架之前,请容我先向大家介绍vivo效果广告预估服务。

vivo效果广告实时在线服务,是提供实时AI算法推荐的服务。在广告投放场景,承载了一天百亿级别数量的请求,支持VIVO广告收入。在可用性、可扩展性等方面具有非常高的要求。服务中起到调度作用的模块,上下游依赖特别多,比如特征服务、ABT实验平台、实时数据流、模型计算模块等等。调度模块在请求下游服务的方式都是采用异步的方式。那么我们是怎么管理这么多异步请求的呢?

异步调用已成为系统设计中的主流方法。虽然异步调度提升了系统性能,提升了资源的利用率,但却对系统的可扩展性和可维护性提出了挑战。回忆历史中用过的异步管理方法有以下三种。

面向过程方法,树调度,有限有向图管理。单纯使用面向过程的方法,简单却粗放,随着下游服务增多,代码逻辑中产生大量的callback函数和类。使得系统调度过程繁杂无序,可扩展性和可维护性变差。

树调度方法,使得异步调度进入框架调控的新阶段。可扩展性方面较面向过程的方法要好很多。但树结构不能准确描述复杂服务的调用流程。

有限有向图,是目前使用最为广泛的方法。扩展性较好,能管理复杂的调用流程。但这是否是一种完美的方法呢?不是的。有限有向图,对图中节点是全路径访问,对分支路径的管理不够友好。因此我们依然要探索新的方法。

为了解释有限有向图在实时在线服务中的局限性,我们以vivo效果广告预估服务的调度流程为例,进行说明。把调度流程经过抽象后,调度流程如左图一样简洁明了,但这只是理想状态。

那么现实状况又是什么样子的呢?大家来看,和所有的实时在线服务一样,我们在系统设计时,为了系统健壮,总要和大量的异常和超时做斗争。并且除了异常和超时,系统还需要有兜底逻辑。上一个简洁明了的有限有向图已经不复存在。

为了进一步说明,有限有向图不能完全把控在线服务中异步调度的流程。我们依据刚才展示的流程调度图,做了一个状态转换图。在这个图中,展示了系统中各个状态的流转路径,总数达到了7条之多。有限有向图是一种全路径图调度框架,已经难以适用复杂度不断增长的系统。

全路径图调度框架具体有什么痛点,逼迫让我们去寻找新的方法。具体原因就在这里,有限有向图在落地实践中,使用skip状态变量约束路径。系统在处理一次任务的过程中,不经过的路径上的所有节点状态都会被设置为skip。

全路径的图调度框架中,每增添一个节点,会导致:控制变量数量 +1,状态全集 x2, 复杂度成指数增长。在如此状态下,对复杂系统添加流程或调整流程,就会变成工程师的噩梦。设计开发时必须小心甚微,上线时则如履薄冰。

为了安全开发和提升迭代效率,新的调度方法被迫切地创建出来。那就支持分支路径的图调度框架。它的原理,则是在原有的图调度框架中,添加两处功能,一是加入了分支节点,二是对于图中节点的触发和激活支持“与”激活和“或”激活。是不是像极了逻辑电路呢?

大家是否有疑问,添加两处修改就可以了吗?这样做真的有效吗?其实依据,就来自于我们的《编译原理》里的常见概念和常见规律。

有限有向图,是一种NFA,即不确定的有限自动机。我们都知道,在实践中NFA实现难度很大,它不如DFA简洁和简单。所以结论呼之欲出,支持分支路径的图调度框架,则是把图变成DFA。我们还给它起了新名字,DDAG。

实践是检验真理的唯一标准,我们再通过实际的落地过程看,分支路径调度框架是否满足我们的预期。

我们回忆一下vivo效果广告预估服务的调度流程,那一个充满着异常、超时、兜底逻辑的调用图。为了便于说明,我们把它化简了一下,变成一个流程图,如左图,目前它还是一个全路径的调度图。使用单路径调度框架改造后,变成右图,图中添加了判断节点,具体路径的走向则由判断逻辑来控制。大量Skip的状态控制变量依然不复存在。 图中蓝、青、红分别代表了3条路径,让路径和流程一目了然。

当图与实时在线系统融合在一起的时候,我们发现了单路径图调度框架更多的提升空间。 比如,图的整体的超时、异常管理,图中节点的超时、异常管理, 以及复杂图结构的自动化简,会成为我们以后进一步升级的空间。

单路径图调度框架在VIVO效果广告预估服务中做了一次成功的实践。它帮助工程师降低在开发过程中的风险,提速在项目中的迭代效率。让算法预估服务的飞轮越转越快!(握拳)

C++实现一个空的日志类

在C++项目中,一些场景下,可能会用到一个空的日志类,覆盖项目中的日志实现,让项目不打印日志,比如:

  • 屏蔽某个类库中的日志(因为库的日志打印机制设计的不够好)
  • 单例测试中代码中,屏蔽项目日志

应对以上需求,本文实现了一个不打印日志的类,有如下特点:

  • 覆盖项目中日志的宏定义,比如LOG_INFO等,不打印日志
  • 重载operator<<操作符的实现

实现中有哪一些陷阱呢?主要在std::endl上面。

在重载operator<<操作符的时候,使用LOG_INFO<<"log"<<std::endl; 时,会提示,std::endl是未知类型(unresolved overloaded function type)

关于std::endl的本质,是一个函数模板,本文不做详细阐述,只给出实现方法。

代码如下,使用c++11标准实现:


#ifdef DISABLE_PRJ_LOG

#undef LOG_DEBUG
#undef LOG_INFO
#undef LOG_WARN
#undef LOG_ERROR
#undef LOG_FATAL

#include 
#include 

class BlankLogObject {
public:
    static BlankLogObject& Ins() {
        static BlankLogObject obj;
        return obj;
    }
    BlankLogObject& operator<<(void* in) { return *this; }
    BlankLogObject& operator<<(char in) { return *this; }
    BlankLogObject& operator<<(const char* in) { return *this; }
    BlankLogObject& operator<<(const std::string& in) { return *this; }
    BlankLogObject& operator<<(int16_t in) { return *this; }
    BlankLogObject& operator<<(int32_t in) { return *this; }
    BlankLogObject& operator<<(uint32_t in) { return *this; }
    BlankLogObject& operator<<(int64_t in) { return *this; }
    BlankLogObject& operator<<(uint64_t in) { return *this; }
    BlankLogObject& operator<<(float in) { return *this; }
    BlankLogObject& operator<<(double in) { return *this; }
    BlankLogObject& operator<<(BlankLogObject& in) { return *this; }
    BlankLogObject& operator<<(std::ostream& in) { return *this; }
    BlankLogObject& operator<<(std::ostream (*in)(std::ostream&)) { return *this; }

    typedef std::basic_ostream > endl_type;
    BlankLogObject& operator<<(endl_type& (*in)(endl_type&))  { return *this; }
};
}

#define LOG_DEBUG rec::BlankLogObject::Ins()
#define LOG_INFO  rec::BlankLogObject::Ins()
#define LOG_WARN  rec::BlankLogObject::Ins()
#define LOG_ERROR rec::BlankLogObject::Ins()
#define LOG_FATAL rec::BlankLogObject::Ins()
#endif
int main() {
    LOG_INFO << "skf" << 123 << 0.1 << std::endl;
    return 0;
}

关于gcc中string的cow机制

梗概

已知 C++ 11禁止使用COW,本文进行验证。

测试代码

#include 
#include 
#include

int main() {
    std::string origin("cow\n");
    std::string copy( origin);

    printf(origin.c_str());
    printf(copy.c_str());
    printf("origin\taddress is %x\n",(int64_t)origin.c_str());
    printf("copy\taddress is %x\n",(int64_t)copy.c_str());
    copy[0] = 'w';
    printf(origin.c_str());
    printf(copy.c_str());
    printf("origin\taddress is %x\n",(int64_t)origin.c_str());
    printf("copy\taddress is %x\n",(int64_t)copy.c_str());
    return 0;
}

使用gcc 6.3.0测试

$ g++ -o cow cow.cpp --std=c++98

$ ./cow
cow
cow
origin  address is 8c69cc40
copy    address is 8c69cc20
cow
wow
origin  address is 8c69cc40
copy    address is 8c69cc20

$ g++ -o cow cow.cpp --std=c++11
$ ./cow
cow
cow
origin  address is 28240270
copy    address is 28240250
cow
wow
origin  address is 28240270
copy    address is 28240250

使用gcc 4.8.5测试

$ g++ -o cow cow.cpp --std=c++98
$ ./cow
cow
cow
origin  address is aa9028
copy    address is aa9028
cow
wow
origin  address is aa9028
copy    address is aa9058

$ g++ -o cow cow.cpp --std=c++11
$ ./cow
cow
cow
origin  address is 23a0028
copy    address is 23a0028
cow
wow
origin  address is 23a0028
copy    address is 23a0058

结论

从gcc的实际的实现看,cow 并非是c++11之后禁止的,而是从gcc的某个版本开始禁止cow机制,与c++11无必然关系。

Google protobuf使用技巧和经验总结

技巧 & 经验

性能优化

把repeated message结构尽可能摊平为基础类型的repeated字段

基础类型的repeated字段,包含 repeated int32, int64, float,double,bool等,但不包含string、bytes、message

比如:

message Item {
    int32 id = 1;
    int32 score = 2;
}

message R {
    repeated Item items = 1;
}

改为下面的设计,会提升序列化和反序列效率

message R {
    repeated int32 item_id = 1;
    repeated int32 item_score = 2;
}

原理是非string的基础类型的repeated字段,在申请内存时pb会申请连续线性大块内存,效率高;而message 的repeated字段,会按对象逐个去申请空间。

这种设计还有一个好处就是不会触发调用析构函数,如果采取 repeated Item 这种结构,会触发大量的析构函数,浪费 CPU 与 时间。(by chengyuxuan_yutian@163.com)

善用arena管理内存

  • arena对基础类型,比如int32, int64, float,double,bool等管理效率优化明显
  • arena不会管理string类型的内存申请(更高的版本已经支持,待验证)。

用固定长度repeated uint32 替换字符串

字符串是一种不定长的数据结构,内存管理方式成本较高。对于定长的字符串,通过转换成repeated uint32类型,可以获得更高效的管理。

除此之外,repeated uint32 也支持由arena管理。

善用Any类型

假设3个网络服务的调用关系如下:
A->B->C。
其中存在某些pb结构仅会由B透传给C,而B不需要解析,则可以把这些pb放入定义为any类型的字段中。

善用浅拷贝机制(set_allocated_xxx/release)

CopyFrom是深拷贝,若要实现浅拷贝则可以通过 set_allocated_xxx/release 两个函数进行控制

结合arena使用浅拷贝机制(unsafe_arena_set_allocated)

set_allocated_xxx的风险在于,pb析构的时候会把元素也析构掉,无法重复利用。且在一些特殊场景,在无法控制pb析构而不能使用release函数。这些场景可能是pb的析构工作由框架控制,旧的代码封装层次太深等。

这种情况可以使用unsafe_arena_set_allocated_xxx 避开这个问题。

使用陷阱

不要有交叉依赖

举例一C++系统中,模块A依赖 PB;模块B依赖PB,而模块C依赖A和B。则编译模块C时一定要同时编译A、B、PB。

利用protobuf一些特性来规避陷阱

良好的可扩展性 & 保留未定义字段

良好的可扩展性使得protobuf更好地向后兼容。上游更新了proto,新增字段,下游虽然没有更新proto文件,但是新增的字段依然可以保留,来自上游的字段可以透传给下游。拼接下游请求的结构pb时,尽可能使用CopyFrom,避免把字段逐个set。

使用编号定位存储的字段

为了更好地向后兼容,应该避免修改proto文件中现有字段的名字、类型。需要修改时,通过追加新字段(字段编号增加),弃用旧字段的方式。

故障相关

protobuf被广泛使用,饱经业界考验,如果遇到问题,绝大多数还是自身软件设计的问题。遇到问题,首先不应该怀疑protobuf,应该把视角集中到去发现自身的系统设计缺陷中。

一次内存泄露的故障排查

现象:

公司里一个c++网络服务中, PV较低时没有内存泄露;而PV较高,cpu idle降到30%以下,开始内存泄露,直到OOM。

排查过程:

用了 tcmalloc和gperf,逐步定位是protobuf 申请 repeated字段的空间,没有及时释放。repeated字段约1k~1w的规模。然后逐步缩小范围。

结果:

竟然是释放内存,都放到了一个线程中。当流量大时,单个线程计算能力成为瓶颈,内存释放变慢,表现为内存泄漏。

总结

protobuf释放的代价较大,不要全部的protobuf只放在一个线程操作。场景距离,lru cache的过期元素剔除。

https://www.cnblogs.com/zgwu/p/10403939.html

工程师文化&工匠精神名言名句

Scientists study the world as it is. Engineers create the world that has never been. –Theodore von Kármán

但尽管Google的核心算法出自冰雪聪明的工程师创新,但能否拧成一股绳才是公司持续发展的关键。

不要去做设计高手,只去做综合素质高手! –贝聿铭

知者创物,巧者述之守之,世谓之工。百工之事,皆圣人之作也。 –《考工记》

我亦无他,惟手熟尔。(欧阳修《卖油翁》)

打工的状态并不可怕,打工的心态很可怕。(付守永《工匠精神》)

人心惟危,道心惟微;惟精惟一,允执厥中。(《尚书·大禹谟》)

建于精诚,工于品质。–作者:宋杰

在长期实践中,我们培育形成了爱岗敬业、争创一流、艰苦奋斗、勇于创新、淡泊名利、甘于奉献的劳模精神,崇尚劳动、热爱劳动、辛勤劳动、诚实劳动的劳动精神,执着专注、精益求精、一丝不苟、追求卓越的工匠精神。劳模精神、劳动精神、工匠精神是以爱国主义为核心的民族精神和以改革创新为核心的时代精神的生动体现,是鼓舞全党全国各族人民风雨无阻、勇敢前进的强大精神动力。——《人民日报》2020年11月25日

劳动者素质对一个国家、一个民族发展至关重要。技术工人队伍是支撑中国制造、中国创造的重要基础,对推动经济高质量发展具有重要作用。要健全技能人才培养、使用、评价、激励制度,大力发展技工教育,大规模开展职业技能培训,加快培养大批高素质劳动者和技术技能人才。要在全社会弘扬精益求精的工匠精神,激励广大青年走技能成才、技能报国之路。——《人民日报》2019年9月24日

执着专注、精益求精、一丝不苟、追求卓越。(领导人讲话)

查看Linux Cpu软中断脚本

代码如下:

#!/bin/sh
# filename softirqs_status.sh

PATH=/bin:/sbin:/usr/bin:/usr/sbin
export PATH

interrupts_=cat /proc/interrupts |grep -E "CPU0|em[0-9]{1,2}.*-[0-9]{1,3}$|eth[0-9]{1,2}.*-[0-9]{1,3}$|p[0-9]{1,2}p[0-9]{1,2}.*-[0-9]{1,3}$|nvme" |cat -s |column -t 
#interrupts_=cat /proc/interrupts |grep -E "CPU0|em[0-9]{1,2}.*-[0-9]{1,3}$|eth[0-9]{1,2}.*-[0-9]{1,3}$|p[0-9]{1,2}p[0-9]{1,2}.*-[0-9]{1,3}$" |cat -s |column -t 
#interrupts_=cat /proc/interrupts |grep -E "CPU0|em[0-9]{1,2}|eth[0-9]{1,2}|p[0-9]{1,2}p[0-9]{1,2}" |grep -i  "TxRx" |cat -s |column -t 
interrupts_=echo "${interrupts_}" |awk '{ if(NR == 1){one=$0;gsub(one, "- &", $0); print $0}else{print $0}}' |cat -s |column -t 
interrupts_=echo "${interrupts_}" |awk '{ for(i=2;i<=NF;i++){ if( $i ~ /^[[:digit:]]*$/ && $i >= 1000 && $i < 1000000 ){$i=int( $i / 1000 + 1)"K"}else if( $i ~ /^[[:digit:]]*$/ && $i >= 1000000 ){$i=int( $i / 1000000)"M"}}; print $0 }'
echo "${interrupts_}" |sed 's/CPU/U/g' |column -t

echo

cpu_c=expr $(cat /proc/cpuinfo |grep -i processor |wc -l) + 1 ;

softirqs_=cat /proc/softirqs |grep -E 'TX|RX|CPU' 
softirqs_=echo "${softirqs_}" |awk -v cpu_c=$cpu_c '{ if(NR == 1){one=$0;gsub(one, "- &", $0)}; for(i=1;i<=cpu_c;i++){print $i} }' 
softirqs_=echo "${softirqs_}" |xargs -n $cpu_c |column -t 
softirqs_=echo "${softirqs_}" |awk '{ for(i=2;i<=NF;i++){ if( $i ~ /^[[:digit:]]*$/ && $i >= 1000 && $i < 1000000 ){$i=int( $i / 1000 + 1)"K"}else if( $i ~ /^[[:digit:]]*$/ && $i >= 1000000 ){$i=int( $i / 1000000)"M"}}; print $0 }'
echo "${softirqs_}" |sed 's/CPU/U/g' |column -t

echo

显示结果如下

 bash /tmp/softirqs_status.sh
-    U0   U1  U2  U3  U4  U5  U6  U7  U8  U9  U10  U11  U12  U13  U14  U15  U16  U17  U18  U19  U20  U21  U22  U23  U24  U25  U26  U27  U28  U29  U30  U31
89:  102  0   0   0   0   0   0   0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    4M   0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-0
90:  4K   0   0   0   6M  0   0   0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-1
91:  50   5M  0   0   0   0   0   0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-2
92:  51   0   7M  0   0   0   0   0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-3
93:  64   0   0   6M  0   0   0   0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-4
94:  32   0   0   0   3K  0   0   0   0   0   0    0    0    0    0    0    0    5M   0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-5
95:  41   0   0   0   0   6M  0   0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-6
96:  53   0   0   0   0   0   6M  0   0   0   0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    0    IR-PCI-MSI-edge  em49-fp-7

-        U0  U1  U2   U3   U4  U5   U6   U7   U8   U9   U10  U11  U12  U13  U14  U15  U16  U17  U18  U19  U20  U21  U22  U23  U24  U25  U26  U27  U28  U29  U30  U31
NET_TX:  35  26  667  114  93  160  106  1    0    0    0    0    0    0    0    0    1    2K   0    2    0    0    0    51   0    0    0    0    0    0    0    0
NET_RX:  5K  5M  7M   6M   6M  6M   6M   707  720  462  542  507  541  866  392  332  127  5M   289  199  251  244  194  4M   204  3K   396  446  217  165  164  128

Mini Server问题集锦

Nextcloud

更新版本时错误,[version 22.2.x]

Step 4 is currently in process. Please reload this page later

解决方法:

nextcloud路径/data/updater-xxxxxxx文件夹

frp

运行时错误, [verion 0.33、0.37.1 、0.38]

2021/11/16 22:23:50 [E] [control.go:158] [dbf0bbaf2a7be98c] work connection closed before response StartWorkConn message: read tcp 192.168.199.111:59690->xx.xx.xx.x:443: read: connection reset by peer

当天流量太大,网络运营商切断了链接,过一天就好了。

分布式系统讲义(二、负载均衡策略)

同学们欢迎回来,这节课,我们来讨论分布式系统的那些事当然负载均衡的问题,什么是负载均衡的问题,呢就是上一节课。我们讲过分布式系统呢可能会有非常多的服务器来呃来组成这个系统。那么,这个系统,它是如何分工的呢?当很多用户都过来发送请求的时候,这些请求应该怎么样,去被分摊到不同的服务器上,让他们来处理呢?这个呢,实际上就是负载均衡要解决的一个问题。负载惊鸿,从字面意思就是让我们的服务器的负载,都能够均衡些。

这是字面意思。那么,我们是怎么做的呢?我们还是一个WEB网站为例子,当非常多的用户同时或者说任意同一段时间里面都来请求我们的系统的时候,我们就要把这些请求给他给分流分流到不同的服务器上。然后让他们老师让不同服务器呢,然后都能够同时来去处理这样用户。那么,这样做的一个目的是什么呢?第一个就是一定要让我们的服务器就是能够均匀地去呃去承受这些用户的请求,其实从用户的那个角度来讲呢,就是它能够及时的,它发出的请求能够被及时都给处理。

那么,我们来举这么一个例子啊,假如我们没有负载均衡的情况会怎么样?一个分布式系统,哎,哪怕强,如美国队长。那么,非常多的一个系统,它来一个请求就出了一个请求。那么,他也是处理不过来的,而好在呢我们是有非常多的一些服务器,我说一些计算单元,然后来平摊。那么,这些工作,这样的话,我们就我们就会让这个系统,呢然后在同一时间就可以处理非常多的一个系统,呃非常多的一个请求。那么,我们是怎么样去用什么一个方法,然后去把这些系统给它分流,呢这个时候,呢,就说到一个负载均衡的一些策略,最常见的一种两种策略呢是一个时装的,另外一个是软的意思,呢绒的的意思,呢实际上是轮询的意思就是来一个请求分配给你当前的第一台服务器。

那么,来第二个请求的时候,然后就把这一个请求呢,分配给第二个处理器进行呃进行处理,第三个就返回第三个,然后所有的服务器转了一圈以后,然后就这么轮询的完成。那么随机这种策略呢然后也是很好理解了。当一个请求过来的时候,它会随机的a选一台服务器,给他去分配处理这么一个星球。这样这两种复赛翻飞的策略,呢然后都能够让所有的机器呢然后都在运行,而不会出不会出现某一台机器,比如说不干活,另外一些机器又特别忙是吧?

这样子,呢然后就可以更快地去处理一些用户用户理解请求呃这么做的一个目的,呢实际上就是为了让那个用户呢然后及时的去那个得到呃得到处理。其实我们有的时候,我们可能会去一些。比如说政府部门或者说去银行办业务的时候,其实我们会排队,其实,这个原理呢,其实已经是非常类似了,在银行当中获得政府过程当中,它有非常多的一扇窗口。那么,他的那个排队系统用的是什么样的策略?呢其实很多时候是一个荣的是吧?那其实也不全是中的落地呃也不全是一个软得对,因为实际过程中很复杂,其实,我们解决计算机的问题。

啊跟我们的解决。现实生活中这么一个I期银行排队这么一个我其实很类似的,但是呢,我们也面临一个一个问题,就是有的服务器呢,年富力状有的服务器呢,性能比较差。那么这个时候你就不能用,就简单粗暴地用轮询,或者说说用随机的这种方法变得给他去哪个范围这个任务。那么,这个时候,有用的一个什么方法呢?就是一个加权,我们叫价钱的一个轮询,有些有些服务器呢年富力强,我们给他给他个全职,比如说一点吧,有些机器呢弱一点,啊我们给它的系数是0.8。
这样子的话,那么,一点的服务器他就可以接受更多的那些处理接触更多的请求去处理它,能够更快的呢,然后去返回那个性情用户请求的结果就有这么一种方法。那么,我们除此之外呢,实际上还有一种负载均衡方法。教你的意思呢,是叫最小负载的一种方法,呃这句话是怎么理解呢?这句话呢?实际上是一种比较,我们设置权重都要学人工的去标,实是把这台机器是个年富力强的还是一个呃,比如说是一个特别就的这么一个电脑,一个信号。那么,我们能不能自动系这个时候呢,实际上在我们的负载均衡里面,我们会写一个算法,我们会依据。
比如说CPU某一台机器的响应时间,或者说某台机器,它的那个队列的一个等待时间,我们通过一个一个算法,然后我们就能计算出它的全职来获得全职高的,呢它依然会给他分配更多的更多的一些请求我举个例子,大家如果是做c加加开发的话,会有一个b r PC的c加加框架,它里面那就有一个叫LA的这么一个负载均衡的一个算法,它呢,就依据它下游的服务器的一些等待时间,比如说CPU的负载还有那个返回的状态么?和响应时间,他就能判断出这台机器是不是一个性能更高的机器还是一个性能差的机器?

这样子的话,他就可以比较自动的去那个吧,就是自动的,去给那个依据机器的不同性能给他分配不同数量的一些请求。嗯,咱们再说一下。那么负载均衡他,有的时候,它是依据什么呢?我们依据什么来判断,就说我给哪台机器呢?其实还有一种方式就是我,如果我们做,如果我们做一个映射和随机的话,我还可以通过。比如说听说我校4层4层负载均衡还有呢?叫7层负载均衡这句话是什么意思呢?7层和4层负载均衡脑,只要指的是TCP IP当中的第7层和第4层4层呢,是指我们依据IC IP的地址呢,然后呢,我们就给他进行随机的去分配。

呃依据这个IP地址,然后给他做负载均衡,然后给他分配的后面相应的机器当中去。还有一个7层负载均衡,呢实际上是一具,比如说第7层实际上在TC标题里边是应用层,那么我们就一句在应用层当中的一些什么一些带过来的一些信息,我说是一些cookie,就类似于这样的一些应用带过来一些信息呢,然后给他趣,唉,比如说随机分配给那个下游的一些服务器,这个呢,是依据网络环境,然后我们来去见4层呢还是西,曾因为4层大家都知道的IP内层会更基础一些,会底层一些好了,大家我们来就是总结和分享一下。

总结一下,就是我们的负载均衡的方法都有哪些几种呢?实际上其实呢?都有这四种,主要都是有4楼,第一个呢,就是轮询随机。还有最小负载对外还有一个最小链接,但是也让过程中我们可能不是在实际过程中不会经常常见,我们来那个给负载均衡下一个定义啊。那么,食什么是负载均衡负载均衡,其实它的本质,啊它是一个起到一个路由的作用就是当一个用户过来的一个用户的请求。过来的时候,我们怎么给他录入到后端的一台机器上,让他去处理。
这有这个嗯完成了这么一个负载均衡以后,呢实际上分布式对于一个应用来讲,是否就结束了呢?呃其实不是的,啊就是我们怎么样去找到它下面的服务。呢这个内容呢我们会在下节课的时候,然后进行一个呃进行一个讲解。

分布式系统讲义(一、什么是分布式系统?)

同学们大家好,今天由我来同大家一起,通过大白话的方式,聊一聊分布式系统的那些事儿。

到今天为止,分布式系统已经是一种非常基础或者说非常通用的一种技术了。在互联网应用的后端服务中,分布式技术几乎无所不在,那么到底什么是分布式系统?我们为什么要用分布式系统?它到底有哪些优势?以及我们今后工作中面临的一些问题,如何通过设计分布式系统,解决这些问题呢。围绕着这些问题,我们在这门课中展开来讨论。

回到这门课的最开始的一个问题,什么是分布式系统?在给出这个定义之前,我们先来看一下一个后端服务的演化过程,拿一个外部网站来举例,最早的时候,它只部署在一台机器上,这一台机器承受的用户的数量是很有限的,一台服务器大概只能承受住300个人同时进行访问,但是用户量会慢慢的变大,从最开始的300到后来的1000人,在到后来的几万人。中国现在大概有接近10个亿的网民,这么1个数量级的用户数,我们这么一台服务器绝对是承受不住的,这个时候应该怎么办?

一般想到的它有两种方法,我们说的第一种方法升级我们的服务器,把我们的服务器从一个性能比较差的变成一个性能比较好的,它数据的存储本来是比如说只有几百g然后把它变成几个t这么一个容量的磁盘放到这个服务器里边去,这样子也能解决问题,但是它只解决了比如说用户数从300到1000这么一个问题,但是我们的用户数如果是几个亿,那个时候怎么办?

我们会有一个第二种方法,就是用几千台机器,然后让几千台机器,然后相互协作,去满足几亿用户的这么一个数量的这么一个访问,所以这个方法基本上已经给出了一个分布式系统的一个雏形了,由此我们其实是可以引出来,那么分布式系统到底是一个什么样的定义。然后出自分布式的概念与设计这么一本书,就是国外的一个经典的用来用来介绍分布式的这么一本经典巨著。

他说分布式系统是指位于联网计算机上的组件,仅通过传递信心进行通信和协调行动的系统。同时他又给了这么一句话叫分布式系统,它是一个即便是已经崩溃了部分机器,但是它仍然能够完成工作这么一个系统。除此听到这么几个定义,大家可能还是感觉比较细。

我来给大家举一个例子,我所在的智能平台部,我所负责的系统就是一个服分布式系统效果广告的预估服务,那么这个预估服务现在有多少台机器,这个事情我给大家说一下,大概是就是说大概是有3000台左右的机器,那么我们的小广告服务大概有3000台机器这么一个数量级的,而且这3000台机器各有分工,相互协作,然后能够满足我们现在广告这么一个业务。

同时它也满足这么一个定义。如果只有一部分机器宕机,那么它仍然能够满足效果广告,预估的工作,而不至于整个服务都不可用。

今天的例子,介绍什么是分布式系统,我们就是到此为止,下节课我们会去探索分布式系统它面临哪些技术挑战。谢谢大家。

Linux一些查看系统状态的命令

查看网卡速率

ethtool eth0 |grep Speed

这里的Speed是网卡的实际最大速率;如果开启了自动协商,就是是网卡和交换机协商之后的速率。

附带一个自动化脚本

PREFIX="p1p1"
ETH=ifconfig |grep ${PREFIX} | head -n1 | awk -F":" '{print $1;}'

if [ -z "${ETH}" ];then
   echo "emxx is not found"
   ifconfig
   exit 1
fi

ethtool eth0 |grep Speed

查看内存信息

  • 支持的内存槽数
  • 支持的最大内存

sudo dmidecode -t memory

输出如下:

# dmidecode 3.2
Getting SMBIOS data from sysfs.
SMBIOS 2.8 present.

Handle 0x0028, DMI type 16, 23 bytes
Physical Memory Array
        Location: System Board Or Motherboard
        Use: System Memory
        Error Correction Type: None
        Maximum Capacity: 8 GB
        Error Information Handle: Not Provided
        Number Of Devices: 2

Handle 0x002A, DMI type 17, 40 bytes
Memory Device
        Array Handle: 0x0028
        Error Information Handle: Not Provided
        Total Width: 64 bits
        Data Width: 64 bits
        Size: 4 GB
        Form Factor: DIMM
        Set: None
        Locator: A1_DIMM0
        Bank Locator: A1_BANK0
        Type: DDR3
        Type Detail: Unknown
        Speed: 1600 MT/s
        Manufacturer: Samsung
        Serial Number: 37A52995
        Asset Tag: A1_AssetTagNum0
        Part Number: M471B5173DB0-YK0
        Rank: 1
        Configured Memory Speed: 1333 MT/s
        Minimum Voltage: 1.35 V
        Maximum Voltage: 1.5 V
        Configured Voltage: 1.35 V

Handle 0x002C, DMI type 17, 40 bytes
Memory Device
        Array Handle: 0x0028
        Error Information Handle: Not Provided
        Total Width: Unknown
        Data Width: 64 bits
        Size: No Module Installed
        Form Factor: DIMM
        Set: None
        Locator: A1_DIMM1
        Bank Locator: A1_BANK1
        Type: Unknown
        Type Detail: Unknown
        Speed: Unknown
        Manufacturer: A1_Manufacturer1
        Serial Number: A1_SerNum1
        Asset Tag: A1_AssetTagNum1
        Part Number: Array1_PartNumber1
        Rank: Unknown
        Configured Memory Speed: Unknown
        Minimum Voltage: Unknown
        Maximum Voltage: Unknown
        Configured Voltage: Unknown

服务迁移至Kubernetes实践总结

梗概

Kubernetes相比传统的集群管理方式,表现出更多优势,因而国内互联网公司也都已经大规模使用 Kubernetes。

Kubernetes 拥有大量的用户群体和落地实践,做为生产环境的编排系统在成熟度和稳定性上已得到业界的认可。对于开发工程师来讲,把服务部署到k8s中,将会是运维工作的常态。

把服务迁移到K8S集群之前,需要对服务本身进行一些升级,对K8S进行适配。本文对升级过程中的一些常见问题,以及注意事项进行总结。

把服务迁移到K8S的意义是什么?

先说K8S有哪些特性呢?依据官方文档,有自我修复、弹性伸缩、自动部署和回滚、服务发现和负载均衡、机密和配置管理、批处理等特性。

这些特性无疑解放了开发工程师运维服务器集群的人力,可以把节省出的精力投入到系统设计、编码开发等价值更高的工作当中。

不要只看收益,不看风险。

K8S带给我们诸多收益的同时,也要注意到K8S给我们带来新的问题。

容器漂移

容器漂移是最常见的问题。容器在一台机器关闭,然后再另外一台机器上被启动。发生容器漂移的原因有滚动发布、物理机故障被回收、手工关闭容器、K8S集群进行碎片整理等。

容器漂移会给系统带来的负面影响有,节点一段时间内不可用、IP地址变化、缓存丢失、临时数据文件丢失等。

资源碎片

总有机器的一部分小块资源被闲置,却又不能满足有大块资源需求的服务。资源碎片会影响K8S集群的机器利用率,严重时会影响服务的发布。

资源隔离不完善-IO不能隔离

目前容器技术,没有实现对磁盘IO的资源隔离,因此同一宿主机上如果存在两个磁盘IO密集型的服务,有很大概率会相互影响。

尽管当前的容器技术已经实现对网络IO资源的隔离,但是可能受限于K8S的版本,以及公司内部CICD系统不提供对网络IO隔离的设置,因此对于网路IO资源密集的服务,部署到K8S时也要谨慎处理。

root权限

一些服务会调用只能root才有权限访问的接口,比如mlock。这些服务如果迁移到K8S,需要重新进行设计,因为公司里的CICD服务往往不会允许k8s中的容器以root用户运行。

系统调用失效

物理机和容器里面的接口发生了很大变化,举例说明:

  • CPU核数,获取物理机器核数的接口要替换成获取容器CPU核数的接口
  • 可用内存大小,获取物理机器内存大小的接口要替换成获取容器内存大小的接口

集群机器性能差异

k8s集群中,服务器可能来自不同厂商,采购的新旧型号不同,配件性能也可能会有差异。尤其不同cpu型号的性能差别巨大。而K8S不能把这种性能差异对应用变得透明。

一般方法是:

  • 添加机型约束
  • 把资源量化,比如1core 等于100算力(K8S暂未实现此功能)。

把服务迁移到K8S之前需要做哪一些升级?

支持容器化运行和部署

K8S本身就是容器的编排系统,所以服务要能在容器中运行起来。需要编写Dockerfile或docker-compose.yaml文件来实现编译镜像。

服务本身要具有良好的可伸缩性

服务本身需要具有容错机制,其上游也要有失败访问后的重试。除此之外,服务检测、坏点摘除、服务发现,也是必不可少的,所以需要把服务接入名字服务。

支持日志的集中采集

不同于传统的运维方法,我们难以进入每一个容器来查看进程的运行状况。把服务要接入日志中心的系统,在日志中心查阅、追踪系统状态是最好的选择。

监控指标集中上报

和把日志上报到日志中心的原因相同,把服务的监控指标,上传到监控平台/系统。而且为了减少依赖,尽可能使用SDK通过网络的方式上传监控指标,避免通过采集容器内日志文件的方式上报。

数据分片

对于一些数据分片的服务,需要有一个统一的分片管理模块。

可用性

K8S集群不总是可用的,我们在设计服务时,就要考虑对服务进行同城双活、异地多活等方式的部署。为了保证服务的可用性,把服务部署到同城不同K8S集群或者异地不同的k8s集群当中。

其他的注意事项

服务的日志自动清理,不然宿主机的磁盘会被用满,引发故障。

使用分布式文件系统来共享文件,比如ceph、OSS等。

程序异常时产生的coredump文件保留下来,以便排查问题。

自动弹性扩缩容

这是k8s的一个能大大节省运维工作,以及提升资源利用率的功能。

但自动弹性扩容在某些场景下会影响服务的可用性。以故障的灾后恢复为例,有问题的服务,被上线后,CPU负载降低,被自动缩容;问题修复后,开始重新扩容,扩容服务的时间可能是漫长的(可能加载内存数据慢、可能机器较多),服务不可用的时间也会变长。

总结

K8S也是一把双刃剑,一方面把开发工程师从繁琐的运维工作中解放出来;另一方面如何运维部署在K8S上面的服务,K8S较传统方法有较大的不同,开发工程师面对K8S这个新工具,要不断学习和总结。

算法工程团队的测试方法总结

服务于算法的工程团队的测试需求

测试的目的是为了保证在线服务的可用性、稳定性、正确性、找到并能消除系统的性能瓶颈。

后端服务的测试一般包含下面几个方面:

  • 功能测试
  • 性能测试(压力测试,测试cpu、内存、io)
  • 稳定性测试

除此之外,服务于算法的工程团队有一种特有的测试需求,就是一致性测试,包含如下两项:

  • 特征一致性验证,为了保证在线服务要保证所使用的特征传入
  • 模型一致性验证,保证在线服务和离线训练的服务一致
  • 打分一致性验证,在线模型打分与离线打分进行对比,主要为了保证在线模型的准确性

当前团队的开发特点

  • 使用C++语言开发在线服务,少数服务由python编写
  • 网络框架和协议使用 grpc/brpc/tars+protobuf
  • 微服务化,没有测试工程师,由研发工程师自主把控软件质量
  • 算法工程师和架构工程师,彼此分工协作,架构工程师需要向算法工程师提供简单便捷的测试工具。

因为上面前两点原因,当前团队无法使用当前业界里优秀的开源测试工具或平台,比如JMeter、iago 、Gatling、Grinder、Locust,还有阿里云的性能测试PTS等。因为这些工具或平台仅适用于http协议,无法满足当前团队使用的网络框架和rpc协议。

那行业中有没有其他公司做过类似的工具呢?有的公司在这方面进行了尝试,比如美团技术团队自有一套针对thrift的压测工具,可惜并不开源,参考https://tech.meituan.com/2016/01/08/loading-test.html。

最终团队只能自己开发测试工具。

团队测试开发的几个发展阶段

团队在从0到1开发在线服务过程中的同时,也在搭建团队自己的开发测试工具。工具不停地被完善,其中过程包含了下面3个阶段。

  • 测试脚本、小工具
  • 工具升级,做出流量回放工具、面向模型服务的专用测试工具等
  • 通用的后端压力测试平台

测试脚本、小工具

团队初期是通过编写测试脚本,来构造测试用例。用随机数生成的方式或通过简单的参数样本集合构造请求。

压力测试则是通过编写小工具来进行模拟上游服务的大量请求。

脚本和工具虽然简陋,但满足了项目初期的测试需求。这种测试方法持续了比较长的时间,逐渐发现如下问题:

  • 随机数生成的参数与实际情况差异较大;
  • 随着业务复杂度的提高,请求参数越来越多,参数的组合越来越多,测试脚本变多,无法管理。
  • 自动化程度低,效率不高

流量回放工具

通过拷贝线上少部分流量,落地到文件,在有测试需求时,把文件中的请求发送给待测试的服务。

这种方式存在如下问题:

  • 自动化程度依然较低,效率不高,尤其测试样本的管理
  • 录制的流量容易过时。模型和策略使用的特征,往往具有时效性。流量回放,特征已经产生变化。

专用测试工具

除了流量回放工具,团队也开发了专用的测试工具,比如面向运载算法模型的在线服务。

运载算法模型的在线服务是算法工程团队运维最多的服务,占用服务器达几千台,加载近百个版本的模型。模型频繁的迭代,需要最频繁的测试。

测试最多的方面为

  • 特征一致性验证
  • 模型的一致性验证
  • 打分的一致性验证

这里测试一致性的工具有

  • 在线diff工具
  • 特征一致性验证工具

在线diff工具

原理是,搭建线上和测试两套环境,同时发送相同流量,验证两个环境返回的字段、打分的不同。对于打分,会统计出不同精度下的差异率。差异结果,会通过监控系统的曲线展示出来。

特征一致性验证工具

原理为,保留每次请求的特征快照。对测试环境发起线上旁路流量,测试环境也同时落下特征快照,对两处快照做join之后,对比快照的不同。

专用工具实现了白屏化、可视化,但缺少通用性。不能适用于其他模块,也难以推广给其他团队使用。

通用的压力测试平台

为了解决自动化、可视化、平台化的问题,通用的压力测试工具被提出。

依据行业内积累的经验指导,尽管专用测试工具总会比通用测试工具在便捷性和功能性上要好于通用性工具。但为了节省重复在测试工具上的开发,最大可能赋能其他团队,通用的测试工具还是有一定意义。

比较容易实现的是压力测试工具。

如何设计一套通用的压力测试工具呢?

整体流程

通用的压力测试工具的基本流程,即流量的录制和回放。

但除了基本的流量回放,需要满足下面几点。

自动化、白屏化

包含两方面:

  • 流程管理的自动化
  • 流量样本的存储自动化
  • 进程管理自动化

前端管理页面由html+css+js实现;流程、存储、进程的管理的实现由脚本语言,比如python来完成;进程的所需的资源交由K8S调度和管理。

可视化

压测结果通过图表展现,常见的指标有:

  • 最大响应时间、平均响应时间
  • PV、QPS
  • TP50、TP90、TP99
  • 成功数、失败数、成功率

一般由时序数据库和图表展示工具实现,时序数据库可以选择使用Prometheus/indexdb等,图型展示使用grafana等。

也可以使用公司自研的数据采集和展示系统。

平台化

需要满足的需求多租户和组件化。

多租户是为了让压力测试平台供多人使用,也可以做到用户间测试资源进行隔离和共享。

组件化,是为了让通用的压力测试平台更简单地支持更多c++在线服务。具体的的实现方法如下:

  • 平台定义组件的统一接口,建立调用框架。
  • 不同组件依据平台的统一接口,定义自己的接口实现,并编译成为动态链接库。
  • 平台通过加载组件的动态链接库,实现面向不同服务的流量录制与回放。

当前问题总结

专用工具和通用压力测试工具的优劣对比

多个团队各有一套测试工具,统一起来有一定难度

brpc框架使用经验汇总

brpc自适应限流的一个bug

  • brpc 会不断尝试调小 max_concurrency, 然后重新计算
  • 因为上面的原因,当qps比较低时,max_concurrency会被计算的非常小
  • 一旦面临流量的突增,即便在机器的负载范围内,但因为超过max_concurrency,而返回限流的错误
  • 紧接上一条,即便客户端重试发送别的机器,因为下游服务整体max_concurrency,依然会接收限流的错误。

结论 brpc的自适应限流,只适合大流量,且流量稳定的场景。不适合小流量的服务。

C++无锁编程之自旋锁(spinlock)的实现

此实例通过c++11实现。

#pragma once

#include 
#include 

class Spinlock {
public:
    Spinlock() : flag(ATOMIC_FLAG_INIT), wait_count(2000) {}

    void lock() {
        int64_t i = 0;
        while(flag.test_and_set(std::memory_order_acquire)) {
            __asm__ ("pause");
            if (++i > wait_count) {
                sched_yield();
                i = 0;
            }
        }
    }

    bool try_lock() {
        if (flag.test_and_set(std::memory_order_acquire)) {
            return false;
        }
        return true;
    }

    void unlock() {
        flag.clear(std::memory_order_release);
    }

private :
    std::atomic_flag flag;
    int32_t wait_count;
};

C++无锁编程之AsyncParallelTask框架

简介
AsyncParallelTask框架,是为了适用于Rank3.0的拆包合包业务场景而设计和实现的异步任务调度框架,且具有良好的通用性和可移植性。

Rank拆包合包的业务场景,即Rank3.0接受到请求后,拆包往下游预估服务分发请求,在接收到返回后合并打分结果,最终把结果返回给上游。

使用AsyncParallelTask框架的优点
拆包合并由框架控制,免去了自己控制拆包后多个子任务的状态管理。
无锁化,没有锁竞争,性能高。
提供超时管理机制,有助于增强系统稳定性。
拥有友好的API,使用简单。
AsyncParallelTask框架可适用的场景举例
需要拆包合包的预估服务,比如Rank模块
搜索引擎的merger模块
其他需要拆包合包的业务场景
设计
设计思想
使用异步IO方式,不会引起线程阻塞,且通过超时控制来避免长尾任务。
通过使用原子变量和原子操作(atomic)来控制计数、状态变化。
支持多线程,且逻辑实现不使用任何种类的锁,使用lockfree数据结构和线程间通信机制来保证线程安全。
通过使用C++11标准的新特性,比如仿函数、参数模板等,来对外提供简洁和更加友好的API。
类域设计
AsyncParallelTask框架,总共包含控制器AsyncTaskController、定时器TimerController、异步并行任务类AsyncParallelTask、分发子任务类AsyncParallelSubTask等4部分组成。

控制器AsyncTaskController
AsyncTaskController是AsyncParallelTask的调度器,提供调度AsyncParallelTask的接口。内部包含线程池、定时器。

当AsyncTaskController调度AsyncParallelTask的运行时,首先把AsyncParallelTask放入线程池中调度,然后启动对AsyncParallelTask的超时监控。

定时器TimerController
TimerController的实现原理
TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

使用最小堆来管理延时任务和周期任务
使用1个timerfd配合epoll来实现定时任务的激活
使用1个eventfd配合epoll来实现定时任务的添加
使用一个LockFree的栈,实现生产者消费者模型。由外部多线程写,TimerController内部1个线程来读。
TimerController内部流程图

异步任务基类AsyncTask
任何继承AsyncTask的类都可供AsyncTaskController进行调度。

AsyncTask中定了一个基本的接口和AsyncTask的状态的转换。

部分代码举例:

class AsyncTask {
public:
enum Status {
UNSCHEDULED,
PROCESSING,
WAIT_CALLBACK,
CALLBACK,
TIMEOUT,
EXCEPTION,
FINISHED
};

AsyncTask() :
    id(0),
    parent_id(0),
    timeout_threshold(0),
    status(UNSCHEDULED) {}
virtual ~AsyncTask() {}
virtual Status process() = 0;
virtual Status timeout() { return TIMEOUT; }
virtual void destroy() {}
virtual void reset() {
    id = 0;
    parent_id = 0;
    timeout_threshold = 0;
    status = UNSCHEDULED;
}

virtual void callback() {}
virtual void callbackExcepiton() {}
virtual void callbackTimeout() {}

…….
private:
int64_t id;
int64_t parent_id;
int32_t timeout_threshold; // millisecond;
std::atomic status;
};
AsyncTask的状态转换图
AsyncTask约束了异步任务的7种状态,和8中状态转换。其中TIMEOUT和EXCEPITON是等效的,为了方便区分两种异常而设置两个状态。

并行任务AsyncParallelTask
AsyncParallelTask内部流程图

并行子任务AsyncParallelSubTask
拆包后分发操作主要在AsyncParallelSubTask中执行。需要创建AsyncParallelSubTask的子类,实现其中分发操作和合并结果的操作。

使用举例
初始化AsyncTaskController
在进程Init中执行,全局单例。设置分发的线程池。

static ThreadPool thread_pool(config.getWorkerThreadNum()); 
auto& task_controller = Singleton::GetInstance();
task_controller.setThreadPool(&thread_pool);

定义AsyncParallelSubTask的子类PredictAsyncParallelSubTask 
主要实现process和mergeResult两个函数,具体参考

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.h

https://gitlab.vmic.xyz/iai_common/rank/blob/experiment3/task/predict_async_parallel_subtask.cpp

class PredictAsyncParallelSubTask : public AsyncParallelSubTask {
public:
PredictAsyncParallelSubTask() :
alg_info(nullptr),
context(nullptr),
split_info({0}) {}

virtual ~PredictAsyncParallelSubTask() {}

virtual Status process() {
    if (nullptr == context) {
        throw std::runtime_error("context is nullptr");
    }
    if (nullptr == alg_info) {
        throw std::runtime_error("alg_info is nullptr");
    }
    PredictService::asyncRequestZeusServer(this, *context, *alg_info, split_info);
    return WAIT_CALLBACK;
}

virtual void mergeResult();

virtual void reset() {
    AsyncParallelSubTask::reset();
    alg_info = nullptr;
    split_info = {0};
    context = nullptr;
    temp_res.Clear();
}

void collectResult(const zeus::proto::ZeusResponse& res) {
    auto& zeus_res = const_cast(res);
    temp_res.mutable_item()->Swap(zeus_res.mutable_item());
    temp_res.mutable_model_response()->Swap(zeus_res.mutable_model_response());
}

void setAlgInfo(AlgInfo* alg_info) { this->alg_info = alg_info;};
void setRankContext(RankContext *context) { this->context = context;}
void setSplitInfo(SplitInfo& split_info) { this->split_info = split_info;}

private:
void praseZeusToScoreItem(TargetCode code, double score, ScoreItem *score_item);

AlgInfo* alg_info;
RankContext *context;
SplitInfo split_info;
zeus::proto::ZeusResponse temp_res;

};
创建AsyncParallelTask
具体参考

class PredictRankTask : public AsyncTask {
public:
……
private:
AsyncParallelTask parallel_task;
……
};

……
for (int32_t partition_id = 0; partition_id < partition_count; ++partition_id) {
int64_t total_count = req_item_size;
int64_t offset = split_count * partition_id;
int64_t end = offset + split_count;
end = end > total_count ? total_count : end;
SplitInfo split_info({total_count,
split_count,
partition_count,
partition_id,
offset,
end});

            auto sub_task = std::make_shared();
            sub_task->setAlgInfo(const_cast(&alg_info));
            sub_task->setSplitInfo(split_info);
            sub_task->setRankContext(&getContext());
            parallel_task.addSubTask((std::shared_ptr)sub_task);
        }

……
auto task = this;
parallel_task.setAllDoneCallback([=]() {
task->response();
task->setStatusCallback();
});

    parallel_task.setIncomplateCallback([=]() {
            task->response(Error::E_INCOMPLATE, "some predict server is error!");
            task->setStatusCallback();
            });

    parallel_task.setAllFailCallback([=]() {
            task->response(Error::E_PREDICT_ALL_FAILED, "all predict server is error!");
            task->setStatusCallback();
            });

    int32_t timeout = PredictService::getMaxTimeout(scene_id, sub_alg);
    parallel_task.setTimeoutCallback(timeout, [=]() {
            task->response(Error::E_PREDICT_ALL_TIMEOUT, "all predict server timeout!");
            task->setTimeout();
            });

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

执行调度
task_controller.scheduleImmediately会在当前线程分发拆包到线程池。而task_controller.schedule则会在线程池中选择一个线程分发。

    auto& task_controller = Singleton::GetInstance();
    parallel_task.setController(&task_controller);
    parallel_task.setId(task_controller.generateUniqueId());
    setStatusWaitCallback(std::memory_order_relaxed);
    task_controller.scheduleImmediately(¶llel_task);

编码
源码地址:

测试
压力测试
测试机器为 2017H2-A1-1, 32 core机器

QPS cpu num of items body length session latency P99 session latency AVG
client latency AVG

bandwidth mem remark
300 56% 1W 200KB 43 35 40 3.4 Gb/s 1%
1600 62% 2k 40KB 31 21.6 24 3.64Gb/s 1.1%
稳定性测试
测试方法:
CPU 60%的压力下,持续测试24小时。 

测试结果:
Rank服务可稳定提供服务。无内存泄露。

极限测试
测试过程:

缓慢把CPU压力从30%提升到90%~100%之间,并维持10分钟,然后把cpu压力降低至60%。整个过程中观察Rank稳定性,有无内存泄露。

测试结果:

CPU压力达到90%以上时,Rank内存增长,超时错误日志变多,定时器失准,返回大量超时、错误。 

Cpu压力降低至60%之后,Rank服务恢复正常,内存使用率变小,无内存泄露,超时错误日志不再新的产出。

符合预期。

打分一致性测试
测试方法:
使用rank-diff工具,从passby环境,复制两份流量请求新旧rank生产环境,分别记录打分结果。最后通过python脚本统计打分结果差异。

测试结果:
1000qps,新旧rank打分一致,差异率小于99.9%,满足需求。

产生差异的数据,分为两种。1)为打分近似值,差别0.01以下。 2)打分无效取默认值0.001.

有锁Rank和无锁Rank性能对比
2k条广告时,1600qps,有锁和无锁Rank压力测试性能对比
测试机器  CPU 32 cores,其中QPS、带宽都是相同的。

有锁
无锁
remark 
QPS 1600
相同
CPU 54.1% 63%
session latency AVG 15 21.7
session latency P99 21 31
bandwidth 3.64Gb/s 3.64Gb/s 相同
req body length 40 KB 40 KB 相同
Context Switch

一种压测工具使用的控制流量大小的算法

压力测试工具是软件测试过程中的必用工具,而压力测试工具如何控制流量大小呢?

最常见的是计算每个请求之间的时间间隔,然后通过sleep方法来让两次请求产生间隔。这种方法有2个缺点,sleep时会让线程挂起,所以需要比较多的线程数;其二,当流量非常大的情况,比如qps达到10万以上时,会收到系统时钟精度和线程上下文切换的挑战。

本文设计了另外一种方法,采用了按照概率控制流量大小。但概率的计算并不依赖随机数,而是通过设置一个概率控制变量的方法,让流量的发送更加均衡。

代码如下:

class Transformer {
    public:
        Transformer() :
            send_num(0),
            qps(0),
            benchmark(0),
            counter(0),
            thread_pool(10) {}

        void run();
        void stop() { tc.stop(); }

    private:

        void sendOne();
        void transform();

        int32_t send_num;
        int32_t qps;
        int32_t benchmark;
        std::atomic counter;

        ThreadPool thread_pool;
        TimerController tc;
};
void Transformer::run() {
    qps = FLAGS_qps;
    if (qps <= 0 || qps > 1e5) {
        return;
    }

    int32_t query = (qps << 16) / 1000;
    send_num = query >> 16;
    query &= 0xFFFF;
    for (int i = 0; i < 16; ++i) {
        benchmark <<= 1;
        if (query & 1) {
            ++benchmark;
        }
        query >>= 1;
    }
    tc.cycleProcess(1, [this]() { this->transform(); });
}

void Transformer::transform() {
    uint32_t cur_c = counter.fetch_add(1, std::memory_order_relaxed);
    cur_c &= 0xFFFF;
    if (cur_c <= 0) {
        return;
    }
    int32_t delta = 0;
    for (int i = 0,bit = 1; i < 16; ++i, bit <<= 1) {
        if ((cur_c & bit) == 0) {
            continue;
        }
        if ((benchmark & bit) == 0) {
            break;
        } else {
            delta = 1;
            break;
        }
    }

    int32_t cur_send_num = send_num + delta;
    if (cur_send_num <= 0) {
        return;
    }
    for (int i = 0; i< cur_send_num; ++i) {
        thread_pool.enqueue([this]() { this->sendOne(); });
    }
}

Docker搭建minio server

环境说明

  • Centos 7/8
  • docker/podman
  • minio version RELEASE.2021-07-08T19-43-25Z

搭建步骤

初始化目录

mkdir /minio
mkdir /minio/data
mkdir /minio/config

拉取镜像

docker pull minio/minio

如果是podman,执行:

podman pull minio/minio

启动镜像

ENGINE=podman
MINIO_ROOT=/minio
${ENGINE} run \
    --name minio \
    -p 9005:9005 \
    -p 9006:9006 \
    -e "MINIO_ROOT_USER=XXXXXXXXXXXXXX" \
    -e "MINIO_ROOT_PASSWORD=XXXXXXX/XXXXXX/XXXXXXX" \
    -v ${MINIO_ROOT}/data:/data \
    -v ${MINIO_ROOT}/config:/root/.minio \
    -d \
    minio/minio server /data --console-address ":9006" --address ":9005"

查看启动日志:

docker logs minio

运行成功:

API: http://10.88.0.97:9005  http://127.0.0.1:9005

Console: http://10.88.0.97:9006 http://127.0.0.1:9006

Documentation: https://docs.min.io

总结

和网上的一些文档不同, 新版MINIO参数已经发生了变化。比如:

MINIO_ROOT_USER替代了MINIO_SECRET
MINIO_ROOT_PASSWORD替代了MINIO_SECRET_KEY

另外一处变化则为web管理的地址和API地址已经分离,分别需要参数配置--console-address ":9006"--address ":9005"

附录

C++-双缓冲(DoubleBuffer)的设计与实现

源码如下:

#pragma once

#include 

template
class DoubleBuffer {
    public:
        DoubleBuffer() : cur_index(0) {}

        T& getWorkingBuffer(std::memory_order order = std::memory_order_seq_cst) {
            return buffers[cur_index.load(order)];
        }

        T& getBackupBuffer(std::memory_order order = std::memory_order_seq_cst) {
            return buffers[1 ^ cur_index.load(order)];
        }

        void switchBuffer(std::memory_order order = std::memory_order_seq_cst) {
            cur_index.fetch_xor(1, order);
        }

    private:
        T buffers[2];
        std::atomic cur_index;
};

C++无锁编程-无锁栈的设计与实现

无锁栈实现原理

本文是用过C++11标准下的原子变量atomic的CAS操作compare_exchange_weak,来实现无锁栈(Lock-free Stack)。

  • 通过compare_exchange_weak方法,来实现栈顶元素添加操作的原子性。
  • 栈内元素都会由Node数据结构封装,可以规避CAS操作的ABA问题。

无锁栈实现编码

#ifndef LOCK_FREE_STACK_HPP
#define LOCK_FREE_STACK_HPP

#include 

template
class LockFreeStack {
public:
    struct Node {
        Node() : data(nullptr), pre(nullptr) {}
        void reset() { data = nullptr; pre = nullptr;}

        T *data;
        Node* pre;
    };

    LockFreeStack() : count(0), back(nullptr) {}
    ~LockFreeStack() { clear(); }

    void push(const T* data) {
        Node* node = alloc();
        node->data = const_cast(data);
        for (;;) {
            node->pre = back;
            if (back.compare_exchange_weak(node->pre, node, std::memory_order_seq_cst) ) {
                break;
            } else {
                continue;
            }
        }
        ++count;
    }

    T* pop() {
        for (;;) {
            Node* back_node = back;
            if (nullptr == back_node) { return nullptr; }
            Node* second_node = back_node->pre;
            if(!back.compare_exchange_weak(back_node, second_node, std::memory_order_seq_cst)) {
                continue;
            } else {
                --count;
                T* data = back_node->data;
                recycle(back_node);
                return data;
            }
        }
    }
    bool empty() { return (bool)count; }
    int64_t size() { return count; }
    void clear() { while (nullptr != pop()) {}; }

private:
    Node *alloc() { return (new Node()); }
    void recycle(Node *node) { delete node; }

    std::atomic count;
    std::atomic back;
};
#endif

Linux 常用Yum仓库

EPEL

EPEL的全称叫 Extra Packages for Enterprise Linux 。EPEL是由 Fedora 社区打造,为 RHEL 及衍生发行版如 CentOS、Scientific Linux 等提供高质量软件包的项目。

官方网址为:https://fedoraproject.org/wiki/EPEL

Centos安装命令: sudo yum install epel-release

rpmfusion

RPM Fusion provides software that the Fedora Project or Red Hat doesn’t want to ship. That software is provided as precompiled RPMs for all current Fedora versions and current Red Hat Enterprise Linux or clones versions; you can use the RPM Fusion repositories with tools like yum and PackageKit.

RPM Fusion is a merger of Dribble, Freshrpms, and Livna; our goal is to simplify end-user experience by grouping as much add-on software as possible in a single location. Also see our FoundingPrinciples.

官方网站为 https://rpmfusion.org/

Remi repository

Remi repository 是包含最新版本 PHP、MySQL、Python 包的 Linux 源,由 Remi 提供维护。

官方网站为 http://rpms.remirepo.net/

Enterprise Linux 7 (with EPEL) x86_64 的安装方法:

wget https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
wget https://rpms.remirepo.net/enterprise/remi-release-7.rpm
rpm -Uvh remi-release-7.rpm epel-release-latest-7.noarch.rpm

如何要安装php的7.4版本则执行下面的命令:

yum -y --nogpgcheck install epel-release \
&& yum -y --nogpgcheck install https://rpms.remirepo.net/enterprise/remi-release-7.rpm \
&& yum -y --nogpgcheck install yum-utils \
&& yum-config-manager --enable remi-php74 \
&& yum -y --nogpgcheck install yum install nginx php  php-cli php-fpm php-mysqlnd php-zip php-devel php-gd php-mcrypt php-mbstring php-curl php-xml php-pear php-bcmath php-json

WANDisco

安装新版本的git 库

# 卸载旧版 git
yum -y remove git

# 安装 centos7 WANDisco 仓库
yum install http://opensource.wandisco.com/centos/7/git/x86_64/wandisco-git-release-7-2.noarch.rpm

yum -y install git

podman使用经验

修改镜像容器存储地址(GraphRoot)

podman info # 找到配置文件地址,默认是/etc/containers/storage.conf

store:
  configFile: /etc/containers/storage.conf
  containerStore:

打开或者新建配置文件,修改GraphRoot和runroot地址。

[storage]
# Default Storage Driver
driver = "overlay"

# Temporary storage location
runroot = "/home/podman/containers/temp_storage"

# Primary Read/Write location of container storage
graphroot = "/home/podman/containers/storage"

修改镜像源地址

错误汇总

错误信息

Error processing tar file(exit status 1): there might not be enough IDs available in the namespace (requested 0:42 for /etc/gshadow): lchown /etc/gshadow: invalid argument

机器重启后,服务启动不起来

podman start mariadb

ERRO[0000] Error adding network: failed to allocate for range 0: 10.88.0.127 has been allocated to bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16, duplicate allocation is not allowed
ERRO[0000] Error while adding pod to CNI network "podman": failed to allocate for range 0: 10.88.0.127 has been allocated to bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16, duplicate allocation is not allowed
ERRO[0000] Error preparing container bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16: error configuring network namespace for container bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16: failed to allocate for range 0: 10.88.0.127 has been allocated to bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16, duplicate allocation is not allowed
Error: unable to start container "bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16": failed to mount shm tmpfs "/home/podman/containers/storage/overlay-containers/bb5e67927536b496894013ca3c42221bdfc4f7f72e30878e408bd97d88999e16/userdata/shm": no such file or directory

解决方法:清空网络

echo "" > /var/lib/cni/networks/podman/10.88.0.127

Python环境运维笔记

升级python所有的包

pip install pip-review # 安装第三方包pip-review

pip-review #查看可更新

pip-review --auto #自动批量升级

pip-review --interactive #以交互方式运行,对每个包进行升级

Python更改pip源

创建配置文件

mkdir ~/.pip
cd ~/.pip
vim pip.conf

编辑 pip.conf文件,添加如下内容:

[global]
index-url = https://mirrors.aliyun.com/pypi/simple
[install]
trusted-host=mirrors.aliyun.com

中国国内推荐的pip源如下:

Centos8搭建Mini Server笔记

系统安全设置

配置防火墙

service firewalld start #重启防火墙

firewall-cmd --list-all #查看防火墙规则

firewall-cmd --permanent --add-port=80/tcp #开放80端口,HTTP

firewall-cmd --permanent --add-port=67/udp #开放67端口,DHCP
firewall-cmd --permanent --add-protocol=igmp #支持IGMP协议,路由发现协议
firewall-cmd --reload #重启防火墙

还有另一种方法,设置service,service列表在/usr/lib/firewalld/services目录中

firewall-cmd --permanent --add-service=http
firewall-cmd --permanent --add-service=ssh
firewall-cmd --permanent --add-service=samba
firewall-cmd --permanent --add-service=hdcp
firewall-cmd --permanent --add-service=llmnr

修改防火墙配置文件 /etc/firewalld/zones/public.xml



  Public
  For use in public areas. You do not trust the other computers on networks to not harm your computer. Only selected incoming connections are accepted.
  
  
  
  
  
  
  
  
  
  
  
  

禁止root远程ssh登陆

修改/etc/ssh/sshd_config文件,将其中的PermitRootLogin改成no,

然后重新启动ssh服务/bin/systemctl restart sshd.service

系统设置

挂载的数据硬盘设置自动休眠

  • hdparm -B 127 /dev/sdb1
  • hdparm -S 60 /dev/sdb1

挂载的数据硬盘设置自动休眠(二)

在crontab中添加定时任务,每隔15分钟,关闭磁盘

*/15 * * * * (sdparm --command=stop /dev/sdc >> /tmp/sdparm.log)

设置 supervisord 开机启动

systemctl enable supervisord.service

设置用户 i 可以无密码获得sudo权限

添加 i ALL=(ALL) NOPASSWD: ALL 到 /etc/sudoers

centos 配置无线网卡

参考 https://www.cnblogs.com/asker009/p/10212045.html

安装配置rsyslog

  • sudo yum install rsyslog

查看CPU温度、硬盘温度

  • yum install lm_sensors hddtemp
  • sensors
  • hddtemp /dev/sdb

更新系统时间

yum install -y ntpdate

ntpdate ntp1.aliyun.com

hwclock --systohc

可选的时间服务器 www.pool.ntp.org/zone/cn

cn.ntp.org.cn #中国
edu.ntp.org.cn #中国教育网
ntp1.aliyun.com #阿里云
ntp2.aliyun.com #阿里云
cn.pool.ntp.org #最常用的国内NTP服务器

优化

创建swap文件

sudo -i
dd if=/dev/zero of=/opt/swapfile bs=1024 count=4194304
chown root:root /opt/swapfile
chmod 0600 swapfile
mkswap /opt/swapfile
swapon /opt/swapfile
echo "/opt/swapfile   none  swap  sw    0 0" >> /etc/fstab
swapon -s

卸载订阅管理器 rhsmcertd服务

rhsmcertd服务,即Red Hat Subscription Manager CERTification Daemon

systemctl stop rhsmcertd.service # 停止订阅服务
systemctl disable rhsmcertd.service # 取消订阅服务开机启动
yum remove subscription-manager

问题汇总

USB安装centos8,无法正常启动安装程序

解决方法: 刻录时,把UltraISO的写入方式改为为RAW

Linux wget遭遇证书不可信(Wget error: ERROR: The certificate of is not trusted.)

解决方法:
安装ca-certificates,并同步系统时间为最新。

  • yum install -y ca-certificates
  • ntpdate -u 0.north-america.pool.ntp.org

Error "curl: (60) Peer certificate cannot be authenticated with known CA certificates"

同上一个问题的解决方法

无法 mount ntfs磁盘

  • yum install epel-release
  • yum install ntfs-3g

umount: /mnt: target is busy.

  • 执行sync把buffer的数据写入磁盘
  • ls /mnt 查看占用/mnt的进程,并kill掉
  • 重新执行 umount /mnt

根目录变为只读,只能root登陆

重新挂载根目录, mount -o remount, rw /
一般为/etc/fstab 配置错误导致。

使用Python把数据库中数据导出到CSV文件

需要安装两个库pymysql和csv

pip install pymysql csv

代码示例如下:

import pymysql
import csv

import sys
reload(sys)
sys.setdefaultencoding('utf8')

def from_mysql_get_all_info():
    conn = pymysql.connect(
            host='mysql.dba.zuocheng.net',
            port=13306,
            user='root',
            db='test_db',
            password='test_pw',
            charset='utf8')
    cursor = conn.cursor()
    sql = "select * from test_table"
    cursor.execute(sql)
    data = cursor.fetchall()
    conn.close()
    return data

def write_csv():
    data = from_mysql_get_all_info()
    filename = 'data_full.csv'
    with open(filename,mode='w') as f:
        write = csv.writer(f,dialect='excel')
        for item in data:
            write.writerow(item)

write_csv()

使用C++11的特性来设计和实现API友好的高精度定时器TimerController

为什么设计和实现TimerController?

最新的TimerController代码保存在Github上面:https://github.com/zuocheng-liu/StemCell ,包含timer_controller.h 和 timer_controller.cpp两个文件,欢迎审阅!

因为软件设计中面临了一些实际问题

尤其在使用C++开发网络应用时常遇到下面的问题:

一、软件设计中,不会缺少通过使用定时器的来实现的场景,比如超时控制、定时任务、周期任务。

二、C/C++标准库中只有最原始的定时接口。没有提供功能完备的库来满足上面提到复杂场景。

三、第三方库中的定时器,往往存在一些问题,比如:

  • libevent、libev、libue 不是线程安全的,在多线程系统中,为了保证线程安全需要额外再进行封装。
  • redis的异步库libae对延时时间的处理是不准确的。

以上问题会让开发者开发新系统时带来一些困扰,但C++11新特性的出现,带来了解决上面问题的新思路。

C++11的新特性让定时器的实现更简单友好

TimerController接口更友好

接口参数支持C++11的lamaba表达式,让定时器的接口对开发人员更加友好。

代码更精简

TimerController的代码总计300~400行,而且功能完备,代码健壮。
C++11的新特性的使用,让代码更简洁,增强代码的可读性、可维护性。

保证线程安全

线程安全,是绕不开的问题。第三方库libevent等,在多线程环境中使用总是危险的。TimerController在设计之初就保证多线程环境下运行的安全性。

没有第三方依赖。

TimerController,没有依赖任何第三方库,完全依靠C/C++标准库和C++11的新特性来实现。

TimerController 接口设计

class TimerController {
    bool init(); // 初始化资源,并启动定时器
    void stop(); // 停止定时器,所有定时任务失效

    // 定时运行任务,参数delay_time单位是毫秒。后面参数是lamba表达式
    template<class F, class... Args>
    void delayProcess(uint32_t delay_time, F&& f, Args&&... args);

    // 周期运行任务,参数interval单位是毫秒,后面参数是lamba表达式。
    template<class F, class... Args>
    void  cycleProcess(uint32_t interval, F&& f, Args&&... args);
}

用一个实例来讲解TimerController的使用方法:

#include <iostream>
#include "timer_controller.h"
using namespace std;
using namespace StemCell;
int main() {
    srand((unsigned)time(NULL));
    try {
        TimerController tc;
        tc.init(); // 初始化 TimerController
        tc.cycleProcess(500, [=]() { cout << "cycle 0.5 sec" << endl; });
        for (int i = 0; i < 80; ++i) {
            // 随机产生80个延时任务,并延时执行
            auto seed = rand() % 8000;
            tc.delayProcess(seed + 1, [=]() { cout << "delay:" << seed << endl; });
        }
        sleep(8);  // 主线程睡眠8秒,让延时任务得以执行
        tc.stop(); // 停止 TimerController
        cout << "tc stoped!" << endl;
    } catch (exception& e) {
        cout << "error:" << e.what();
    }
    return 0;
}

TimerController 实现原理

TimerController如何实现定时的原理和libevent并无不同,都使用了Reactor的设计模式。但是TimerController通过生产者、消费者模型,来实现多线程添加定时任务,并保证线程安全。TimerController使用了C++11的新特性,简化了代码实现。

  • 使用最小堆来管理延时任务和周期任务
  • 使用1个timerfd配合epoll来实现定时任务的激活
  • 使用1个eventfd配合epoll来实现定时任务的添加
  • 使用一个线程安全的队列,实现生产者消费者模型。TimerController使用场景为多线程写,TimerController内部1个线程来读。

TimerController 高级用法

高延时的任务的处理

TimerController内部只有1个线程在执行定时任务。当高延时的任务增多时,可能会影响到任务运行的调度时间,高延时的任务需要在放入新的线程中运行。示例如下:

TimerController tc;
tc.init(); // 初始化 TimerController
// 把任务放入新线程或线程池中
tc.delayProcess(50, []() { std::thread([]() { do_long_time_task();}) });

TimerController保持全局单例

为了系统简洁,TimerController全局单例即可。
auto& tc = Singleton< TimerController >::GetInstance();

其他

如何避免CPU负载高时,定时器失准的问题?

TimerController 有待改进的点

  • 无锁化,目前使用了自旋锁在保证task队列的线程间互斥,后续可使用无锁队列替代有锁队列。
  • TimerController精度目前只有1毫秒,主要因为博主做网络开发都是毫秒级的,后续可以让TimerController支持更小的精度。
  • TimerController 使用了epoll、timerfd、eventfd等,只能在linux平台上面使用

源码地址

具体实现在 timer_controller.h 和 timer_controller.cpp两个文件里面。

使用docker搭建C/C++ 开发环境

优点和意义

  • 环境隔离、资源共享、节省机器资源
  • 轻量虚拟机,启动和运行迅速

使用 Dockerfile 构造镜像

Dockerfile 内容

FROM centos:centos7.4.1708

MAINTAINER Zuocheng Liu 

RUN yum -y --nogpgcheck install gcc gcc-c++ kernel-devel make cmake  libstdc++-devel libstdc++-static glibc-devel glibc-headers \
&& yum -y --nogpgcheck install openssl-devel gperftools-libs \
&& yum -y --nogpgcheck install psmisc openssh-server sudo epel-release \
&& yum -y --nogpgcheck install vim git ctags \
&& mkdir /var/run/sshd \
&& echo "root:123456" | chpasswd \
&& sed -ri 's/^#PermitRootLogin\s+.*/PermitRootLogin yes/' /etc/ssh/sshd_config \
&& sed -ri 's/UsePAM yes/#UsePAM yes/g' /etc/ssh/sshd_config \
&& ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key \
&& ssh-keygen -t dsa -f /etc/ssh/ssh_host_dsa_key

EXPOSE 22

CMD ["/usr/sbin/sshd", "-D"]

创建镜像

docker build -t cpp_dev:last .

创建容器

初始化容器, 宿主机调用脚本

  • 挂载容器
  • 调用容器内初始化脚本mount_in_docker.sh
#!/bin/bash

NEWUSER=$1
PORT=$2
PATH=$(dirname $(readlink -f "$0"))
echo "add User ${NEWUSER} sshd Port ${PORT}"
if [ ! -d /data/rtrs/${NEWUSER} ]; then
    /usr/bin/mkdir /data/rtrs/${NEWUSER}
fi
/usr/bin/cp /data/liuzuocheng/.profile /data/${NEWUSER}/.profile
/usr/bin/docker run -itd --name ${NEWUSER} --net=host -v /data/${NEWUSER}:/home/${NEWUSER} -v /data:/data rtrs/dev_cpp:centos7.4.1708
/usr/bin/docker exec -i ${NEWUSER} sh ${PATH}/mount_in_docker.sh ${NEWUSER} ${PORT} ${UID}

镜像内调用脚本

  • 添加用户
  • 启动sshd服务
NEWUSER=$1
PORT=$2
NEWUID=$3
echo "add User ${NEWUSER} sshd Port ${PORT}"
/usr/sbin/useradd -r -u ${NEWUID} -d /home/${NEWUSER} ${NEWUSER}
echo "${NEWUSER}:123456" | /usr/sbin/chpasswd
echo "Port ${PORT}" >> /etc/ssh/sshd_config
/usr/sbin/sshd

其他问题汇总

宿主机docker的目录迁移

docker的默认目录在/var/lib/docker 下面, 往往/var的磁盘比较小,建议把docker的目录改为大磁盘

把用户加入sudoer列表,sudo 执行命令无需密码

修改/etc/sudoers文件, 添加一行 user-name ALL=(ALL) NOPASSWD: ALL

更多问题可参考《Docker使用经验》

现实过程中使用docker搭建C/C++ 开发环境的工程意义

  • 新人入职代码培训,降低学习成本
  • 推动公司平台化建设

Docker使用经验

安装docker

ubuntu 环境

sudo apt-get install docker.io
sudo systemctl start docker

centos环境

sudo yum install docker
sudo service docker start

常用命令

启动docker

sudo docker run --net=host -v /home/zuocheng:/zuocheng -it zuocheng/dev_cpp /bin/bash

创建docker镜像

docker build -t zuocheng/dev_cpp .

挂载镜像

docker run -it --name dev_cpp -p 10022:22 zuocheng/dev_cpp

要免 sudo 调用 docker 命令方法, 创建docker用户组,并把当前用户添加到docker 用户组中。

sudo groupadd docker #创建 docker 用户组

sudo usermod -aG docker ${USER} # 当前用户加入 docker 用户组

sudo systemctl restart docker # 重启 docker 服务

newgrp - docker # 使设置在当前会话生效

使用docker搭建C++开发环境

参考博文《使用docker搭建C/C++ 开发环境》

更改docker的存储目录,避免默认目录磁盘不足的问题

vim /usr/lib/systemd/system/docker.service # 修改docker.service文件.

ExecStart=/usr/bin/dockerd --graph /data/docker # 在里面的EXECStart的后面

重启docker

systemctl disable docker
systemctl enable docker
systemctl daemon-reload
systemctl start docker

搭建 docker registry(私有仓库)

添加insecure-registries配置

修改/etc/docker/daemon.json,添加insecure-registries配置.把本地ip和端口添加进去。

    "insecure-registries": [
        "10.10.10.10:5000"
    ]
}

重启docker服务

sudo systemctl restart docker

下载 registry 镜像

docker pull registry

启动registry镜像的容器

docker run -d -p 5000:5000 -v /myregistry:/var/lib/registry registry

上传镜像

给本地镜像打tag

docker tag dev_cpp:latest 10.10.10.10:5000/dev_cpp:latest

上传镜像
docker push 10.10.10.10:5000/dev_cpp:latest

向量检索-大规模向量检索引擎

梗概

近期在团队内分享之前在阿里巴巴达摩院工作时做过的项目。在这次分享中,简单介绍了向量检索的原理,以及在大规模数据下如何构建一个高性能、低延迟的向量检索引擎。

更详细的信息在PPT 《向量检索-大规模向量检索引擎》里。

脑图

向量检索-大规模向量检索引擎

系统架构图和离线数据流图

大规模向量检索引擎-架构图

其他资料

使用C语言实现的最简单的HTTP服务器

此段代码的特点

  • 功能简单,无论请求是什么,应答只返回<h1>Hello!</h1>
  • 代码量小,总计68行。
  • 可编译、可运行。

如何编译运行?

编译: gcc -o hello_server hello_server.c

运行: ./hello_server

请求: curl http://localhost:8888/any

源文件 hello_server.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>

#define PORT 8888
#define BUFFER_SIZE 4096
#define RESPONSE_HEADER "HTTP/1.1 200 OK\r\nConnection: close\r\nAccept-Ranges: bytes\r\nContent-Type: text/html\r\n\r\n"
#define RESPONSE_BODY "<h1>Hello!</h1>"

int handle(int conn){
    int len = 0;
    char buffer[BUFFER_SIZE];
    char *pos = buffer;
    bzero(buffer, BUFFER_SIZE);
    len = recv(conn, buffer, BUFFER_SIZE, 0);
    if (len <= 0 ) {
        printf ("recv error");
        return -1;
    } else {
        printf("Debug request:\n--------------\n%s\n\n",buffer);
    }

    send(conn, RESPONSE_HEADER RESPONSE_BODY, sizeof(RESPONSE_HEADER RESPONSE_BODY), 0);
    close(conn);//关闭连接
}

int main(int argc,char *argv[]){
    int port = PORT;
    struct sockaddr_in client_sockaddr;     
    struct sockaddr_in server_sockaddr;
    int listenfd = socket(AF_INET,SOCK_STREAM,0);
    int opt = 1; 
    int conn;
    socklen_t length = sizeof(struct sockaddr_in);
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(port);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if(bind(listenfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1){
        printf("bind error!\n");
        return -1;   
    }  

    if(listen(listenfd, 10) < 0) {
        printf("listen failed!\n");
        return -1;   
    }

    while(1){
        conn = accept(listenfd, (struct sockaddr*)&client_sockaddr, &length);
        if(conn < 0){
            printf("connect error!\n");
            continue;
        }
        if (handle(conn) < 0) {
            printf("connect error!\n");
            close(conn);
            continue;
        }  
    }
    return 0;
}

后记

为什么要写这篇博文?

原因是,在使用公司里的自动化平台部署c++服务时,拿这个简单的示例来测试平台是否有问题。俗称趟一遍坑儿。

在网上也搜索了很多不少博文,发现里面的代码有几个问题,第一个问题就是编译不过,第二个问题则是有的代码应答必须要有文件,这对我的测试也造成了些许麻烦。

所以就自己参考别人的列子,在自己的博客里写一个简单的吧。以后再去趟别的自动化部署系统的坑的时候,顺手就能拿来用。

sed 使用经验

概述

sed 是处理文本时经常用到的工具,可实现对文本的输出,删除,替换,复制,剪切,导入,导出等各种操作。

本文依据作者的自身经验,记录sed的常用的使用方法。本文虽然不能涵盖sed的所有功能,但本文记录的sed用法能够解决了作者日常工作中 90%以上的问题。希望通过本文的分享,能让读者从sed的字数繁多的说明文档中解救出来,减小学习sed的成本。

sed 常用用法

sed -i 's/regular_expression/replacement/g' file.txt

这个命令格式是最常用的用法,在工作中使用sed场景中出现的频率最高。

对参数和各部分说明:

-i 表示修改文件,如果不带此参数,则将匹配的结果打印到屏幕。

s 表示替换字符串

regular_expression 正则表达式

replacement 对正则表达式匹配的目标,进行替换。如果replacement为空

g 对文件全局做替换

举例:

[www@it.zuocheng.net]$ sed -i 's/[a-z]*/+++/g' file.txt

sed 注意事项

在sed的正则表达式中, 有一些需要转移的特殊字符,比如:

+、(、),这些字符如果是正则表达式的语法中使用的字符,则需要进行转移;如果是文本中需要匹配的字符,则不需要转义。

高级用法

& 变量的使用

& 变量用在replacement部分,表示regular_expression匹配到的文本。

[www@it.zuocheng.net]$ echo "hello world"| sed 's/[a-z]*/(&)/g'
(hello) (world)

正则匹配 \1 – \9 变量

使用sed提取正则匹配的各个部分。

正则匹配的变量,主要用在replacement部分。表示regular_expression的子变量。

可用于截取字符串

[www@it.zuocheng.net]$ echo "hello123 world456"| sed 's/\([a-z]\+\)[0-9]\+/\1/g'
hello world

图搜引擎服务器磁盘IO性能测试方案

背景和意义

目前团队对图搜业务使用的服务器的磁盘IO性能没有量化的数据,不能准确把握服务器的磁盘IO性能。

同机型机器的磁盘IO性能表现并不相同,在磁盘IO性能表现差的机器,往往存在故障隐患。

因为不能准确把握服务器的IO性能,对图搜引擎能否支持多少业务量时,不能做出准确评估。在运维团队系统的过程中,会存在不可预知的风险,最终导致系统故障。

机器预算收紧,把索引全部加载到内存的方案暂时无法推行,在图搜引擎检索过程中引依然有磁盘IO操作。对磁盘IO测试仍有重要意义。

测试目标

  • 测试现有机型机器的磁盘IO性能(日常、极限)。
  • 对测试新型存储介质的性能,建立测试方法。
  • 针对图搜业务,建立服务器磁盘IO性能基线标准。

有了磁盘IO性能基线标准,有如下好处:

  • 在系统扩容的过程中,对加入集群的新机器,做磁盘IO性能准入测试,降低线上环境的硬件故障发生率。
  • 以后引入新型服务器,在评估其性能时,做对比和参照。
  • 在搭建新的图搜服务,预估系统可承受的最大业务量时,可以做为参考依据。

测试方法

测试方法分析

图搜搜索引擎磁盘IO读写特点

对于图搜搜索引擎,主要有三种IO操作,分别是随机读、顺序写和顺序读。

  • 随机读

图搜引擎在提供线上服务时,磁盘IO操作主要来自于读取磁盘中存储的正排字段, 随机存储于正排索引文件当中。

  • 顺序写和顺序读

顺序写和顺序读,分别为索引同步和索引加载这两个过程中的磁盘IO读写特点,频率为1天各执行1次。

索引文件打开方式主要为mmap

测试注意点

针对图搜引擎操作磁盘时的IO特点,在做磁盘IO性能测试时需要注意下面几点:

  • 测试随机读、顺序写和顺序读三种读写情况,重点是测试随机读的磁盘IO性能。

  • 随机读测试中,文件大小要到500G,并做96路并发测试。每次读写块的大小为4k

  • 要使用mmap方式打开文件。

  • 在测试的过程中,要排除linux cache机制的影响,只对磁盘IO性能进行测试。

测试指标

重点关注在3种磁盘读写方式(随机读、顺序写和顺序读)下的IO指标:

名称备注
BW平均IO带宽
IOPSInput/Output Operations Per Secondlatency响应时间complete latency完成延迟
CPU CPU使用率

测试工具

FIO 2.1.10
top/vmstat

测试步骤

测试步骤注意要点

  • 相同的测试必须测试3遍,并记录下每次的测试结果数据。
  • 每组性能测试的时间不能少于60秒。
  • 测试前检查服务器上的应用进程,关闭无关进程,减少无关影响。

具体测试步骤

以测试服务器 x.x.x.x 的磁盘IO性能为例:

  • 关闭所有无关进程。
  • 下载编译和安装FIO。
  • 编写fio job文件
  • 此fio job文件,包含3个job,分别是随机读、顺序读、顺序写。
  • 执行测试命令
    ./fio fio_jobs.ini

测试结果如下:

  • 解读并记录测试结果。
  • 测试结果分析方法
  • 对照手册,解读FIO的测试结果。

    如何建立图搜引擎服务器磁盘IO性能基线标准

对目前拍立淘图搜引擎使用的机型,依据本文的方法做性能测试。并统计磁盘IO性能测试结果。

对各个机器磁盘IO性能测试结果中,剔除误差数据,取中位统计结果做平均值为IO性能基线标准。

参考资料

互联网公司里有哪些不好的潜规则

喜欢做0到1,而不喜欢做从1到100的事情

这个现象非常普遍,做从无到有的项目的工程师,往往得到公司的资源倾斜,获得更多的晋升机会;与之相对的,如果是接收老的项目的工程师,承担了先前项目中各种不稳定的风险,产出没有从无到有做项目多,技术进步慢,晋升机会少。

深谙此规则的工程师,在职场上善于利用这个规律,专挑0到1的项目去做,做完后把项目交给其他工程师去维护,自己又挑别的新项目做。打个比喻,就是对项目管生不管养。这类工程师在职场上的表现往往有1个特点,就是非常喜欢挑活儿干。

这个问题的出现,往往是绩效考核和管理层价值取向有问题导致。

汉明距离之算法和实现总结

内容简介

汉明距离,通过比较向量每一位是否相同,求出不同位的个数。用来表示两个向量之间的相似度。

汉明距离计算的步骤,即对两个向量首先进行异或操作,然后对异或的结果的每一位bit进行统计,最后合计出有多少bit的值为1。

本文主要内容为,列举出9种计算汉明距离的算法及其C++代码的实现,并使用本文的测试方法测试得出不同算法的性能。再对不同的算法进行分析比较。

计算机在进行异或操作中,CPU的指令集可以提供多种实现。比如cpu固有指令 xor 和 SSE指令集 、AVX指令集。后两种指令集都是为了提升CPU对向量的处理速度而扩展的指令集。

汉明距离计算的后一步,计算一个变量中所有bit的值为1的个数。可以使用多种算法和实现方式来实现。比如算法上,逐位移位统计、查表法、分治法等;实现方式上,可以使用前面所说的算法,也可以使用cpu的指令popcnt直接求得。

实现算法

本文算法计算的向量为二值向量;

本文中包含算法函数的Algorithm类声明如下,不同计算汉明距离的算法的类都继承这个基类,计算汉明距离由成员函数 uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size)实现,函数输入为3个参数,其中p和q分别为两个向量的指针;size为向量维度的大小,并以64为倍数; 向量最小为64个bit。

class Algorithm {
  public:
    ~Algorithm() {}
    virtual void init() {}
    virtual std::string getName() {
      return "Undefined Algorithm Name.";
    }
    virtual uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) = 0;
};

一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        while (r) {
          res += r & 1;
          r = r >> 1;
        }
      }
      return res;
    }

使用gcc内建函数优化一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += __builtin_popcountll(r);
      }
      return res;
    }

查表法-按8bit查询

class HammingDistanceTable8Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable8Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_8bit_init(&pop_count_table_ptr);
    }
    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_8bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 
    void pop_count_table_8bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[256];
      for (int i = 0; i < 256; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_8bit(uint64_t n) {
      int res = 0;
      uint8_t *p = (uint8_t *)&n;
      for (int i = 0; i < 8; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

查表法-按16bit查询

class HammingDistanceTable16Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable16Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_16bit_init(&pop_count_table_ptr);
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_16bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 

    void pop_count_table_16bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[65536];
      for (int i = 0; i < 65536; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_16bit(uint64_t n) {
      int res = 0;
      uint16_t *p = (uint16_t *)&n;
      for (int i = 0; i < 4; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

分治法

class HammingDistanceDivideConquer : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquer";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer(uint64_t n) {
      n = (n & 0x5555555555555555) + ((n >> 1 ) & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n & 0x0F0F0F0F0F0F0F0F) + ((n >> 4 ) & 0x0F0F0F0F0F0F0F0F);
      n = (n & 0x00FF00FF00FF00FF) + ((n >> 8 ) & 0x00FF00FF00FF00FF);
      n = (n & 0x0000FFFF0000FFFF) + ((n >> 16) & 0x0000FFFF0000FFFF);
      n = (n & 0x00000000FFFFFFFF) + ((n >> 32) & 0x00000000FFFFFFFF);
      return n;
    }
  private:

};

改进分治法

class HammingDistanceDivideConquerOpt : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquerOpt";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer_opt(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer_opt(uint64_t n) {
      n = n - ((n >> 1)  & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n + (n >> 4 )) & 0x0F0F0F0F0F0F0F0F;
      n = n + (n >> 8 );
      n = n + (n >> 16);
      n = n + (n >> 32);
      return (uint64_t)(n & 0x7F);
    }
  private:

};

使用SSE指令集

class HammingDistanceSSE : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceSSE";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[2] = {0, 0};
      for (uint64_t i = 0; i < size; i += 2) { 
        __m64 *x1 = (__m64*)(p);
        __m64 *x2 = (__m64*)(p + 1);
        __m64 *y1 = (__m64*)(q);
        __m64 *y2 = (__m64*)(q + 1);
        __m128i z1 = _mm_set_epi64 (*x1, *x2);
        __m128i z2 = _mm_set_epi64 (*y1, *y2);
        __m128i xor_res =  _mm_xor_si128(z1 , z2);
        _mm_store_si128((__m128i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
      }
      return res;
    }
};

使用AVX2指令集

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[4] = {0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 4) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        __m256i z1 = _mm256_set_epi64x (*x1, *x2, *x3, *x4);
        __m256i z2 = _mm256_set_epi64x (*y1, *y2, *y3, *y4);
        __m256i xor_res =  _mm256_xor_si256(z1 , z2);
        _mm256_store_si256((__m256i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
      }
      return res;
    }
};

使用AVX512指令集

截止本文完成时,市场上支持avx-512指令的cpu并没有普及,但是gcc已经提供了avx-512的c/c++ 语言接口。本文先把代码实现,后续购得支持avx-512指令的cpu后,再进行测试。

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[8] = {0, 0, 0, 0, 0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 8) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *x5 = (long long int*)(p + i + 4);
        long long int *x6 = (long long int*)(p + i + 5);
        long long int *x7 = (long long int*)(p + i + 6);
        long long int *x8 = (long long int*)(p + i + 7);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        long long int *y5 = (long long int*)(q + i + 4);
        long long int *y6 = (long long int*)(q + i + 5);
        long long int *y7 = (long long int*)(q + i + 6);
        long long int *y8 = (long long int*)(q + i + 7);
        __m512i z1 = _mm512_set_epi64x (*x1, *x2, *x3, *x4, *x5, *x6, *x7, *x8);
        __m512i z2 = _mm512_set_epi64x (*y1, *y2, *y3, *y4);
        __m512i xor_res =  _mm512_xor_si512(z1 , z2);
        _mm512_store_si512((void*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
        res += _mm_popcnt_u64(temp_res[4]);
        res += _mm_popcnt_u64(temp_res[5]);
        res += _mm_popcnt_u64(temp_res[6]);
        res += _mm_popcnt_u64(temp_res[7]);
      }
      return res;
    }
};

性能测试

测试代码框架类:

class AlgorithmBench {
  public:
    void init() {
      startTimer();
      vector = new uint64_t[size];
      ifstream f("sample.txt", ios::in);
      for(int i = 0; i < size;i++ ) {
        uint64_t c;
        f >> vector[i];
      }
      stopTimer();
      getInterval("load sample cost:");
      f.close();
    }
    void setSize(uint64_t size) { this->size = size;};
    void push_back(Algorithm* algorithm) { _algorithm_vector.push_back(algorithm);}

    void start() {
      for (std::vector<Algorithm*>::iterator iter = _algorithm_vector.begin();
          iter != _algorithm_vector.end();
          ++iter) {
        Algorithm* ptr = *iter;
        ptr->init();
        startTimer();
        for (int i = 0; i < size - 3; ++i) {
          ptr->cal(vector + i, vector + i + 1, 4);
        }
        stopTimer();
        getInterval(ptr->getName() + " cost:");
      }
    }

    void startTimer() {
      gettimeofday(&tv,NULL);
      start_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void stopTimer() {
      gettimeofday(&tv,NULL);
      end_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void getInterval(std::string prefix) {
      std::cout<<std::left<<setw(40) << prefix 
        << std::right << end_timer - start_timer<<endl;
    }

  private:
    uint64_t size;
    uint64_t *vector;
    timeval tv;
    uint64_t start_timer;
    uint64_t end_timer;
    std::vector<Algorithm*> _algorithm_vector;
};

编译指令:

g++ -msse4.2 -mavx2 -O2 -o test_hamming hamming_distance.cpp

windows-gcc 测试结果

测试环境:

Windows 7
gcc version 7.4.0

HammingDistanceBase cost:               330066
HammingDistanceBuildin cost:            326566
HammingDistanceTable8Bit cost:          2381976
HammingDistanceTable16Bit cost:         1435287
HammingDistanceDivideConquer cost:      1215243
HammingDistanceDivideConquerOpt cost:   1226745
HammingDistanceSSE cost:                972695
HammingDistanceAVX cost:                680636

Linux-gcc 测试结果

测试环境:

Ubuntu Server 19.04
gcc version 8.3.0

测试结果:

load sample cost:                       78070
HammingDistanceBase cost:               145393
HammingDistanceBuildin cost:            75905
HammingDistanceTable8Bit cost:          598789
HammingDistanceTable16Bit cost:         142502
HammingDistanceDivideConquer cost:      343414
HammingDistanceDivideConquerOpt cost:   316748
HammingDistanceSSE cost:                59322
HammingDistanceAVX cost:                115784

总结

不同平台,不同的编译器版本,测试的结果有所差异。但整体表现上使用SSE指令集的性能最好,其次是使用内建函数计算popcnt性能最优。AVX指令性能略好于SSE。性能最差的是查表法8bit。分治法性能居中。

附录

所有代码都存放在github上面: https://github.com/zuocheng-liu/code-samples/blob/master/algorithm/hamming_distance.cpp

后台服务底层网络通信框架设计方案推演

内容梗概

  • 从简单到复杂,推演后台服务的底层通信框架的进化过程,包括网络IO模型、多线程模型的选择和组合。
  • 推演的最基础模型是IO同步阻塞+单线程模型,然后逐步进化。推动网络框架进化的3个因素是,每秒请求量的增大,并发量的增大,逻辑计算量的增大。
  • 本文使用的底层通信协议以TCP为基础,因此所有推演方案不考虑适用于UDP的信号驱动模型和适用于于文件操作的异步IO模型。
  • 只考虑同步阻塞、同步非阻塞、IO多路复用。
  • 本文不讨论多进程和多线程的区别,在提高cpu利用率上,这两个模型的作用是一致的,因此本文只选择多线程模型进行讨论。
  • 本文主要讨论单机网络通信框架的设计演化,不考虑分布式场景下。本文各项性能对比指标也仅指单台服务器的性能。

推演之前,需要一些系统相关数据支撑。

现有的硬件条件,2017年:

  • 单台服务器处理网络连接请求数,每秒约10W量级。
  • 单机并发处理网络长连接数上限在10W左右(C100K)。
  • CPU 单核,3000MHz,一台服务器以24核计,单核每秒亿次运算。
  • 抛离单次请求的网络处理过程,单核每秒处理每个请求的业务计算量在0-10000W之间。

最简单的网络通信模型, 同步IO阻塞+单线程

此模型是我们推演的基础模型。

适用场景,最简单的网络请求、处理和返回。每秒处理请求量低,并发处理请求数只有1,计算量小。系统硬件、网络IO都不会构成瓶颈。

实例,各类demo、模拟后台服务的测试服务、大型系统中开发的调试接口、数据接口、监控接口。

开始推演:

  1. 假如处理单个请求的计算量不变(依然很小),但请求量增大,并发量增大,网络IO成为瓶颈,这种模型是不能满足需求的。因此需要使用 IO多路复用 + 单线程模型

  2. 假如 请求量、并发量不变,但是处理请求计算量变大,单核CPU成为瓶颈,这种模型也是不能满足需求的。此时需要使用 IO阻塞 + 多线程模型,利用CPU多核提高计算能力。

  3. 假如请求量、并发量变大,而且处理单个请求的计算量也变大,这种模型更是不能满足需求,但此种情况比较复杂,下面需要详细论述。不过一般情况下也可以使用 IO多路复用 + 多线程模型

IO同步阻塞 + 多线程

使用这种模型,则是计算量变大,单核CPU往往成为瓶颈,必须使用多核来提高计算能力,但并发度低。数据举例,24核CPU处理每秒处理请求数小于1W,并发度小于24,请求量小于1000/s。

实例,各类 FastCGI 后台服务、php-fpm,用于机器学习模型计算的服务,图像处理服务。

开始推演:

IO同步阻塞 + 多线程,并发度受限于线程数,不适合处理并发,一旦并发量变高,则网络模型应该改用IO多路复用。

IO多路复用 + 单线程

使用这种模型,请求量大,并发量大,但处理每个请求的计算量小。数据举例,qps 5W以上,并发数高,但单核cpu每秒处理也在5W以上。

实例, redis和memcache的网络模型。

IO多路复用 + 多线程

经过上面的推演,IO多路复用 + 多线程模型应该是推演过程的终点。既能处理大量请求,又能提升并发度,提高CPU的利用率解决计算量大的问题。

实例, 大型网络应用。

总结

无论选择什么样的模型,最终的目的就是提高服务器硬件的利用率,并避免资源浪费。

选择合适模型,必须依据其所在的业务场景,根据请求量、并发量、计算量这个3个指标,选择合适的模型。

问题总结

  1. 为什么不是所有情况都选择IO多路复用 + 多线程模型,IO多路复用 + 多线程解决了高访问量、高并发、计算量大的业务?

主要是因为在一些非高访问量、非高并发、非计算量大的业务场景下,IO多路复用 + 多线程是一种过度设计,容易造成资源浪费。

  1. 为什么同步IO非阻塞并没有在推演过程中使用?

    非阻塞的编码,会让代码逻辑复杂,一般不会使用。

Memcache源代码阅读总结

Memcache的组成部分

从设计的层面讲,Memcache的基本组成元素只有3个:

  • 网络IO多路复用, 由libevent库来支持
  • 内存存储,使用Slab的数据结构 和 LRU内存回收算法,管理内存。
  • Hash算法索引数据,使用链表法来存储hash node/item和解决hash冲突。

Memcache 为什么这么设计?

可以拆解为下面的问题:

  • 为什么网络IO多路复用模型?
  • 为什么要用Hash算法?
  • 为什么要使用Slab的内存管理方法?

为了解答上面的问题,首先要探讨Memcache业务场景的特点

Memcache 的使用场景都是被作为分布式缓存

  • 请求量大,连接数多
  • 单次请求数据小,作为缓存消耗的存储空间小
  • 请求延时要求极小

正是以上特点:

  • IO多路复用用于解决请求量大,连接数多的问题
  • 内存存储数据,读写快,解决延时小的问题,并且满足分布式缓存的需求
  • 使用hash索引数据,则提升数据查询速度,更能满足请求延迟小的要求。

这样设计的好处是,既能满足业务场景的需要,又能有很高的性能。

问题汇总

有没有比memcache更好的设计了?

网络模型可以解除libevent依赖,直接使用epoll。 代码质量可以进一步优化。

memcache 选择单线程还是多线程?

早期memcache使用单线程,后期换为多线程。

为什么高版本的memcache选择了多线程模型?

突破单核cpu计算瓶颈,增强并发能力。但我认为提升成本大,效果小,单线程更合理些。

广告引擎技术特点

是一种搜索引擎

广告引擎,是适用于广告搜索场景下的搜索引擎。广告引擎特点,也是由广告业务需求所决定的。

流量大

  • 依附于大流量的用户产品之中,负责将用户流量转化为商业流量。
  • 用户量大,但客户数广告数少。
  • 虽然流量大,但是对并发度的要求比较低。和用户产品相比,广告引擎的业务场景往往不需要和客户端维持长连接。

低延迟

  • 信息流广告必须快于信息,才能不伤害用户体验。
  • 特殊场景广告,必须低延迟,比如APP的开屏、插屏,不然会影响客户使用APP。
  • 在RTB系统中,对低延迟的要求更高。

一方面,由于exchange的存在,增加了一次网络请求和中间的竞价过程,DSP系统的响应时间就要更加短;另一方面,exchange对于DSP的响应时间会作为二次竞价的权值,提高响应速度对提高收入有利。

检索数据量少

  • 业界广告主数量少, 100万~1000万量级
  • 广告物料也少,1亿量级

和传统的搜索引擎相比,广告引擎的数据量并不大。

生产的数据量大

  • 用户每次的访问、展示打点追踪、点击打点追踪都要被记录,每日产生的数据按TB计算。

高可用

虽说高可用几乎成了所有后台服务的特点,但是广告引擎对高可用的要求尤其要高。

  • 系统的每次故障,都可能会造成公司收入的直接降低。

用户的产品的后台系统一旦出现故障,可能损失是用户数,或者伤害了用户体验,但是广告系统损失的是最直接的收入。

实时性

实时性的要求主要体现在广告的及时上下线。如果广告上线不及时,则不能及时产生收入;反之,如果广告下线不及时,则会引发广告超投,也是一种收入损失。

时序性

时序性比较表现在,同一广告的上线、修改、下线等改动,作为消息传递给广告引擎时,必须保证时序性。不然广告的上线和下线等投放状态会出现错误。

二八原则规律的一些特征

  • 80%的收入来自20%的大客户
  • 80%的收益得益于20%的数据

广告收入异常原因追踪总结

本文内容梗概

总结和列举作者常见的广告收入异常的原因,总结问题定位和修复的经验。

因为作者日常多从事广告引擎的开发工作,所以下面的内容往往从广告引擎的视角来阐述。

本文内容多依赖数据监控的一些指标,比如流量、广告展示数、广告点击数、CTR、CPC、CPM、广告召回率、广告存量等。

正常的收入曲线特征

以下特征,仅适用于pv、广告数量级大,收入高的广告系统。

  • 收入曲线平滑变化,没有骤升骤降。
  • 收入变化呈现周期规律特征
  • 同比和环比没有大的变化。日期上与近几日或上周同一时间、上月同一时间,时间上取相同时间间隔内(5分钟)的收入相比,没有特别大的变化
  • 曲线变化符合当日特殊事件给广告收入带来的影响,比如双十一春节等节日、国家重要事件发布、大面积自然灾害等。

如果不满足上面的特征,则收入可能异常。

广告收入异常的数据表现

  • 流量异常
  • 广告展示量异常
  • 广告点击量异常
  • CTR异常
  • CPC、CPM异常

不同原因数据指标的表现不同

  • 流量异常

流量的异常,往往也导致展示量、点击量降低和升高,但是CTR、CPC、CPM等指标不会发生变化。这种情况将收入下降的排查范围缩小到流量端。

  • 广告展示量异常,但流量没有变化
  1. 如果广告召回率(广告存量)是否有异常,排查范围则缩小至广告上下线流程。比如系统问题导致广告上下线异常,广告主是否有大量的上下线操作等。
  2. 如果广告投放机有问题,则把排查范围缩小至广告投放机。
  • ctr 异常
  1. 广告展示量、CPC、CPM不变,则广告主投放的广告质量降低,广告创意素材吸引力差等。
  2. 广告展示量不变,CPC、CPM异常,有可能是由广告主调整价格。
  • CPC、CPM异常,流量、ctr、展示量、点击量都无变化
  1. 算法和竞价策略
  2. 广告主调价

为便于浏览,可参考下面的鱼骨图:

广告收入异常原因

总结

  • 有一个直观易读的数据监控和展示平台非常重要,有利于排查问题的效率。
  • 问题的排查特别依赖数据监控平台,因此数据监控平台必须先保证高可用和数据准确。

技术随笔

此文内容为未经整理的技术工程碎片。

分布式技术已经非常普及,但是为了提高系统的qps和并发能力,我们为什么优先挖掘机器潜力(提高硬件利用率),而不是优先扩容集群。

原因:

  • 服务器运维费用比电费贵多了。提高硬件利用率,虽然不节省电费,但是省服务器。

架构思想

  • 分层 (垂直分离)
  • 分而治之 (水平分离)
  • 动静分离、冷热分离、读写分离
  • 缓冲区
  • 扩展和复用
  • 总线思想、归一
  • 保持简单

方法论

  • 代码和配置分离

工程师手记-将多进程后台服务改造为多线程

背景

2016年秋,部门计划将移动广告引擎和新的移动DSP引擎做架构融合。保留原来的移动广告引擎的业务逻辑,将其移植到新的框架当中去。

新框架有很多特点,其中之一是所有模块都使用了多线程模型而老的移动广告引擎的一个模块则使用了多进程模型。

改造注意点

  • 临界资源的共享
  • 单例资源
  • 内存的共享

改造的陷阱

改造方法

在多进程模型中,单例模式可以安全地被使用。但是在多线程环境中,则要考虑多线程都要抢占单例类,单例类会成为瓶颈,而且还有可能出现线程不安全的问题。

解决方法:

将多进程的单例类,改造成进程体里多例模式,但是在每个线程体内单例。具体方法参考线程安全的单例模式

改造结果

改造成功,并且顺利上线,正常服务。

改造后带来的好处和坏处

  • 性能的提升
  • 内存

广告引擎技术总结

日志尚未完成、持续总结中……

广告引擎,是适用于广告搜索场景下的搜索引擎。

广告引擎特点

广告引擎特点,也是由广告业务需求所决定的。

  • 流量大
  • 低延迟
  • 生产的数据量大
  • 高可用
  • 实时性

详细阐述,请看这里

业务

  • 业务的抽象

架构

  • 广告检索流程
  • 前端
  • 索引
  • 实时数据流

详细阐述,请看这里

算法

  • 竞价排名
  • 机器学习

trouble shoot

性能测试

  • 压力测试
  • IO测试

监控

  • 系统监控

CPU负载、内存使用率、qps、连接数、网络通信延时、第三方服务网络通信延时、计算延时、分业务分模块统计延时、网络IO。

  • 业务监控

广告召回率、广告返回为空比率、ctr、CPC、CPM、展示量、点击量、索引广告数目。

运维

测试

团队

  • 流程管理

Linux Shell 命令小技巧

Top 监控某一多进程任务

以nginx 为例: d PL=pidof nginx

PL=${PL//\ /,}

top -p $PL

SSH

ssh scp等消除每次问yes/no

ssh -o StrictHostKeyChecking no dev.zuocheng.net

链接复用

创建ssh sock链接目录

mkdir ~/.ssh/socks

修改~/.ssh/config文件,若该文件不存在,则创建。增加以下内容:

Host *
    KeepAlive yes
    ServerAliveInterval 60
    ControlMaster auto
    ControlPersist yes
    ControlPath ~/.ssh/socks/%h-%p-%r

grep

常用参数 -Irn

find

清空所有log文件

find ./ -name *.log | xargs truncate -s 0

删除所有nohup文件

find ./ -name nohup.out | xargs rm -rf

find ./ -name *.swp | xargs rm -rf

删除过期日志(60天前)

find ./ -mtime +60 -name log.* -exec rm -rf {} \;

使用正则删除过期日志(7天有效期)
30 4 * * * (find /workdir -type f -mtime +7 -regex '.+?/log/app\.20[0-9]+' -exec rm -f {} \;)

把当前目录下所有文件都MD5

find ./ -type f -exec md5sum {} \; | sort -k 2 > md5sum

date

依据时间戳获取时间

date -d @1525856172

生成日期
DATE=$(date +%Y%m%d)

比较两个文件夹的不同

生成MD5值 find ./dir -type f -exec md5sum {} \; > md5sum.txt

生成MD5值,并排除日志文件 find "dir" -path 'dir/log*' -prune -o -type f -exec md5sum {} \; > md5sum.txt

校验md5值 md5sum -c > md5sum.txt

sed 相关

sed -i '/^$/d' 删除空行
sed -i 's/\s*$//g' #删除行尾空白