murmur-hash\Md5\crc32等算法对用户信息散列均匀程度测试

问题的提出

  • 一致性hash,散列并不均匀
  • 集群中机器性能并不均衡。因素众多,比如型号不同、硬件不同;同型号下,机器性能表现也不同。
  • 当使用一致性hash作为负载均衡策略时,会让机器负载更加不均衡。

因此需要对一致性hash作为负载均衡策略时表现的的特性进行详细探索。最主要观察的指标则是散列均匀程度。

提前给出实验结果

  • murmur hash的散列均匀程度远优于crc32,和MD5相当。但murmur hash在计算消耗上低于MD5
  • 使用murmur hash,在100台物理节点,5000虚拟节点倍数的情况下,可以做到散列均匀和性能的平衡

brpc中如何实现一致性hash负载均衡

// code file: brpc/src/brpc/policy/consistent_hashing_load_balancer.cpp
int ConsistentHashingLoadBalancer::SelectServer(
    const SelectIn &in, SelectOut *out) {
    if (!in.has_request_code) {
        LOG(ERROR) << "Controller.set_request_code() is required";
        return EINVAL;
    }
    if (in.request_code > UINT_MAX) {
        LOG(ERROR) << "request_code must be 32-bit currently";
        return EINVAL;
    }
    butil::DoublyBufferedData >::ScopedPtr s;
    if (_db_hash_ring.Read(&s) != 0) {
        return ENOMEM;
    }
    if (s->empty()) {
        return ENODATA;
    }
    std::vector::const_iterator choice =
        std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
    if (choice == s->end()) {
        choice = s->begin();
    }
    for (size_t i = 0; i < s->size(); ++i) {
        if (((i + 1) == s->size() // always take last chance
             || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
            && Socket::Address(choice->server_sock.id, out->ptr) == 0
            && (*out->ptr)->IsAvailable()) {
            return 0;
        } else {
            if (++choice == s->end()) {
                choice = s->begin();
            }
        }
    }
    return EHOSTDOWN;
}
bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
    std::vector add_nodes;
    add_nodes.reserve(_num_replicas);
    if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
        return false;
    }
    std::sort(add_nodes.begin(), add_nodes.end());
    bool executed = false;
    const size_t ret = _db_hash_ring.ModifyWithForeground(
                        AddBatch, add_nodes, &executed);
    CHECK(ret == 0 || ret == _num_replicas) << ret;
    return ret != 0;
}

由代码看出hash环中的每台服务器会变为100个虚拟节点,假设有100台机器,则在hash环分布上会有10000个节点

不同散列算法对比

hash/曼哈顿距离/虚拟节点数 100*100 1000*100 3000*100 5000*100 10000*1000
murmur 30% 20% 10% 5% 5%
MD5 30% 20% 10% 5% 5%
crc32 150% 100% 50% 30% 30%

结论,murmur hash的散列均匀程度优于crc32、MD5等

MurMurHash算法分析

测试murmurhash 散列均匀程度的代码

int main () {
    int32_t rep = 100;
    int32_t base = 100;
    std::vector  nodes;
    nodes.resize(base);
    std::vector imeis;

    std::map node_info;
    std::vector rings;

    // init cs rings
    for (auto& node : nodes) {
        node.host = std::to_string(rand());
        for (int32_t i = 0; i < rep; ++i) {
            std::string host = node.host +  "-" + std::to_string(i);
            uint32_t hash = MurmurHash32(host.c_str(), host.size());
            node.hash = hash;
            node_info[hash] = &node;
            rings.push_back(hash);
        }
    }
    std::sort(rings.begin(), rings.end());
    // read imeis
    {
        std::ifstream if_stream("./imei_sample.txt");
        std::string imei;
        while(if_stream >> imei) {
            imeis.emplace_back(imei);
        }
        if_stream.close();
        cout << "imei size:" << imeis.size()
            << endl;
    }
    // search
    for (auto& imei : imeis) {
        uint32_t hash = MurmurHash32(imei.c_str(), imei.size());
        auto iter = std::lower_bound(rings.begin(), rings.end(), hash);
        if (iter == rings.end()) {
            iter = rings.begin();
        }
        node_info[*iter]->count += 1;
    }

    // print
    std::sort(nodes.begin(), nodes.end(), [](const Node& a, Node& b)->bool { return( a.count < b.count); });
    for (auto& node : nodes) {
        cout << "host:" << node.host
            << "\thash:" << node.hash
            << "\tcount:" << node.count
            << "\trate:" << (double)node.count/(double)imeis.size()
            << endl;
    }

    auto max = nodes.rbegin()->count;
    auto avg = imeis.size()/nodes.size();
    auto min = nodes.begin()->count;
    cout << "max:" << nodes.rbegin()->count << endl;
    cout << "avg:" << imeis.size()/nodes.size() << endl;
    cout << "min:" << nodes.begin()->count << endl;
    cout << "up:" << (double)(max-avg)/(double)avg << endl;
    cout << "down:" << (double)(avg-min)/(double)avg << endl;
    return 0;

测试结果

nodes_num chash_num_replicas 单核cpu使用率% 内存使用率% 100万次查询/s 波动率 hash ring时间 us hash ring sort时间 us
100 100 3 0.3 1.56 0.403038 14014 3564
100 1000 3 0.4 1.89783 0.0884258 156900 42973
100 5000 4.7 0.5 2.49725 0.0536913 880742 244460
100 10000 5 0.6 2.60658 0.0416814 1900755 519096
100 50000 10 1 3.24731 0.024373 11680936 2852591
100 100000 15 3 3.52997 0.0270811 25035142 5932167
100 500000 100 11 4.62568 0.0300247 148615862 32576504

结论

总量 235W imei
波动范围:-21~35%
+0.352182
-0.210649

./murmur_hash
imei size:2362768
host:278722862  hash:1925471051 count:18650 rate:0.00789328
host:783368690  hash:4002282954 count:19363 rate:0.00819505
host:1548233367 hash:3058897622 count:19635 rate:0.00831017
host:491705403  hash:611553964  count:19699 rate:0.00833726
host:1984210012 hash:1268219422 count:19909 rate:0.00842613
host:1303455736 hash:2326844163 count:20035 rate:0.00847946
host:855636226  hash:3637853163 count:20067 rate:0.008493
host:1059961393 hash:2408710735 count:20074 rate:0.00849597
host:1957747793 hash:1288415059 count:20084 rate:0.0085002
host:1477171087 hash:1988358724 count:20170 rate:0.0085366
host:1540383426 hash:411668640  count:20173 rate:0.00853787
host:610515434  hash:663691886  count:20526 rate:0.00868727
host:1973594324 hash:34305553   count:20670 rate:0.00874821
host:760313750  hash:3679971615 count:20685 rate:0.00875456
host:137806862  hash:1969944721 count:20771 rate:0.00879096
host:294702567  hash:2546435079 count:20788 rate:0.00879816
host:468703135  hash:646134042  count:20832 rate:0.00881678
host:943947739  hash:90910835   count:21160 rate:0.0089556
host:709393584  hash:574850828  count:21243 rate:0.00899073
host:1681692777 hash:3333659905 count:21308 rate:0.00901824
host:1102520059 hash:1769249379 count:21493 rate:0.00909653
host:1632621729 hash:1495248802 count:21503 rate:0.00910077
host:233665123  hash:1873607595 count:21784 rate:0.00921969
host:859484421  hash:3330505905 count:21797 rate:0.0092252
host:424238335  hash:4012193417 count:22014 rate:0.00931704
host:184803526  hash:2094014013 count:22097 rate:0.00935217
host:1911759956 hash:824332016  count:22155 rate:0.00937671
host:412776091  hash:2345249892 count:22194 rate:0.00939322
host:1125898167 hash:3699427959 count:22223 rate:0.00940549
host:135497281  hash:3267716888 count:22224 rate:0.00940592
host:572660336  hash:2471704921 count:22463 rate:0.00950707
host:596516649  hash:3577994617 count:22477 rate:0.00951299
host:719885386  hash:1271161395 count:22746 rate:0.00962684
host:1843993368 hash:209167012  count:22748 rate:0.00962769
host:1131176229 hash:253515204  count:22804 rate:0.00965139
host:805750846  hash:3215568319 count:22873 rate:0.00968059
host:35005211   hash:2142699485 count:22887 rate:0.00968652
host:1649760492 hash:1041784901 count:22967 rate:0.00972038
host:1967513926 hash:1618568325 count:22975 rate:0.00972376
host:2038664370 hash:3103071240 count:23099 rate:0.00977625
host:2044897763 hash:1674256958 count:23241 rate:0.00983634
host:1656478042 hash:1438521939 count:23273 rate:0.00984989
host:511702305  hash:3873703906 count:23349 rate:0.00988205
host:752392754  hash:3262287234 count:23352 rate:0.00988332
host:304089172  hash:1732284559 count:23392 rate:0.00990025
host:84353895   hash:2348091605 count:23450 rate:0.0099248
host:1469348094 hash:646865057  count:23522 rate:0.00995527
host:1726956429 hash:4181074938 count:23539 rate:0.00996247
host:945117276  hash:4238293984 count:23567 rate:0.00997432
host:1141616124 hash:1571084204 count:23687 rate:0.0100251
host:1937477084 hash:4082607906 count:23749 rate:0.0100513
host:1474612399 hash:3177345892 count:23788 rate:0.0100679
host:1129566413 hash:1470592129 count:23800 rate:0.0100729
host:1734575198 hash:2699928391 count:23809 rate:0.0100767
host:628175011  hash:2004400594 count:23829 rate:0.0100852
host:356426808  hash:1763444061 count:23858 rate:0.0100975
host:1424268980 hash:4064778616 count:23945 rate:0.0101343
host:1100661313 hash:277960042  count:23951 rate:0.0101368
host:336465782  hash:2967368358 count:24001 rate:0.010158
host:861021530  hash:204708028  count:24044 rate:0.0101762
host:1264095060 hash:2871390615 count:24102 rate:0.0102007
host:1780695788 hash:3482626452 count:24127 rate:0.0102113
host:635723058  hash:1724005082 count:24184 rate:0.0102355
host:42999170   hash:3780009203 count:24188 rate:0.0102371
host:608413784  hash:1746093087 count:24209 rate:0.010246
host:1315634022 hash:2863942689 count:24290 rate:0.0102803
host:846930886  hash:3078961593 count:24352 rate:0.0103066
host:1914544919 hash:3276410919 count:24523 rate:0.0103789
host:1801979802 hash:4146573372 count:24672 rate:0.010442
host:1365180540 hash:625462217  count:24727 rate:0.0104653
host:939819582  hash:2372438818 count:24831 rate:0.0105093
host:982906996  hash:247356574  count:24930 rate:0.0105512
host:521595368  hash:3895493184 count:24982 rate:0.0105732
host:2145174067 hash:1076998566 count:25050 rate:0.010602
host:2084420925 hash:1383371566 count:25103 rate:0.0106244
host:1433925857 hash:4101903831 count:25117 rate:0.0106303
host:1956297539 hash:1715097599 count:25379 rate:0.0107412
host:1374344043 hash:649457066  count:25383 rate:0.0107429
host:1101513929 hash:1467247780 count:25506 rate:0.010795
host:756898537  hash:1686538984 count:25587 rate:0.0108292
host:1189641421 hash:176009951  count:25675 rate:0.0108665
host:1159126505 hash:1773799926 count:25939 rate:0.0109782
host:1827336327 hash:1008242909 count:25984 rate:0.0109973
host:1714636915 hash:4167178899 count:26113 rate:0.0110519
host:749241873  hash:3575756704 count:26202 rate:0.0110895
host:1025202362 hash:2435165496 count:26208 rate:0.0110921
host:1585990364 hash:1528149985 count:26263 rate:0.0111154
host:2001100545 hash:3446016448 count:26377 rate:0.0111636
host:1998898814 hash:1116227003 count:26415 rate:0.0111797
host:1350490027 hash:1503077054 count:26472 rate:0.0112038
host:1804289383 hash:4001391486 count:26715 rate:0.0113067
host:1749698586 hash:1034046264 count:26729 rate:0.0113126
host:1411549676 hash:2524237029 count:26744 rate:0.0113189
host:149798315  hash:2620653860 count:26859 rate:0.0113676
host:1369133069 hash:474177003  count:26875 rate:0.0113744
host:2089018456 hash:2266701595 count:27028 rate:0.0114391
host:1889947178 hash:533689514  count:27869 rate:0.0117951
host:1653377373 hash:2085222316 count:28508 rate:0.0120655
host:2053999932 hash:399467675  count:30092 rate:0.0127359
host:1918502651 hash:941599913  count:31948 rate:0.0135214
max:31948
avg:23627
min:18650
up:0.352182
down:0.210649

一道关于字符串算法题

题目

输入a b c 三个字符串,判断c是否是a、b中的字符,并且分别按在a、b中的顺序出现在c里

#include 
using namespace std;

// case 0 , nomal
// case 1 , null
// case 2 , a、b重复
// case 3 , a 或 b有叠词
// case 4 , a、b 字符串逆序

bool foo(char* a, int sa, char* b, int sb,  char* c, int sc) {
    if (nullptr == a || nullptr == b || nullptr == c ) {
        return false;
    }
    if (sa < 0 || sb < 0 || sc < 0) {
        return false;
    }
    if ( sa + sb != sc) {
        return false;
    }
    if (*c != *a && *c != *b) {
        return false;
    }
    if (sa == 0 && sb == 0 && sc == 0) {
        return true;
    }
    if (*a == *b && sa > 0 && sb > 0) {
        bool check = false;
        check |= foo(a+1, sa-1, b, sb, c+1, sc-1);
        check |= foo(a, sa, b+1, sb-1, c+1, sc-1);
        return check;
    } else if (*a == *c && sa > 0) {
        return foo(a+1, sa-1, b, sb, c+1, sc -1);
    } else if (*b == *c && sb > 0) {
        return foo(a, sa, b+1, sb-1, c+1, sc-1);
    }
    return true;
}

int main() {
    cout << "0 foo expect 1:" << foo("",0, "",0, "",0) << endl;
    cout << "1 foo expect 1:" << foo("abc",3, "123",3, "a1b2c3",6) << endl;
    cout << "2 foo expect 0:" << foo("acb",3, "123",3, "a1b2c3",6) << endl;
    cout << "3 foo expect 0:" << foo("acb",3, "123",3, "a1b2b3",6) << endl;
    cout << "4 foo expect 0:" << foo("acc",3, "123",3, "a1b2c3",6) << endl;
    cout << "5 foo expect 1:" << foo("acc",3, "123",3, "acc123",6) << endl;
    cout << "6 foo expect 1:" << foo("acc",3, "1b23",4, "a1cbc23",7) << endl;
    cout << "6 foo expect 0:" << foo("acc",3, "1b23",4, "a1cbc2",6) << endl;
    return 0;
}
  • 题目来自今日头条

一种压测工具使用的控制流量大小的算法-时间轮算法

压力测试工具是软件测试过程中的必用工具,而压力测试工具如何控制流量大小呢?

最常见的是计算每个请求之间的时间间隔,然后通过sleep方法来让两次请求产生间隔。这种方法有2个缺点,sleep时会让线程挂起,所以需要比较多的线程数;其二,当流量非常大的情况,比如qps达到10万以上时,会收到系统时钟精度和线程上下文切换的挑战。

本文设计了另外一种方法,底层原理为时间轮算法,利用二进制的与操作来实现。按照概率控制流量大小。但概率的计算并不依赖随机数,而是通过设置一个概率控制变量的方法,让流量的发送更加均衡。

代码如下:

class Transformer {
    public:
        Transformer() :
            send_num(0),
            qps(0),
            benchmark(0),
            counter(0),
            thread_pool(10) {}

        void run();
        void stop() { tc.stop(); }

    private:

        void sendOne();
        void transform();

        int32_t send_num;
        int32_t qps;
        int32_t benchmark;
        std::atomic counter;

        ThreadPool thread_pool;
        TimerController tc;
};
void Transformer::run() {
    qps = FLAGS_qps;
    if (qps <= 0 || qps > 1e5) {
        return;
    }

    int32_t query = (qps << 16) / 1000;
    send_num = query >> 16;
    query &= 0xFFFF;
    for (int i = 0; i < 16; ++i) {
        benchmark <<= 1;
        if (query & 1) {
            ++benchmark;
        }
        query >>= 1;
    }
    tc.cycleProcess(1, [this]() { this->transform(); });
}

void Transformer::transform() {
    uint32_t cur_c = counter.fetch_add(1, std::memory_order_relaxed);
    cur_c &= 0xFFFF;
    if (cur_c <= 0) {
        return;
    }
    int32_t delta = 0;
    for (int i = 0,bit = 1; i < 16; ++i, bit <<= 1) {
        if ((cur_c & bit) == 0) {
            continue;
        }
        if ((benchmark & bit) == 0) {
            break;
        } else {
            delta = 1;
            break;
        }
    }

    int32_t cur_send_num = send_num + delta;
    if (cur_send_num <= 0) {
        return;
    }
    for (int i = 0; i< cur_send_num; ++i) {
        thread_pool.enqueue([this]() { this->sendOne(); });
    }
}

汉明距离之算法和实现总结

内容简介

汉明距离,通过比较向量每一位是否相同,求出不同位的个数。用来表示两个向量之间的相似度。

汉明距离计算的步骤,即对两个向量首先进行异或操作,然后对异或的结果的每一位bit进行统计,最后合计出有多少bit的值为1。

本文主要内容为,列举出9种计算汉明距离的算法及其C++代码的实现,并使用本文的测试方法测试得出不同算法的性能。再对不同的算法进行分析比较。

计算机在进行异或操作中,CPU的指令集可以提供多种实现。比如cpu固有指令 xor 和 SSE指令集 、AVX指令集。后两种指令集都是为了提升CPU对向量的处理速度而扩展的指令集。

汉明距离计算的后一步,计算一个变量中所有bit的值为1的个数。可以使用多种算法和实现方式来实现。比如算法上,逐位移位统计、查表法、分治法等;实现方式上,可以使用前面所说的算法,也可以使用cpu的指令popcnt直接求得。

实现算法

本文算法计算的向量为二值向量;

本文中包含算法函数的Algorithm类声明如下,不同计算汉明距离的算法的类都继承这个基类,计算汉明距离由成员函数 uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size)实现,函数输入为3个参数,其中p和q分别为两个向量的指针;size为向量维度的大小,并以64为倍数; 向量最小为64个bit。

class Algorithm {
  public:
    ~Algorithm() {}
    virtual void init() {}
    virtual std::string getName() {
      return "Undefined Algorithm Name.";
    }
    virtual uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) = 0;
};

一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        while (r) {
          res += r & 1;
          r = r >> 1;
        }
      }
      return res;
    }

使用gcc内建函数优化一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += __builtin_popcountll(r);
      }
      return res;
    }

查表法-按8bit查询

class HammingDistanceTable8Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable8Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_8bit_init(&pop_count_table_ptr);
    }
    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_8bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 
    void pop_count_table_8bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[256];
      for (int i = 0; i < 256; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_8bit(uint64_t n) {
      int res = 0;
      uint8_t *p = (uint8_t *)&n;
      for (int i = 0; i < 8; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

查表法-按16bit查询

class HammingDistanceTable16Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable16Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_16bit_init(&pop_count_table_ptr);
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_16bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 

    void pop_count_table_16bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[65536];
      for (int i = 0; i < 65536; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_16bit(uint64_t n) {
      int res = 0;
      uint16_t *p = (uint16_t *)&n;
      for (int i = 0; i < 4; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

分治法

class HammingDistanceDivideConquer : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquer";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer(uint64_t n) {
      n = (n & 0x5555555555555555) + ((n >> 1 ) & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n & 0x0F0F0F0F0F0F0F0F) + ((n >> 4 ) & 0x0F0F0F0F0F0F0F0F);
      n = (n & 0x00FF00FF00FF00FF) + ((n >> 8 ) & 0x00FF00FF00FF00FF);
      n = (n & 0x0000FFFF0000FFFF) + ((n >> 16) & 0x0000FFFF0000FFFF);
      n = (n & 0x00000000FFFFFFFF) + ((n >> 32) & 0x00000000FFFFFFFF);
      return n;
    }
  private:

};

改进分治法

class HammingDistanceDivideConquerOpt : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquerOpt";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer_opt(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer_opt(uint64_t n) {
      n = n - ((n >> 1)  & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n + (n >> 4 )) & 0x0F0F0F0F0F0F0F0F;
      n = n + (n >> 8 );
      n = n + (n >> 16);
      n = n + (n >> 32);
      return (uint64_t)(n & 0x7F);
    }
  private:

};

使用SSE指令集

class HammingDistanceSSE : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceSSE";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[2] = {0, 0};
      for (uint64_t i = 0; i < size; i += 2) { 
        __m64 *x1 = (__m64*)(p);
        __m64 *x2 = (__m64*)(p + 1);
        __m64 *y1 = (__m64*)(q);
        __m64 *y2 = (__m64*)(q + 1);
        __m128i z1 = _mm_set_epi64 (*x1, *x2);
        __m128i z2 = _mm_set_epi64 (*y1, *y2);
        __m128i xor_res =  _mm_xor_si128(z1 , z2);
        _mm_store_si128((__m128i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
      }
      return res;
    }
};

使用AVX2指令集

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[4] = {0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 4) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        __m256i z1 = _mm256_set_epi64x (*x1, *x2, *x3, *x4);
        __m256i z2 = _mm256_set_epi64x (*y1, *y2, *y3, *y4);
        __m256i xor_res =  _mm256_xor_si256(z1 , z2);
        _mm256_store_si256((__m256i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
      }
      return res;
    }
};

使用AVX512指令集

截止本文完成时,市场上支持avx-512指令的cpu并没有普及,但是gcc已经提供了avx-512的c/c++ 语言接口。本文先把代码实现,后续购得支持avx-512指令的cpu后,再进行测试。

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[8] = {0, 0, 0, 0, 0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 8) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *x5 = (long long int*)(p + i + 4);
        long long int *x6 = (long long int*)(p + i + 5);
        long long int *x7 = (long long int*)(p + i + 6);
        long long int *x8 = (long long int*)(p + i + 7);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        long long int *y5 = (long long int*)(q + i + 4);
        long long int *y6 = (long long int*)(q + i + 5);
        long long int *y7 = (long long int*)(q + i + 6);
        long long int *y8 = (long long int*)(q + i + 7);
        __m512i z1 = _mm512_set_epi64x (*x1, *x2, *x3, *x4, *x5, *x6, *x7, *x8);
        __m512i z2 = _mm512_set_epi64x (*y1, *y2, *y3, *y4);
        __m512i xor_res =  _mm512_xor_si512(z1 , z2);
        _mm512_store_si512((void*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
        res += _mm_popcnt_u64(temp_res[4]);
        res += _mm_popcnt_u64(temp_res[5]);
        res += _mm_popcnt_u64(temp_res[6]);
        res += _mm_popcnt_u64(temp_res[7]);
      }
      return res;
    }
};

性能测试

测试代码框架类:

class AlgorithmBench {
  public:
    void init() {
      startTimer();
      vector = new uint64_t[size];
      ifstream f("sample.txt", ios::in);
      for(int i = 0; i < size;i++ ) {
        uint64_t c;
        f >> vector[i];
      }
      stopTimer();
      getInterval("load sample cost:");
      f.close();
    }
    void setSize(uint64_t size) { this->size = size;};
    void push_back(Algorithm* algorithm) { _algorithm_vector.push_back(algorithm);}

    void start() {
      for (std::vector<Algorithm*>::iterator iter = _algorithm_vector.begin();
          iter != _algorithm_vector.end();
          ++iter) {
        Algorithm* ptr = *iter;
        ptr->init();
        startTimer();
        for (int i = 0; i < size - 3; ++i) {
          ptr->cal(vector + i, vector + i + 1, 4);
        }
        stopTimer();
        getInterval(ptr->getName() + " cost:");
      }
    }

    void startTimer() {
      gettimeofday(&tv,NULL);
      start_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void stopTimer() {
      gettimeofday(&tv,NULL);
      end_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void getInterval(std::string prefix) {
      std::cout<<std::left<<setw(40) << prefix 
        << std::right << end_timer - start_timer<<endl;
    }

  private:
    uint64_t size;
    uint64_t *vector;
    timeval tv;
    uint64_t start_timer;
    uint64_t end_timer;
    std::vector<Algorithm*> _algorithm_vector;
};

编译指令:

g++ -msse4.2 -mavx2 -O2 -o test_hamming hamming_distance.cpp

windows-gcc 测试结果

测试环境:

Windows 7
gcc version 7.4.0

HammingDistanceBase cost:               330066
HammingDistanceBuildin cost:            326566
HammingDistanceTable8Bit cost:          2381976
HammingDistanceTable16Bit cost:         1435287
HammingDistanceDivideConquer cost:      1215243
HammingDistanceDivideConquerOpt cost:   1226745
HammingDistanceSSE cost:                972695
HammingDistanceAVX cost:                680636

Linux-gcc 测试结果

测试环境:

Ubuntu Server 19.04
gcc version 8.3.0

测试结果:

load sample cost:                       78070
HammingDistanceBase cost:               145393
HammingDistanceBuildin cost:            75905
HammingDistanceTable8Bit cost:          598789
HammingDistanceTable16Bit cost:         142502
HammingDistanceDivideConquer cost:      343414
HammingDistanceDivideConquerOpt cost:   316748
HammingDistanceSSE cost:                59322
HammingDistanceAVX cost:                115784

总结

不同平台,不同的编译器版本,测试的结果有所差异。但整体表现上使用SSE指令集的性能最好,其次是使用内建函数计算popcnt性能最优。AVX指令性能略好于SSE。性能最差的是查表法8bit。分治法性能居中。

附录

所有代码都存放在github上面: https://github.com/zuocheng-liu/code-samples/blob/master/algorithm/hamming_distance.cpp

STL容器 vector list 性能对比(附带测试方法和测试数据)

最近在重构公司的一个C++模块,逻辑里有排序、过滤等操作。开发过程中,遇到下面的一些问题:

问题和猜想

  • 单链表的归并排序线性表的快速排序的时间复杂度都是 O(nlog(n)),但在实际情况中哪个更快呢?需要测试一下。
  • 猜想的发散,stl vector 和 list(双向链表)的迭代器遍历、顺序插入、clear操作, 时间复杂度好像都是相同的,但谁更快呢?
  • 看stl代码去研究vector和list的实现,并不能得到我们最终想要的结果,测试数据会更直观。

测试总结

先写测试总结,测试方法、测试结果都在下面。

  • 迭代器遍历,list 比vector 稍块,使用for_each 比使用for更快。
  • 顺序插入, vector比list 约快3倍
  • clear操作,vector 几乎不耗时,list 要耗费好多时间,vector比list至少快1千倍以上
  • 排序, vector 大约比list 快2倍。

结合项目,对比vector和list的性能

  • 在我重构前,模块使用的是vector存储数据,排序使用的是快排(大于16小于堆),过滤则是用临时数据vector转存。
  • 在我重构后,使用list存储,排序使用list的归并排序,过滤则是直接使用list的erase操作。
  • 重构前,耗时4000微秒,重构后1500微秒,时间减少60%。
  • list除了删除操作和遍历操作,排序和插入都比vector慢,但是我使用list后,却提升了性能。原因可能有2个: 1.单个元素数据结构大,vector移动这些元素代价较大,而list移动这些元素反而代价很小;2.去掉了中间临时存储,数据的转存代价也比较大。
  • 使用list后,模块的可扩展性也变得更好了。

这次项目最大的收获就是,如果有过滤操作,优选使用list。vector的缺点如下,1.vector快速排序需要移动元素,如果元素占据空间大,移动代价也非常大。2.vector过滤需要借助中间临时存储,直接erase的代价更大。

测试方法

  • 随机生成10万数据量,由vector和list结构的变量分别存储
  • 对vector 和 list 的数据分别做如下, 1)迭代器遍历 2)顺序插入 3) clear操作 4)排序
  • 记录每种操作消耗的时间
  • 多次测试,记录典型的数据结果

测试结果

直接列出测试结果,单位是微秒

vector for_each : 3263
list for_each : 2556
vector traverse : 4783
vector num traverse : 666
list traverse : 3078
vector push_back : 8136
list push_back : 24581
list delete : 7428
vector push_back : 7329
list push_back : 22968
vector clear : 0
list clear : 13079
vector sort : 89512
list sort : 146529

(附)测试程序

Github 托管地址,保持更新:https://github.com/zuocheng-liu/code-samples/blob/master/linux-c/stl/vector_list_performance_test.cpp

#include<stdio.h>
#include<stdlib.h>
#include<sys/time.h>
#include <list>
#include <vector>
#include <iostream>

#define MAX_NUM 100000

using namespace std;

timeval tv;
uint64_t timer;


bool NumCmp(uint32_t a, uint32_t b) { return (a > b); }

void startTimer() {
    gettimeofday(&tv,NULL);
    timer = 1000000 * tv.tv_sec + tv.tv_usec;
}

uint64_t stopTimer() {
    gettimeofday(&tv,NULL);
    timer = 1000000 * tv.tv_sec + tv.tv_usec - timer;
    return timer;
}

int main() {
    vector<uint32_t> v;
    vector<uint32_t> v2;
    list<uint32_t> l;
    list<uint32_t> l2;
    for (int i = 0; i< MAX_NUM; ++i) {
        srand((int)time(0)  * i * i + 1);
        int num = rand();
        v.push_back(num);
        l.push_back(num);
    }

    // compare oper traverse
    startTimer();   
    for (vector<uint32_t>::iterator iter = v.begin(); iter != v.end(); ++ iter) {
        *iter;
    }
    cout<<"vector\t traverse\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (int i = 0 ; i < MAX_NUM; ++ i) {
        //v[i];
    }
    cout<<"vector\t num traverse\t :\t"<< stopTimer() << endl;  


    startTimer();   
    for (list<uint32_t>::iterator iter = l.begin(); iter != l.end(); ++ iter) {
    }
    cout<<"list\t traverse\t :\t"<< stopTimer() << endl;  


    // compare oper push_back   
    startTimer();   
    for (vector<uint32_t>::iterator iter = v.begin(); iter != v.end(); ++ iter) {
        v2.push_back(*iter);
    }
    cout<<"vector\t push_back\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (list<uint32_t>::iterator iter = l.begin(); iter != l.end(); ++ iter) {
        l2.push_back(*iter);
    }
    cout<<"list\t push_back\t :\t"<< stopTimer() << endl;  

    // compare oper delete
    startTimer();
    v2.clear();
    for (vector<uint32_t>::iterator iter = v2.begin(); iter != v2.end(); ++ iter) {
    //    v2.erase(iter);
    }
    //cout<<"vector\t delete\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (list<uint32_t>::iterator iter = l2.begin(); iter != l2.end(); ++ iter) {
        iter = l2.erase(iter);
    }
    cout<<"list\t delete\t :\t"<< stopTimer() << endl;  

    // compare oper push_back   
    startTimer();   
    for (vector<uint32_t>::iterator iter = v.begin(); iter != v.end(); ++ iter) {
        v2.push_back(*iter);
    }
    cout<<"vector\t push_back\t :\t"<< stopTimer() << endl;  

    startTimer();   
    for (list<uint32_t>::iterator iter = l.begin(); iter != l.end(); ++ iter) {
        l2.push_back(*iter);
    }
    cout<<"list\t push_back\t :\t"<< stopTimer() << endl;  


    // compare oper clear
    startTimer();   
    v2.clear();
    cout<<"vector\t clear      \t:\t"<< stopTimer() << endl;  

    startTimer();   
    l2.clear();
    cout<<"list\t clear     \t :\t"<< stopTimer() << endl;  

    // compare oper sort
    startTimer();   
    std::sort(v.begin(), v.end(), NumCmp);
    cout<<"vector\t sort    \t :\t"<< stopTimer() << endl;  

    startTimer();   
    l.sort(NumCmp);
    cout<<"list\t sort    \t :\t"<< stopTimer() << endl;  

    return 0;
}

判断平面上点和不规则多边形位置关系算法

本文列举5中计算点和不规则多边形位置关系的算法。

计算机算法实现方案,最好的是弧长法。

射线法 (铅垂线法、水平线法)

射线法是使用最广泛的算法,这是由于相比较其他算法而言,它不但可以正确使用在凹多边形上,而且不需要考虑精度误差问题。

该算法思想是从点出发向右水平做一条射线,计算该射线与多边形的边的相交点个数,当点不在多边形边上时,如果是奇数,那么点就一定在多边形内部,否则,在外部。

其中铅垂线法和水平线法是射线法的两个具体算法,更容易实现,在坐标计算中更方便。

面积法

面积法的思想是如果点在多边形内部或者边上,那么点与多边形所有边组成的三角形面积和等于多边形面积。多边形的面积公式可以用叉积计算。不过计算面积是会有一定误差的,需要设置精度的误差范围。

点线判断法

对于多边形,如果一个点它的所有边的左边,那么这个点一定在多边形内部。利用叉积正好可以判断点与给定边的关系,即点是在边的左边右边还是边上。

转角法

以被测点O为坐标原点,计算其与所有相邻两个多边形顶点点P[i]之间的角度和,所有角度相加后得到360度,即为点在多边形之内,若得到180度,则在多边形边上,0度则在多边形外。

弧长法

弧长法是改进后的转角法,解决了传统转角法的精度问题。算法思想是,以被测点O为坐标原点,将平面划分为4个象限,对每个多边形顶点P[i],计算其所在的象限,然后顺序访问多边形的各个顶点P[i],分析P[i]和P[i+1],有下列三种情况:

  • P[i+1]在P[i]的下一象限。此时弧长和加π/2;

  • P[i+1]在P[i]的上一象限。此时弧长和减π/2;

  • P[i+1]在Pi的相对象限。利用叉积f=y[i+1]x[i]-x[i+1]y[i]计算Op[i]与Op[i+1]的关系,若f=0,Op[i]与Op[i+1]共线,点在多边形边上;若f<0,Op[i+1]在Op[i]逆时针方向,弧长和减π;若f>0,Op[i+1]在Op[i]顺时针方向,弧长和加π。

由于顶点在原点和坐标轴上时需要特殊处理,为了准确性,应该在每次计算顶点时都用叉积判断P是否在当前边上,另外将π用整数代替可以避免浮点数的精度误差。

二进制运算技巧

假设都在x86或x86_64架构CPU上进行运算,二进制基本运算包括,加减乘除、与、或、异或、同或、移位

假设n 为 32 位整形数,取正整数n除以8的余数 : n & 0x07

假设n 为 32 位整形数,取正整数n除以16的余数 : n & 0x0F

假设n 为 32 位整形数,8位对齐 : (n + 7) & 0xFFFFFFF8)

假设n 为 32 位整形数,32位对齐 : (n + 31) & 0xFFFFFF80)

假设n 为 32 位整形数,求8位对齐填充位数 ((n + 7) & 0xFFFFFFF8) – n;

求int 最大值 : (1 << 31) – 1 或者干脆 0x7FFFFFFF

求int 最小值 : 1 << 31

判断奇偶数 : n & 1 == 1

交换a b 两个变量的值 : a ^= b ^= a ^= b;

n 乘以2的m次方 : n << m