pthread_mutexはプロセスを越える
futex(2) を使ってせっせとmutexとconditionを実装していたら、意外なところに盲点が。
Linux では pthread_mutex と pthread_cond がプロセスを越えられる。移植性は無いようで、Mac OS Xでは動かなかった。
やり方は mmap(2) に MAP_SHARED フラグを渡して2つのプロセス間でメモリ空間を共有しておき、そのメモリを pthread_mutex_t や pthread_cond_t にキャストして使う。ただしアドレスはページサイズの倍数でないとうまくいかない。(※動くかも?)
※追記:pthread_mutex/condはfutexで実装されていたのでした
※追記:これは linux-2.6.22.9 + glibc-2.6.1 での挙動ですが、おそらくバグです。少なくとも linux-2.6.27 + glibc-2.8 では pthread_mutexattr_setpshared と pthread_condattr_setpshared で PTHREAD_PROCESS_SHARED をセットしないとプロセスを越えられません
実は勘違いかも知れないので↓このテストコードを動かして試してみて欲しい :-p
$ make $ # メッセージ受け取り側を起動 $ ./cli > x $ # 別の端末でメッセージ送信側を起動 $ # 30スレッドで200,000個のメッセージを送る $ ./app 200000 30 > y $ # 送った数と受け取った数が一致している $ wc -l x y 6000000 x 6000000 y 12000000 total
何故こんなことを試していたかというと、libmemcachedの関数をフックして、別のプロセスからデータを取得するライブラリを書きたかったから。
アプリケーションののコードを変えなくてもLD_PRELOADでフックライブラリをぶち込んでやれば、別のプロセスでリクエストを受け渡したりできる。同一のホスト内でmemcachedサーバーを立てていると、プロトコルをパースしたりするオーバーヘッドは無駄になる。特に遅延に大きく影響するので、小さなデータが多いと激しく遅くなる。
そこで別のプロセスにリクエストを受け渡すとき、カーネル空間を経由せずに高速に受け渡せないか、というモチベーション。
具体的な用途は、ストレージのクライアント側でリッチなことをやりたいが、アプリケーションにリッチなライブラリをリンクするのははばかられるので、別のプロセスに分けたいとき。
分散ストレージを実装するとき、サーバーの負荷を減らすためにクライアントに多くの機能を委譲する設計にしたいのだけど、アプリケーション毎にクライアントインスタンスを立ち上げるとリソースをたくさん消費してしまうので避けたい。
それから、memcachedのバイナリプロトコルは同期プロトコル*1なのにも関わらずパイプライン化を行うので、非常に実装しにくい。
上記の分散ストレージと関係するのだが、アプリケーションから(memcachedプロトコルで)リクエストを受け取ってから、別のサーバーにリクエストを投げるたりする。当然サーバーごとに応答時間は違うので、リクエストを投げた順番通りにレスポンスが返ってくるとは限らない。しかしアプリケーションに返すときに順番をそろえて返さないといけない。イケてない。
そもそもアプリケーションのコードは get_multi(key1, key2, key3); みたいなコードなのだから、順番は関係なくすべてのサーバーからレスポンスが得られれば値を返せる。つまりmemcachedプロトコルを介さずに直接リクエストを受け取れると嬉しい。
common.h
共通のヘッダ。mmapに使うファイルのパスを定義している。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <fcntl.h> #include <unistd.h> #include <sys/time.h> #include <errno.h> #include <pthread.h> #include <sys/mman.h> #include <stdint.h> static const char* const vecfile = "mapfile"; static const size_t vecsz = 1024 * 1024 * 16; static const char* const ctlfile = "mapfile.ctl"; static const size_t ctlsz = 1024 * 1024;
app.cc
リクエストを投げる側のコード。
#include "common.h" static int num_send; static volatile size_t* sz; static volatile int* array; static char* ctlp; static long pagesz; void* func(void*) { pthread_mutex_t* mutex = (pthread_mutex_t*)&ctlp[0]; pthread_cond_t* cond = (pthread_cond_t*)&ctlp[pagesz]; int err; for(int i=0; i < num_send; ++i) { //printf("locking...\n"); if((err=pthread_mutex_lock(mutex))) { printf("pthread_mutex_lock failed: %s\n", strerror(err)); exit(1); } //printf("locked\n"); array[(*sz)++] = i; printf("** send %d **\n", i); //printf("size = %lu\n", *sz); if((err=pthread_cond_signal(cond))) { printf("pthread_cond_signal failed: %s\n", strerror(err)); exit(1); } if((err=pthread_mutex_unlock(mutex))) { printf("pthread_mutex_unlock failed: %s\n", strerror(err)); exit(1); } } fflush(stdout); return NULL; } int main(int argc, char* argv[]) { if(argc < 3) { fprintf(stderr, "usage: <num_send> <threads>\n"); exit(1); } num_send = atoi(argv[1]); unsigned int num_threads = atoi(argv[2]); int vecfd = open(vecfile, O_RDWR); if(vecfd < 0) { perror("open"); exit(1); } char* vecp = (char*)mmap(NULL, vecsz, PROT_READ|PROT_WRITE, MAP_SHARED, vecfd, 0); if(!vecp) { perror("mmap"); exit(1); } sz = (volatile size_t*)&vecp[0]; array = (volatile int*)&vecp[sizeof(size_t)]; int ctlfd = open(ctlfile, O_RDWR); if(ctlfd < 0) { perror("open"); exit(1); } ctlp = (char*)mmap(NULL, ctlsz, PROT_READ|PROT_WRITE, MAP_SHARED, ctlfd, 0); if(!ctlp) { perror("mmap"); exit(1); } pagesz = sysconf(_SC_PAGESIZE); if(pagesz < 0) { perror("sysconf"); exit(1); } if(pagesz < sizeof(pthread_mutex_t)) { pagesz = sizeof(pthread_mutex_t); } pthread_t th[num_threads]; for(unsigned int i=0; i < num_threads; ++i) { pthread_create(&th[i], NULL, func, NULL); } for(unsigned int i=0; i < num_threads; ++i) { pthread_join(th[i], NULL); } }
cli.cc
リクエストを受け取る側のコード。
#include "common.h" int main(int argc, char* argv[]) { int vecfd = open(vecfile, O_RDWR|O_CREAT, S_IRWXU); if(vecfd < 0) { perror("open"); exit(1); } if( ftruncate(vecfd, vecsz) < 0) { perror("ftruncate"); exit(1); } char* vecp = (char*)mmap(NULL, vecsz, PROT_READ|PROT_WRITE, MAP_SHARED, vecfd, 0); if(!vecp) { perror("mmap"); exit(1); } volatile size_t* sz = (volatile size_t*)&vecp[0]; volatile int* array = (volatile int*)&vecp[sizeof(size_t)]; int ctlfd = open(ctlfile, O_RDWR|O_CREAT, S_IRWXU); if(ctlfd < 0) { perror("open"); exit(1); } if( ftruncate(ctlfd, ctlsz) < 0) { perror("ftruncate"); exit(1); } char* ctlp = (char*)mmap(NULL, ctlsz, PROT_READ|PROT_WRITE, MAP_SHARED, ctlfd, 0); if(!ctlp) { perror("mmap"); exit(1); } long pagesz = sysconf(_SC_PAGESIZE); if(pagesz < 0) { perror("sysconf"); exit(1); } if(pagesz < sizeof(pthread_mutex_t)) { pagesz = sizeof(pthread_mutex_t); } pthread_mutex_t* mutex = (pthread_mutex_t*)&ctlp[0]; pthread_cond_t* cond = (pthread_cond_t*)&ctlp[pagesz]; pthread_mutex_init(mutex, NULL); pthread_cond_init(cond, NULL); *sz = 0; int err; while(true) { //printf("locking...\n"); if((err=pthread_mutex_lock(mutex))) { printf("pthread_mutex_lock failed: %s\n", strerror(err)); exit(1); } //printf("locked\n"); while(*sz == 0) { //printf("wating...\n"); if((err=pthread_cond_wait(cond, mutex))) { printf("pthread_cond_wait failed: %s\n", strerror(err)); exit(1); } //printf("waked\n"); } //printf("size = %lu\n", *sz); for(size_t i=0; i < *sz; ++i) { printf("** recv %d **\n", array[i]); fflush(stdout); } *sz = 0; if((err=pthread_mutex_unlock(mutex))) { printf("pthread_mutex_unlock failed: %s\n", strerror(err)); exit(1); } } }