リクルートインターンシップ2013 参加者からの挑戦状を高速化した話

友人が出題してた奴。
http://recruit-jinji.jp/intern2014-engineer/challenge.html
これのQ2.

小文字アルファベットからなる文字列 S,Tが与えられます。
Sの部分文字列の
うちTと同じ長さでTとのハミング距離が最も小さいものすべての先頭文字を、
その部分文字列の出現順に出力してください。


最初アルファベットごとにフーリエ変換する想定解を書いて300秒超くらい。
OpenMPでスレッド並列化をやったら120秒弱くらい。
出題者はフーリエ変換の部分をnumpyライブラリに投げたので30秒くらいで実行できたとのこと。


その後別の友人が、「キター!完全に愚直なコードで、119.56秒!GPGPUすごい!」「クラスタ並列もやってみた(CUDA+MPI)!11.73 sec! 」(8台のサーバマシンを使ったらしい)などとやりだして爆笑したので、自分も高速化に手を出してみることにした。


GPUなんて高級な計算機はもってないので、SIMD命令を使ってみることに。
まず愚直解を書いて1800秒。
AVX命令でバイトごとに一致を比較する命令があったので、それを使うと52秒。
6スレッド並列化で12秒。
コードを手動でループアンローリングして、8スレッド並列にしてみたら6.9秒。


で、1ヶ月以上たった今日、唐突に思い出したのでもう少し詰めてみた。


まず、AVX命令のロード命令vmovdqaを使うために、文字列のアラインメントを32にそろえる必要があるので、アドレスを32で割ったあまりごとに、メモリ上で再コピーして、オフセットを調整していた部分。
本当にこれが効果があるのか調べてみると、連続したメモリ領域から順番に読み出していく分にはvmovdqu命令を使っても、1%も性能低下がないらしい。
実際、アラインメントをそろえないコードにしてみると、32回のメモリ転送のオーバーヘッド分だけ速くなった。


次にメモリアクセスの効率化。
文字列S, TをL1キャッシュに乗るサイズのブロックに区切って、ブロック毎に処理をしていったら速いだろうという考えにより。
これは、何故かSをブロックに区切ったらオーバーヘッドのほうが大きくなった。
Tのほうを区切るのは効果があって、L1キャッシュは32KBのはずなのだが、何故か9KBずつに区切るのが一番高速化した。


で、最後に一致数の集計部分の高速化。
32個のバイト型変数をループで足していたのを、AVX命令を使って並列に水平加算を繰り返すことで、7AVX命令くらいで集計できるようになった。


以上の最適化により、5秒を切って、

time elapsed: 4.65603s

の実行結果を出すことができた。

double getnow();


const int MX_S = 3000100;
const int MX_T = (300000 | 31) + 1;

int len_s, len_t;
char S[MX_S];
char T[MX_T];

int ans[MX_S];

inline int solve2(char *s, char *t, int len){
	int iter = len / 32 / 128;
	int res = 0;
	
	asm(
		"vpxor %%ymm4, %%ymm4, %%ymm4\n\t"
		"vpxor %%ymm5, %%ymm5, %%ymm5\n\t"
		"vpxor %%ymm6, %%ymm6, %%ymm6\n\t"
		: : :
	);
	while(iter--){
		
		#define DOUBLE(x) x x
		#define PROC \
		asm( \
			"vmovdqu 0(%0), %%ymm0\n\t" \
			"vmovdqu 0(%1), %%ymm1\n\t" \
			"vpcmpeqb %%ymm0, %%ymm1, %%ymm0\n\t" \
			"vpsubb %%ymm0, %%ymm6, %%ymm6\n\t" \
			: : "r"(s), "r"(t) : \
		); \
		s += 32; t += 32;
		
		DOUBLE(DOUBLE(DOUBLE(DOUBLE(DOUBLE(DOUBLE(DOUBLE(PROC)))))))
		
		asm(
			"vpunpcklbw %%ymm5, %%ymm6, %%ymm0\n\t"
			"vpunpckhbw %%ymm5, %%ymm6, %%ymm1\n\t"
			"vpaddw %%ymm0, %%ymm1, %%ymm6\n\t"
			
			"vphaddw %%ymm6, %%ymm6, %%ymm6\n\t"
			"vphaddw %%ymm6, %%ymm6, %%ymm6\n\t"
			"vphaddw %%ymm6, %%ymm6, %%ymm6\n\t"
			
			"vpaddw %%ymm6, %%ymm4, %%ymm4\n\t"
			"vpxor %%ymm6, %%ymm6, %%ymm6\n\t"
			: : :
		);
	}
	iter = len / 32 % 128;
	while(iter--){
		PROC
	}
	
	unsigned short tmp[16] = {};
	asm(
		"vpunpcklbw %%ymm5, %%ymm6, %%ymm0\n\t"
		"vpunpckhbw %%ymm5, %%ymm6, %%ymm1\n\t"
		"vpaddw %%ymm0, %%ymm1, %%ymm6\n\t"
		
		"vphaddw %%ymm6, %%ymm6, %%ymm6\n\t"
		"vphaddw %%ymm6, %%ymm6, %%ymm6\n\t"
		"vphaddw %%ymm6, %%ymm6, %%ymm6\n\t"
		
		"vpaddw %%ymm6, %%ymm4, %%ymm4\n\t"
		"vmovdqu %%ymm4, %0\n\t"
		: "=m"(tmp) : :
	);
	res += tmp[0] + tmp[8];
	return res;
}

void solve(){
	
	const int N = len_s - len_t + 1;
	const int BLOCK_SIZE = 9 * 1024; //9KBずつ
	
	
	for(int jj = 0; jj < len_t; jj += BLOCK_SIZE){
		
		int cur_len = min(BLOCK_SIZE, len_t - jj);
		int loop_i = N;
		
		#ifdef _OPENMP
		omp_set_num_threads(7);
		#endif
		#pragma omp parallel for
		rep(i, loop_i){
			ans[i] += solve2(S + i + jj, T + jj, cur_len);
		}
	}
	int mx = *max_element(ans, ans + N);
	rep(i, N) if(ans[i] == mx) putchar(S[i]);
	puts("");
}


int main(){
	double start = getnow();
	gets(S);
	gets(T);
	len_s = strlen(S);
	len_t = strlen(T);
	
	dbg(len_s);
	dbg(len_t);
	
	while(len_t % 32) T[len_t++] = -1;
	
	solve();
	cerr << "time elapsed: " << getnow() - start << "s" << endl;
	return 0;
}

#include<sys/time.h>
double getnow(){
	struct timeval tv;
	gettimeofday(&tv, NULL);
	return tv.tv_sec + tv.tv_usec * 1e-6;
}

SIMD命令を使うのは初めてだったので楽しかった。


多分フーリエ変換解法を並列化して、クラスタ並列+GPGPUで、フーリエ変換の部分をGPU用のフーリエ変換ライブラリを使ってやるのが最強だと思うけど、環境のある方は誰かやってみてください。1秒切れると思います。