汉明距离之算法和实现总结

内容简介

汉明距离,通过比较向量每一位是否相同,求出不同位的个数。用来表示两个向量之间的相似度。

汉明距离计算的步骤,即对两个向量首先进行异或操作,然后对异或的结果的每一位bit进行统计,最后合计出有多少bit的值为1。

本文主要内容为,列举出9种计算汉明距离的算法及其C++代码的实现,并使用本文的测试方法测试得出不同算法的性能。再对不同的算法进行分析比较。

计算机在进行异或操作中,CPU的指令集可以提供多种实现。比如cpu固有指令 xor 和 SSE指令集 、AVX指令集。后两种指令集都是为了提升CPU对向量的处理速度而扩展的指令集。

汉明距离计算的后一步,计算一个变量中所有bit的值为1的个数。可以使用多种算法和实现方式来实现。比如算法上,逐位移位统计、查表法、分治法等;实现方式上,可以使用前面所说的算法,也可以使用cpu的指令popcnt直接求得。

实现算法

本文算法计算的向量为二值向量;

本文中包含算法函数的Algorithm类声明如下,不同计算汉明距离的算法的类都继承这个基类,计算汉明距离由成员函数 uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size)实现,函数输入为3个参数,其中p和q分别为两个向量的指针;size为向量维度的大小,并以64为倍数; 向量最小为64个bit。

class Algorithm {
  public:
    ~Algorithm() {}
    virtual void init() {}
    virtual std::string getName() {
      return "Undefined Algorithm Name.";
    }
    virtual uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) = 0;
};

一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        while (r) {
          res += r & 1;
          r = r >> 1;
        }
      }
      return res;
    }

使用gcc内建函数优化一般算法

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i) {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += __builtin_popcountll(r);
      }
      return res;
    }

查表法-按8bit查询

class HammingDistanceTable8Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable8Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_8bit_init(&pop_count_table_ptr);
    }
    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_8bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 
    void pop_count_table_8bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[256];
      for (int i = 0; i < 256; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_8bit(uint64_t n) {
      int res = 0;
      uint8_t *p = (uint8_t *)&n;
      for (int i = 0; i < 8; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

查表法-按16bit查询

class HammingDistanceTable16Bit : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceTable16Bit";
    }
    void init() {
      pop_count_table_ptr = NULL;
      pop_count_table_16bit_init(&pop_count_table_ptr);
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_table_16bit(r);
      }
      return res;
    }
  private:
    uint8_t *pop_count_table_ptr; 

    void pop_count_table_16bit_init(uint8_t **pop_count_table_ptr) {
      *pop_count_table_ptr = new uint8_t[65536];
      for (int i = 0; i < 65536; ++i) {
        (*pop_count_table_ptr)[i] = __builtin_popcount(i);
      }  
    }

    uint64_t pop_count_table_16bit(uint64_t n) {
      int res = 0;
      uint16_t *p = (uint16_t *)&n;
      for (int i = 0; i < 4; ++i) {
        res += pop_count_table_ptr[*(p + i)];
      }
      return res;
    }
};

分治法

class HammingDistanceDivideConquer : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquer";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer(uint64_t n) {
      n = (n & 0x5555555555555555) + ((n >> 1 ) & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n & 0x0F0F0F0F0F0F0F0F) + ((n >> 4 ) & 0x0F0F0F0F0F0F0F0F);
      n = (n & 0x00FF00FF00FF00FF) + ((n >> 8 ) & 0x00FF00FF00FF00FF);
      n = (n & 0x0000FFFF0000FFFF) + ((n >> 16) & 0x0000FFFF0000FFFF);
      n = (n & 0x00000000FFFFFFFF) + ((n >> 32) & 0x00000000FFFFFFFF);
      return n;
    }
  private:

};

改进分治法

class HammingDistanceDivideConquerOpt : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceDivideConquerOpt";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      for (uint64_t i = 0; i < size; ++i)
      {
        uint64_t r = (*(p + i)) ^ (*(q + i));
        res += pop_count_divide_conquer_opt(r);
      }
      return res;
    }

    uint64_t pop_count_divide_conquer_opt(uint64_t n) {
      n = n - ((n >> 1)  & 0x5555555555555555); 
      n = (n & 0x3333333333333333) + ((n >> 2 ) & 0x3333333333333333);
      n = (n + (n >> 4 )) & 0x0F0F0F0F0F0F0F0F;
      n = n + (n >> 8 );
      n = n + (n >> 16);
      n = n + (n >> 32);
      return (uint64_t)(n & 0x7F);
    }
  private:

};

使用SSE指令集

class HammingDistanceSSE : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceSSE";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[2] = {0, 0};
      for (uint64_t i = 0; i < size; i += 2) { 
        __m64 *x1 = (__m64*)(p);
        __m64 *x2 = (__m64*)(p + 1);
        __m64 *y1 = (__m64*)(q);
        __m64 *y2 = (__m64*)(q + 1);
        __m128i z1 = _mm_set_epi64 (*x1, *x2);
        __m128i z2 = _mm_set_epi64 (*y1, *y2);
        __m128i xor_res =  _mm_xor_si128(z1 , z2);
        _mm_store_si128((__m128i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
      }
      return res;
    }
};

使用AVX2指令集

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[4] = {0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 4) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        __m256i z1 = _mm256_set_epi64x (*x1, *x2, *x3, *x4);
        __m256i z2 = _mm256_set_epi64x (*y1, *y2, *y3, *y4);
        __m256i xor_res =  _mm256_xor_si256(z1 , z2);
        _mm256_store_si256((__m256i*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
      }
      return res;
    }
};

使用AVX512指令集

截止本文完成时,市场上支持avx-512指令的cpu并没有普及,但是gcc已经提供了avx-512的c/c++ 语言接口。本文先把代码实现,后续购得支持avx-512指令的cpu后,再进行测试。

class HammingDistanceAVX : public Algorithm {
  public:
    std::string getName() {
      return "HammingDistanceAVX";
    }

    uint64_t cal(const uint64_t* p, const uint64_t* q, const uint64_t size) {
      uint64_t res = 0;
      uint64_t temp_res[8] = {0, 0, 0, 0, 0, 0, 0, 0};
      for (uint64_t i = 0; i < size - 1; i += 8) { 
        long long int *x1 = (long long int*)(p + i);
        long long int *x2 = (long long int*)(p + i + 1);
        long long int *x3 = (long long int*)(p + i + 2);
        long long int *x4 = (long long int*)(p + i + 3);
        long long int *x5 = (long long int*)(p + i + 4);
        long long int *x6 = (long long int*)(p + i + 5);
        long long int *x7 = (long long int*)(p + i + 6);
        long long int *x8 = (long long int*)(p + i + 7);
        long long int *y1 = (long long int*)(q + i);
        long long int *y2 = (long long int*)(q + i + 1);
        long long int *y3 = (long long int*)(q + i + 2);
        long long int *y4 = (long long int*)(q + i + 3);
        long long int *y5 = (long long int*)(q + i + 4);
        long long int *y6 = (long long int*)(q + i + 5);
        long long int *y7 = (long long int*)(q + i + 6);
        long long int *y8 = (long long int*)(q + i + 7);
        __m512i z1 = _mm512_set_epi64x (*x1, *x2, *x3, *x4, *x5, *x6, *x7, *x8);
        __m512i z2 = _mm512_set_epi64x (*y1, *y2, *y3, *y4);
        __m512i xor_res =  _mm512_xor_si512(z1 , z2);
        _mm512_store_si512((void*)temp_res, xor_res);
        res += _mm_popcnt_u64(temp_res[0]);
        res += _mm_popcnt_u64(temp_res[1]);
        res += _mm_popcnt_u64(temp_res[2]);
        res += _mm_popcnt_u64(temp_res[3]);
        res += _mm_popcnt_u64(temp_res[4]);
        res += _mm_popcnt_u64(temp_res[5]);
        res += _mm_popcnt_u64(temp_res[6]);
        res += _mm_popcnt_u64(temp_res[7]);
      }
      return res;
    }
};

性能测试

测试代码框架类:

class AlgorithmBench {
  public:
    void init() {
      startTimer();
      vector = new uint64_t[size];
      ifstream f("sample.txt", ios::in);
      for(int i = 0; i < size;i++ ) {
        uint64_t c;
        f >> vector[i];
      }
      stopTimer();
      getInterval("load sample cost:");
      f.close();
    }
    void setSize(uint64_t size) { this->size = size;};
    void push_back(Algorithm* algorithm) { _algorithm_vector.push_back(algorithm);}

    void start() {
      for (std::vector<Algorithm*>::iterator iter = _algorithm_vector.begin();
          iter != _algorithm_vector.end();
          ++iter) {
        Algorithm* ptr = *iter;
        ptr->init();
        startTimer();
        for (int i = 0; i < size - 3; ++i) {
          ptr->cal(vector + i, vector + i + 1, 4);
        }
        stopTimer();
        getInterval(ptr->getName() + " cost:");
      }
    }

    void startTimer() {
      gettimeofday(&tv,NULL);
      start_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void stopTimer() {
      gettimeofday(&tv,NULL);
      end_timer = 1000000 * tv.tv_sec + tv.tv_usec;
    }
    void getInterval(std::string prefix) {
      std::cout<<std::left<<setw(40) << prefix 
        << std::right << end_timer - start_timer<<endl;
    }

  private:
    uint64_t size;
    uint64_t *vector;
    timeval tv;
    uint64_t start_timer;
    uint64_t end_timer;
    std::vector<Algorithm*> _algorithm_vector;
};

编译指令:

g++ -msse4.2 -mavx2 -O2 -o test_hamming hamming_distance.cpp

windows-gcc 测试结果

测试环境:

Windows 7
gcc version 7.4.0

HammingDistanceBase cost:               330066
HammingDistanceBuildin cost:            326566
HammingDistanceTable8Bit cost:          2381976
HammingDistanceTable16Bit cost:         1435287
HammingDistanceDivideConquer cost:      1215243
HammingDistanceDivideConquerOpt cost:   1226745
HammingDistanceSSE cost:                972695
HammingDistanceAVX cost:                680636

Linux-gcc 测试结果

测试环境:

Ubuntu Server 19.04
gcc version 8.3.0

测试结果:

load sample cost:                       78070
HammingDistanceBase cost:               145393
HammingDistanceBuildin cost:            75905
HammingDistanceTable8Bit cost:          598789
HammingDistanceTable16Bit cost:         142502
HammingDistanceDivideConquer cost:      343414
HammingDistanceDivideConquerOpt cost:   316748
HammingDistanceSSE cost:                59322
HammingDistanceAVX cost:                115784

总结

不同平台,不同的编译器版本,测试的结果有所差异。但整体表现上使用SSE指令集的性能最好,其次是使用内建函数计算popcnt性能最优。AVX指令性能略好于SSE。性能最差的是查表法8bit。分治法性能居中。

附录

所有代码都存放在github上面: https://github.com/zuocheng-liu/code-samples/blob/master/algorithm/hamming_distance.cpp