リクルートインターンシップ2013 参加者からの挑戦状を高速化した話
友人が出題してた奴。
http://recruit-jinji.jp/intern2014-engineer/challenge.html
これのQ2.
小文字アルファベットからなる文字列 S,Tが与えられます。 Sの部分文字列の うちTと同じ長さでTとのハミング距離が最も小さいものすべての先頭文字を、 その部分文字列の出現順に出力してください。
最初アルファベットごとにフーリエ変換する想定解を書いて300秒超くらい。
OpenMPでスレッド並列化をやったら120秒弱くらい。
出題者はフーリエ変換の部分をnumpyライブラリに投げたので30秒くらいで実行できたとのこと。
その後別の友人が、「キター!完全に愚直なコードで、119.56秒!GPGPUすごい!」「クラスタ並列もやってみた(CUDA+MPI)!11.73 sec! 」(8台のサーバマシンを使ったらしい)などとやりだして爆笑したので、自分も高速化に手を出してみることにした。
GPUなんて高級な計算機はもってないので、SIMD命令を使ってみることに。
まず愚直解を書いて1800秒。
AVX命令でバイトごとに一致を比較する命令があったので、それを使うと52秒。
6スレッド並列化で12秒。
コードを手動でループアンローリングして、8スレッド並列にしてみたら6.9秒。
で、1ヶ月以上たった今日、唐突に思い出したのでもう少し詰めてみた。
まず、AVX命令のロード命令vmovdqaを使うために、文字列のアラインメントを32にそろえる必要があるので、アドレスを32で割ったあまりごとに、メモリ上で再コピーして、オフセットを調整していた部分。
本当にこれが効果があるのか調べてみると、連続したメモリ領域から順番に読み出していく分にはvmovdqu命令を使っても、1%も性能低下がないらしい。
実際、アラインメントをそろえないコードにしてみると、32回のメモリ転送のオーバーヘッド分だけ速くなった。
次にメモリアクセスの効率化。
文字列S, TをL1キャッシュに乗るサイズのブロックに区切って、ブロック毎に処理をしていったら速いだろうという考えにより。
これは、何故かSをブロックに区切ったらオーバーヘッドのほうが大きくなった。
Tのほうを区切るのは効果があって、L1キャッシュは32KBのはずなのだが、何故か9KBずつに区切るのが一番高速化した。
で、最後に一致数の集計部分の高速化。
32個のバイト型変数をループで足していたのを、AVX命令を使って並列に水平加算を繰り返すことで、7AVX命令くらいで集計できるようになった。
以上の最適化により、5秒を切って、
time elapsed: 4.65603s
の実行結果を出すことができた。
double getnow(); const int MX_S = 3000100; const int MX_T = (300000 | 31) + 1; int len_s, len_t; char S[MX_S]; char T[MX_T]; int ans[MX_S]; inline int solve2(char *s, char *t, int len){ int iter = len / 32 / 128; int res = 0; asm( "vpxor %%ymm4, %%ymm4, %%ymm4\n\t" "vpxor %%ymm5, %%ymm5, %%ymm5\n\t" "vpxor %%ymm6, %%ymm6, %%ymm6\n\t" : : : ); while(iter--){ #define DOUBLE(x) x x #define PROC \ asm( \ "vmovdqu 0(%0), %%ymm0\n\t" \ "vmovdqu 0(%1), %%ymm1\n\t" \ "vpcmpeqb %%ymm0, %%ymm1, %%ymm0\n\t" \ "vpsubb %%ymm0, %%ymm6, %%ymm6\n\t" \ : : "r"(s), "r"(t) : \ ); \ s += 32; t += 32; DOUBLE(DOUBLE(DOUBLE(DOUBLE(DOUBLE(DOUBLE(DOUBLE(PROC))))))) asm( "vpunpcklbw %%ymm5, %%ymm6, %%ymm0\n\t" "vpunpckhbw %%ymm5, %%ymm6, %%ymm1\n\t" "vpaddw %%ymm0, %%ymm1, %%ymm6\n\t" "vphaddw %%ymm6, %%ymm6, %%ymm6\n\t" "vphaddw %%ymm6, %%ymm6, %%ymm6\n\t" "vphaddw %%ymm6, %%ymm6, %%ymm6\n\t" "vpaddw %%ymm6, %%ymm4, %%ymm4\n\t" "vpxor %%ymm6, %%ymm6, %%ymm6\n\t" : : : ); } iter = len / 32 % 128; while(iter--){ PROC } unsigned short tmp[16] = {}; asm( "vpunpcklbw %%ymm5, %%ymm6, %%ymm0\n\t" "vpunpckhbw %%ymm5, %%ymm6, %%ymm1\n\t" "vpaddw %%ymm0, %%ymm1, %%ymm6\n\t" "vphaddw %%ymm6, %%ymm6, %%ymm6\n\t" "vphaddw %%ymm6, %%ymm6, %%ymm6\n\t" "vphaddw %%ymm6, %%ymm6, %%ymm6\n\t" "vpaddw %%ymm6, %%ymm4, %%ymm4\n\t" "vmovdqu %%ymm4, %0\n\t" : "=m"(tmp) : : ); res += tmp[0] + tmp[8]; return res; } void solve(){ const int N = len_s - len_t + 1; const int BLOCK_SIZE = 9 * 1024; //9KBずつ for(int jj = 0; jj < len_t; jj += BLOCK_SIZE){ int cur_len = min(BLOCK_SIZE, len_t - jj); int loop_i = N; #ifdef _OPENMP omp_set_num_threads(7); #endif #pragma omp parallel for rep(i, loop_i){ ans[i] += solve2(S + i + jj, T + jj, cur_len); } } int mx = *max_element(ans, ans + N); rep(i, N) if(ans[i] == mx) putchar(S[i]); puts(""); } int main(){ double start = getnow(); gets(S); gets(T); len_s = strlen(S); len_t = strlen(T); dbg(len_s); dbg(len_t); while(len_t % 32) T[len_t++] = -1; solve(); cerr << "time elapsed: " << getnow() - start << "s" << endl; return 0; } #include<sys/time.h> double getnow(){ struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec + tv.tv_usec * 1e-6; }
SIMD命令を使うのは初めてだったので楽しかった。
多分フーリエ変換解法を並列化して、クラスタ並列+GPGPUで、フーリエ変換の部分をGPU用のフーリエ変換ライブラリを使ってやるのが最強だと思うけど、環境のある方は誰かやってみてください。1秒切れると思います。