一种压测工具使用的控制流量大小的算法-时间轮算法

压力测试工具是软件测试过程中的必用工具,而压力测试工具如何控制流量大小呢?

最常见的是计算每个请求之间的时间间隔,然后通过sleep方法来让两次请求产生间隔。这种方法有2个缺点,sleep时会让线程挂起,所以需要比较多的线程数;其二,当流量非常大的情况,比如qps达到10万以上时,会收到系统时钟精度和线程上下文切换的挑战。

本文设计了另外一种方法,底层原理为时间轮算法,利用二进制的与操作来实现。按照概率控制流量大小。但概率的计算并不依赖随机数,而是通过设置一个概率控制变量的方法,让流量的发送更加均衡。

代码如下:

class Transformer {
    public:
        Transformer() :
            send_num(0),
            qps(0),
            benchmark(0),
            counter(0),
            thread_pool(10) {}

        void run();
        void stop() { tc.stop(); }

    private:

        void sendOne();
        void transform();

        int32_t send_num;
        int32_t qps;
        int32_t benchmark;
        std::atomic counter;

        ThreadPool thread_pool;
        TimerController tc;
};
void Transformer::run() {
    qps = FLAGS_qps;
    if (qps <= 0 || qps > 1e5) {
        return;
    }

    int32_t query = (qps << 16) / 1000;
    send_num = query >> 16;
    query &= 0xFFFF;
    for (int i = 0; i < 16; ++i) {
        benchmark <<= 1;
        if (query & 1) {
            ++benchmark;
        }
        query >>= 1;
    }
    tc.cycleProcess(1, [this]() { this->transform(); });
}

void Transformer::transform() {
    uint32_t cur_c = counter.fetch_add(1, std::memory_order_relaxed);
    cur_c &= 0xFFFF;
    if (cur_c <= 0) {
        return;
    }
    int32_t delta = 0;
    for (int i = 0,bit = 1; i < 16; ++i, bit <<= 1) {
        if ((cur_c & bit) == 0) {
            continue;
        }
        if ((benchmark & bit) == 0) {
            break;
        } else {
            delta = 1;
            break;
        }
    }

    int32_t cur_send_num = send_num + delta;
    if (cur_send_num <= 0) {
        return;
    }
    for (int i = 0; i< cur_send_num; ++i) {
        thread_pool.enqueue([this]() { this->sendOne(); });
    }
}

汉明距离之算法和实现总结

内容简介

汉明距离,通过比较向量每一位是否相同,求出不同位的个数。用来表示两个向量之间的相似度。

汉明距离计算的步骤,即对两个向量首先进行异或操作,然后对异或的结果的每一位bit进行统计,最后合计出有多少bit的值为1。

本文主要内容为,列举出9种计算汉明距离的算法及其C++代码的实现,并使用本文的测试方法测试得出不同算法的性能。再对不同的算法进行分析比较。

计算机在进行异或操作中,CPU的指令集可以提供多种实现。比如cpu固有指令 xor 和 SSE指令集 、AVX指令集。后两种指令集都是为了提升CPU对向量的处理速度而扩展的指令集。

汉明距离计算的后一步,计算一个变量中所有bit的值为1的个数。可以使用多种算法和实现方式来实现。比如算法上,逐位移位统计、查表法、分治法等;实现方式上,可以使用前面所说的算法,也可以使用cpu的指令popcnt直接求得。

实现算法

本文算法计算的向量为二值向量;

本文中包含算法函数的Algorithm类声明如下,不同计算汉明距离的算法的类都继承这个基类,计算汉明距离由成员函数 uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size)实现,函数输入为3个参数,其中p和q分别为两个向量的指针;size为向量维度的大小,并以64为倍数; 向量最小为64个bit。

class Algorithm {
  public:
    ~Algorithm() {}
    virtual void init() {}
    virtual std::string getName() {
      return "Undefined Algorithm Name.";
    }
    virtual uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) = 0;
};

一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        while (r) {
          res += r & 1;
          r = r >> 1;
        }
      }
      return res;
    }

使用gcc内建函数优化一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += __builtin_popcountll(r);
      }
      return res;
    }

查表法-按8bit查询

class HammingDistanceTable8Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable8Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_8bit_init(&pop_count_table_ptr);
    }
    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_8bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 
    void pop_count_table_8bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[256];
      for (int i = 0; i < 256; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_8bit(uint64_t n) {
      int res = 0;
      uint8_t *p = (uint8_t *)&n;
      for (int i = 0; i < 8; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

查表法-按16bit查询

class HammingDistanceTable16Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable16Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_16bit_init(&pop_count_table_ptr);
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_16bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 

    void pop_count_table_16bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[65536];
      for (int i = 0; i < 65536; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_16bit(uint64_t n) {
      int res = 0;
      uint16_t *p = (uint16_t *)&n;
      for (int i = 0; i < 4; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

分治法

class HammingDistanceDivideConquer : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquer";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer(uint64_t n) {
      n = (n & 0x5555555555555555) + ((n >> 1 ) & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n & 0x0F0F0F0F0F0F0F0F) + ((n >> 4 ) & 0x0F0F0F0F0F0F0F0F);
      n = (n & 0x00FF00FF00FF00FF) + ((n >> 8 ) & 0x00FF00FF00FF00FF);
      n = (n & 0x0000FFFF0000FFFF) + ((n >> 16) & 0x0000FFFF0000FFFF);
      n = (n & 0x00000000FFFFFFFF) + ((n >> 32) & 0x00000000FFFFFFFF);
      return n;
    }
  private:

};

改进分治法

class HammingDistanceDivideConquerOpt : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquerOpt";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer_opt(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer_opt(uint64_t n) {
      n = n - ((n >> 1)  & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n + (n >> 4 )) & 0x0F0F0F0F0F0F0F0F;
      n = n + (n >> 8 );
      n = n + (n >> 16);
      n = n + (n >> 32);
      return (uint64_t)(n & 0x7F);
    }
  private:

};

使用SSE指令集

class HammingDistanceSSE : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceSSE";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[2] = {0, 0};
      for (uint64_t i = 0; i < size; i += 2) { 
        __m64 *x1 = (__m64*)(p);
        __m64 *x2 = (__m64*)(p + 1);
        __m64 *y1 = (__m64*)(q);
        __m64 *y2 = (__m64*)(q + 1);
        __m128i z1 = _mm_set_epi64 (*x1, *x2);
        __m128i z2 = _mm_set_epi64 (*y1, *y2);
        __m128i xor_res =  _mm_xor_si128(z1 , z2);
        _mm_store_si128((__m128i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
      }
      return res;
    }
};

使用AVX2指令集

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[4] = {0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 4) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        __m256i z1 = _mm256_set_epi64x (*x1, *x2, *x3, *x4);
        __m256i z2 = _mm256_set_epi64x (*y1, *y2, *y3, *y4);
        __m256i xor_res =  _mm256_xor_si256(z1 , z2);
        _mm256_store_si256((__m256i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
      }
      return res;
    }
};

使用AVX512指令集

截止本文完成时,市场上支持avx-512指令的cpu并没有普及,但是gcc已经提供了avx-512的c/c++ 语言接口。本文先把代码实现,后续购得支持avx-512指令的cpu后,再进行测试。

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[8] = {0, 0, 0, 0, 0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 8) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *x5 = (long long int*)(p + i + 4);
        long long int *x6 = (long long int*)(p + i + 5);
        long long int *x7 = (long long int*)(p + i + 6);
        long long int *x8 = (long long int*)(p + i + 7);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        long long int *y5 = (long long int*)(q + i + 4);
        long long int *y6 = (long long int*)(q + i + 5);
        long long int *y7 = (long long int*)(q + i + 6);
        long long int *y8 = (long long int*)(q + i + 7);
        __m512i z1 = _mm512_set_epi64x (*x1, *x2, *x3, *x4, *x5, *x6, *x7, *x8);
        __m512i z2 = _mm512_set_epi64x (*y1, *y2, *y3, *y4);
        __m512i xor_res =  _mm512_xor_si512(z1 , z2);
        _mm512_store_si512((void*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
        res += _mm_popcnt_u64(temp_res[4]);
        res += _mm_popcnt_u64(temp_res[5]);
        res += _mm_popcnt_u64(temp_res[6]);
        res += _mm_popcnt_u64(temp_res[7]);
      }
      return res;
    }
};

性能测试

测试代码框架类:

class AlgorithmBench {
  public:
    void init() {
      startTimer();
      vector = new uint64_t[size];
      ifstream f("sample.txt", ios::in);
      for(int i = 0; i < size;i++ ) {
        uint64_t c;
        f >> vector[i];
      }
      stopTimer();
      getInterval("load sample cost:");
      f.close();
    }
    void setSize(uint64_t size) { this->size = size;};
    void push_back(Algorithm* algorithm) { _algorithm_vector.push_back(algorithm);}

    void start() {
      for (std::vector<Algorithm*>::iterator iter = _algorithm_vector.begin();
          iter != _algorithm_vector.end();
          ++iter) {
        Algorithm* ptr = *iter;
        ptr->init();
        startTimer();
        for (int i = 0; i < size - 3; ++i) {
          ptr->cal(vector + i, vector + i + 1, 4);
        }
        stopTimer();
        getInterval(ptr->getName() + " cost:");
      }
    }

    void startTimer() {
      gettimeofday(&tv,NULL);
      start_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void stopTimer() {
      gettimeofday(&tv,NULL);
      end_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void getInterval(std::string prefix) {
      std::cout<<std::left<<setw(40) << prefix 
        << std::right << end_timer - start_timer<<endl;
    }

  private:
    uint64_t size;
    uint64_t *vector;
    timeval tv;
    uint64_t start_timer;
    uint64_t end_timer;
    std::vector<Algorithm*> _algorithm_vector;
};

编译指令:

g++ -msse4.2 -mavx2 -O2 -o test_hamming hamming_distance.cpp

windows-gcc 测试结果

测试环境:

Windows 7
gcc version 7.4.0

HammingDistanceBase cost:               330066
HammingDistanceBuildin cost:            326566
HammingDistanceTable8Bit cost:          2381976
HammingDistanceTable16Bit cost:         1435287
HammingDistanceDivideConquer cost:      1215243
HammingDistanceDivideConquerOpt cost:   1226745
HammingDistanceSSE cost:                972695
HammingDistanceAVX cost:                680636

Linux-gcc 测试结果

测试环境:

Ubuntu Server 19.04
gcc version 8.3.0

测试结果:

load sample cost:                       78070
HammingDistanceBase cost:               145393
HammingDistanceBuildin cost:            75905
HammingDistanceTable8Bit cost:          598789
HammingDistanceTable16Bit cost:         142502
HammingDistanceDivideConquer cost:      343414
HammingDistanceDivideConquerOpt cost:   316748
HammingDistanceSSE cost:                59322
HammingDistanceAVX cost:                115784

总结

不同平台,不同的编译器版本,测试的结果有所差异。但整体表现上使用SSE指令集的性能最好,其次是使用内建函数计算popcnt性能最优。AVX指令性能略好于SSE。性能最差的是查表法8bit。分治法性能居中。

附录

所有代码都存放在github上面: https://github.com/zuocheng-liu/code-samples/blob/master/algorithm/hamming_distance.cpp

判断平面上点和不规则多边形位置关系算法

本文列举5中计算点和不规则多边形位置关系的算法。

计算机算法实现方案,最好的是弧长法。

射线法 (铅垂线法、水平线法)

射线法是使用最广泛的算法,这是由于相比较其他算法而言,它不但可以正确使用在凹多边形上,而且不需要考虑精度误差问题。

该算法思想是从点出发向右水平做一条射线,计算该射线与多边形的边的相交点个数,当点不在多边形边上时,如果是奇数,那么点就一定在多边形内部,否则,在外部。

其中铅垂线法和水平线法是射线法的两个具体算法,更容易实现,在坐标计算中更方便。

面积法

面积法的思想是如果点在多边形内部或者边上,那么点与多边形所有边组成的三角形面积和等于多边形面积。多边形的面积公式可以用叉积计算。不过计算面积是会有一定误差的,需要设置精度的误差范围。

点线判断法

对于多边形,如果一个点它的所有边的左边,那么这个点一定在多边形内部。利用叉积正好可以判断点与给定边的关系,即点是在边的左边右边还是边上。

转角法

以被测点O为坐标原点,计算其与所有相邻两个多边形顶点点P[i]之间的角度和,所有角度相加后得到360度,即为点在多边形之内,若得到180度,则在多边形边上,0度则在多边形外。

弧长法

弧长法是改进后的转角法,解决了传统转角法的精度问题。算法思想是,以被测点O为坐标原点,将平面划分为4个象限,对每个多边形顶点P[i],计算其所在的象限,然后顺序访问多边形的各个顶点P[i],分析P[i]和P[i+1],有下列三种情况:

  • P[i+1]在P[i]的下一象限。此时弧长和加π/2;

  • P[i+1]在P[i]的上一象限。此时弧长和减π/2;

  • P[i+1]在Pi的相对象限。利用叉积f=y[i+1]x[i]-x[i+1]y[i]计算Op[i]与Op[i+1]的关系,若f=0,Op[i]与Op[i+1]共线,点在多边形边上;若f<0,Op[i+1]在Op[i]逆时针方向,弧长和减π;若f>0,Op[i+1]在Op[i]顺时针方向,弧长和加π。

由于顶点在原点和坐标轴上时需要特殊处理,为了准确性,应该在每次计算顶点时都用叉积判断P是否在当前边上,另外将π用整数代替可以避免浮点数的精度误差。

二进制运算技巧

假设都在x86或x86_64架构CPU上进行运算,二进制基本运算包括,加减乘除、与、或、异或、同或、移位

假设n 为 32 位整形数,取正整数n除以8的余数 : n & 0x07

假设n 为 32 位整形数,取正整数n除以16的余数 : n & 0x0F

假设n 为 32 位整形数,8位对齐 : (n + 7) & 0xFFFFFFF8)

假设n 为 32 位整形数,32位对齐 : (n + 31) & 0xFFFFFF80)

假设n 为 32 位整形数,求8位对齐填充位数 ((n + 7) & 0xFFFFFFF8) – n;

求int 最大值 : (1 << 31) – 1 或者干脆 0x7FFFFFFF

求int 最小值 : 1 << 31

判断奇偶数 : n & 1 == 1

交换a b 两个变量的值 : a ^= b ^= a ^= b;

n 乘以2的m次方 : n << m