vimでShift-oを押して前の行に改行を入れるときに1秒ほど待たされる
のを修正する方法:
set ttimeoutlen=10
HDDイメージをブロックデバイスにマップするスクリプト for linux
が転がっていたので忘れないうちに貼り付けておこう。
パーティションイメージではなくてHDDイメージ。VMを使っているとHDDイメージの中のパーティションを mount したくなるときがあって、このスクリプトを使うとお手軽に個々のパーティションをそれぞれブロックデバイスに割り付けられる。
sfdisk を使ってパーティションのオフセットを解析して、Device Mapper に割り当てている。
split-hddimg
#!/bin/bash if [ -z "$2" ]; then echo "Usage: `basename $0` <hdd image> </dev/loopX>" exit 1 fi img=$1 dev=$2 if ! losetup $dev $img; then echo "losetup failed." exit 1 fi sfdisk=$(which sfdisk) dmsetup=$(which dmsetup) # expand link if [ -L $dev ];then realdev=$(dirname $dev)/$(readlink $dev) else realdev=$dev fi devdir=$(dirname $realdev) devfile=$(basename $realdev) # expand relative path if ! echo "$devdir" | grep -q '^/'; then devdir="$(pwd)/devdir/" else devdir="$devdir/" fi # expand . while echo "$devdir" | grep -q '/\./'; do devdir="$(echo "$devdir" | sed -e 's/\/\.\//\//g')" done # expand .. while echo "$devdir" | grep -q '/[^/][^/]*/\.\./'; do devdir="$(echo "$devdir" | sed -e 's/\/[^/][^/]*\/\.\.\//\//g')" done device="$devdir$devfile" if [ ! -b "$device" -o ! -r "$device" ]; then echo "Error: Block device $1 can't be accessed" losetup -d $dev exit 1 fi if [ ! -n "$sfdisk" -o ! -x "$sfdisk" ]; then echo "Error: sfdisk utility not found" losetup -d $dev exit 1 fi if [ ! -n "$dmsetup" -o ! -x "$dmsetup" ]; then echo "Error: dmsetup utility not found" losetup -d $dev exit 1 fi if [ ! -c /dev/mapper/control ];then modprobe dm_mod fi $sfdisk -d "$device" | grep "^ *$devdir" | sed -e 's/[=,]/ /g' | awk '{ print $1 " " $4 " " $6 }' | \ while read DEV START SIZE; do DEV="$(echo "$DEV" | sed -e 's/^\/dev\///' -e 's/\//-/g')" test "$SIZE" -gt 0 || continue echo "/dev/mapper/$DEV $START $SIZE" echo "0 $SIZE linear $device $START" | $dmsetup create $DEV done echo "OK. Use unsplit-hddimg $dev to unmap partitions."
unsplit-hddimg
#!/bin/bash if [ -z "$1" ]; then echo "Usage: `basename $0` </dev/loopX>" exit 1 fi dev=$1 sfdisk=$(which sfdisk) dmsetup=$(which dmsetup) # expand link if [ -L $dev ];then realdev=$(dirname $dev)/$(readlink $dev) else realdev=$dev fi devdir=$(dirname $realdev) devfile=$(basename $realdev) # expand relative path if ! echo "$devdir" | grep -q '^/'; then devdir="$(pwd)/devdir/" else devdir="$devdir/" fi # expand . while echo "$devdir" | grep -q '/\./'; do devdir="$(echo "$devdir" | sed -e 's/\/\.\//\//g')" done # expand .. while echo "$devdir" | grep -q '/[^/][^/]*/\.\./'; do devdir="$(echo "$devdir" | sed -e 's/\/[^/][^/]*\/\.\.\//\//g')" done device="$devdir$devfile" if [ ! -b "$device" -o ! -r "$device" ]; then echo "Error: Block device $1 can't be accessed" exit 1 fi if [ ! -n "$sfdisk" -o ! -x "$sfdisk" ]; then echo "Error: sfdisk utility not found" exit 1 fi if [ ! -n "$dmsetup" -o ! -x "$dmsetup" ]; then echo "Error: dmsetup utility not found" exit 1 fi $sfdisk -d "$device" | grep "^ *$devdir" | sed -e 's/[=,]/ /g' | awk '{ print $1 " " $4 " " $6 }' | \ while read DEV START SIZE; do DEV="$(echo "$DEV" | sed -e 's/^\/dev\///' -e 's/\//-/g')" test "$SIZE" -gt 0 || continue echo "removing $DEV" $dmsetup remove $DEV done losetup -d $device
非同期プロトコルのクライアント
非同期プロトコルとは、サーバーから返ってくる応答が、必ずしも要求した順番通りに返ってこないプロトコル(ソース無し。オレオレ定義)。
順不同で返ってくる応答と要求を対応づけるのはクライアントの仕事で、典型的には要求の中にシーケンス番号を入れておき、サーバーは要求と同じシーケンス番号を応答の中にも含める。
例:MessagePack-RPC
非同期プロトコルの特徴:
- イベント駆動型のサーバーの場合、サーバーの実装が簡単になる
- 同期プロトコルだと順番を揃えてから返さないといけない。サーバーの実装が(要求1つに対してスレッドを割り当てて処理するのではなく)ソケット1つに対してスレッドを割り当てて処理する方式だとあまり関係なくて、特に実装は簡単にならない。
- 処理が重い要求と軽い要求を続けて送っても、重い要求に詰まって後の応答が返ってこなくなることが無い
- 同期プロトコルだと、応答を送り返すにはその前の応答を全部返してからでないと送れない。ネットワーク帯域を効率よく使い切れなかったり、すべての応答を受け取り終わるまでの遅延が増える可能性がある。
- クライアントの実装が複雑になる
- これからこの件について。
以下 分類・名前は勝手に付けた。
コールバック型API
イベント駆動型のサーバーを実装する延長で考えると、応答をコールバック関数で受け取る方式を思いつく。
コールバック関数(...)
{
// 応答が返ってくると呼ばれる
いろいろ処理の続き...
}
いろいろ処理...
call(内容, コールバック関数, コールバック関数に引き継ぐ変数);
call(内容, コールバック関数, コールバック関数に引き継ぐ変数);
call(内容, コールバック関数, コールバック関数に引き継ぐ変数);
要求を送るたびに関数が途切れるので、アプリケーションが非常に書きにくい。
クライアントから受け取った要求を別のサーバーに転送するサーバー(プロキシ)を実装するには便利な方式。逆に言えば非同期プロトコルだと(イベント駆動で並列性の高い)プロキシはとても実装しやすい。
基本的にはマルチスレッドなサーバーで使う方式。
send/join型API
sendで送った要求に対する応答を、joinで待ち受ける(同期する)方式。非同期プロトコルの性能上の利点を最大限に活かせるAPI。
要求1を送信 // 要求1 = send(...); 要求2を送信 // 要求2 = send(...); 何か別の処理を行う... 要求1を待つ // 要求1.join(); 要求1を使った処理を行う... 要求2を待つ // 要求2.join(); 要求2を使った処理を行う...
要求2の応答が返ってくるのが遅れても、要求1を使った処理をしている間に届けば遅延を隠蔽できる = 性能が向上する。
要求を1つしか送らないなら、
send(...).join();
と書けばいいだけで、(同期プロトコルのAPIと同じようにでも使えるという意味で)使い勝手も良い。
memcachedクライアントの cloudy*1 は この方式のAPIを実装 している。memcachedのバイナリプロトコルにはクライアントから送った変数をそのまま返してくれるフィールドがあり、そこにシーケンス番号を入れている。
この方式を実装するには、前回のネットワークプログラムのI/O戦略 で シングルスレッド・イベント駆動 のところに書いた方法を使える:
send(内容) { 要求構造体を生成 // 要求 = new 要求構造体(); // 要求.result = nil; シーケンス番号を生成 // シーケンス番号 = ++roud_robin_variable; 要求構造体をテーブルに登録 // テーブル[シーケンス番号] = 要求; サーバーに要求を送る // write(ソケット, 要求); イベントハンドラを登録 // イベントループ.add(ソケット, 要求); return 要求; } join(要求) { 要求 に対するの応答が届いていない間 { // while( 要求.result == nil ) { イベントループを回す... // イベントループ.next(); } // } return 要求.result; } イベントハンドラ(...) { プロトコルを解析 応答が1つ分届いたら { テーブルから要求構造体を取り出す // 要求 = テーブル[シーケンス番号] && テーブル.remove(シーケンス番号); 要求構造体に結果かエラーを埋める // 要求.result = 結果かエラー; } }
関数呼び出しが多かったり、応答を受け取るたびにテーブルを引く操作が入るので、1つの要求の処理が非常に軽いサービスだとクライアントのCPU負荷が無視できないほど高くなってしまう可能性がある。
でもがんばって最適化すればだいぶ改善できると思う。登録されているイベントハンドラが1つだけならpollせずにreadでブロックする + SO_RCVTIMEOを使う、要求構造体はヒープではなくスタック(sendの呼び出し元)に置くなど。
scatter/gather型API
send/join 方式も良いが、そこまで細かく指定したいという状況はたぶんあまり多くない。たくさんsendして一気に待ち受けられれば十分で、そっちの方が便利だという方式。
// プール = new イベントループ(); 要求1を送信 // プール.add( send(...) ); 要求2を送信 // プール.add( send(...) ); 何か別の処理を行う... すべての応答が返ってくるのを待つ // プール.join(); 要求1や2を使った処理を行う...
この方式はsend/join型 API の延長で実装できる。
cloudyはこの方式のAPIも実装している。
send/fetch型API
要求と応答の対応付けをクライアントライブラリの中に隠蔽せずに、アプリケーション側で対応づける方式。
要求1を送信 // id1 = send(...); 要求2を送信 // id2 = send(...); 2回ループ { // for(i=0; i < 2; ++i) { 応答を受信 // id, 内容 = fetch(); 要求1の応答なら: // if(id == id1) { ... // id1 = 0; ... 要求2の応答なら: // } else if(id == id2) { ... // id2 = 0; ... それ以外なら // } else { エラー // break; // } } // }
クライアントライブラリの実装は簡単になる(テーブルを管理しなくて済む)が、アプリケーションは複雑になる。アプリケーションの知識を使って最大限に最適化できる可能性があるが、クライアントでそんなに最適化するのは面倒なのでやらない気がする。
早く届いた応答から順に処理したい場合には有効。
libmemcached の memcached_mget / memcached_fetch はこの方式…のように見えるが、memcachedのプロトコルは同期プロトコルだから応答は要求した順番に通りに届くため、要求した順番通りに同期できる…と見せかけて、valueが見つからなかったときは「見つからなかった」と返ってくるのではなく単に通りすぎられたりするので結構厄介、
応答が2回ダブって返ってきた場合の対処などは、基本的にはアプリケーション側でエラー処理をする必要がある。クライアントライブラリ側でテーブルを管理してチェックしてもいいかもしれない。
同期型API
要求1を送信して応答を待つ // call(...); 要求2を送信して応答を待つ // call(...);
非同期プロトコルだからと言って常に非同期で使いたいわけではない。実は同期型で使う場合がほとんどだという場合は、send/join型やscatter/gather型を使うとテーブルを引くなどCPUを喰うのでもったいない。そこで同期型のAPIを別に用意すると最適化ができる。
call(内容) { テーブルが空でないなら { // パイプライン化しているとこの最適化は使えない return join( send(内容) ); } シーケンス番号を生成 // シーケンス番号 = ++roud_robin_variable; サーバーに要求を送る // write(ソケット, 要求); 応答が1つ分届くまで { 応答を待ち受ける // read(ソケット, 要求); もしシーケンス番号が違っていたらエラー return 結果 } }
シーケンス番号の生成(インクリメント1つ)と、応答のシーケンス番号が違っていたときのチェック(if文1つ)が入るだけで、同期プロトコルと比べてほとんどオーバーヘッドが無い。
パイプライン化しないのであれば非同期プロトコルは同期プロトコルの上位互換性があると言える。
汎用的なプロトコル
非同期プロトコルで性能を出すには、気合いを入れてクライアントライブラリを実装する必要があるので大変。そこで汎用的なプロトコルを作ってクライアントを実装しておくと便利に使える。そんなわけで MessagePack-RPC でした。
ネットワークプログラムのI/O戦略
図解求む。
以下「プロトコル処理」と「メッセージ処理」を分けて扱っているが、この差が顕著に出るのは全文検索エンジンや非同期ジョブサーバーなど、小さなメッセージで重い処理をするタイプ。ストリーム指向のプロトコルの場合は「プロトコル処理」を「ストリーム処理」に置き換えるといいかもしれない。
シングルスレッド・イベント駆動
コネクションN:スレッド1。epoll/kqueue/select を1つ使ってイベントループを作る。
マルチコアCPUでスケールしないので、サーバーでは今時このモデルは流行らない。
クライアントで非同期なメッセージングをやりたい場合はこのモデルを使える:
1コネクション1スレッド
コネクションN:スレッドN。accept(2)またはconnect(2)するごとに新しいスレッドを立て、ソケットを閉じたらスレッドも終了する。
プロトコル処理もメッセージ処理もスレッドを1本占有してコールスタックを切らずに書けるので、コードの見通しが良くなる。
コネクションが増えるとスレッドも増えるので、いわゆるC10K問題にハマる。メッセージを送るたびにコネクションを張り直すプロトコルだと、スレッドを作る遅延が無視できない。スレッドをプロセスにしても同じ。
Webサーバーには本当に向いてない、と言うか何でコレにするかワカラン*1。
スレッドプール
コネクションN:スレッドM。epoll/kqueue/select を1つ使ってイベントループを作る。重いタスクはスレッドプールに投入して実行する。
サーバーで良くあるパターン。イベントハンドラはプロトコルを解析してメッセージを切り出し、メッセージをスレッドプールに投入する。
イベントハンドラはシングルスレッドで動くので、プロトコル処理が重いとマルチコアCPUでスケールしない。メッセージの処理が非常に軽いプログラムの場合、スレッドプールに投入 -> コンテキストスイッチ する遅延が無視できないことがある。
スレッドプールにタスクを投入する典型的なデータ構造はBlockingQueue。
マルチスレッド・イベント駆動(scatter方式)
コネクションN:スレッドM。それぞれのスレッドで epoll/kqueue/select を作り、accept(2) または connect(2) したソケットを各スレッドに振り分ける。スレッドの数はCPUのコア数に応じて固定にするか、負荷に応じて数を変えるなど。
スレッドプール方式と違ってメッセージを処理するときにコンテキストスイッチが発生しない。
プロトコル処理もメッセージ処理もスレッドを1本占有して書ける…と思いきや、あるソケットに届いたデータを処理している間は、そのスレッドに割り振られた他のソケットに届いたデータは処理されないので、イベント駆動にしないとスケールしない・遅延が増える。
メッセージ処理だけでもコールスタックを切らずに書きたい場合は、次のスレッドプールを組み合わせる方法を使う。
マルチスレッド・イベント駆動(scatter方式)+スレッドプール
コネクションN:スレッドM。それぞれのスレッドで epoll/kqueue/select を作り、accept(2) または connect(2) したソケットを各スレッドに振り分ける。イベントハンドラでプロトコルを処理してメッセージを切り出し、メッセージをスレッドプールに投入する。
メッセージ処理は書きやすいが、プロトコル処理はイベント駆動でしか書けない*2。メッセージ処理もあまりに長くブロックするとスレッドが足りなくなる可能性がなきにしもあらず。それなら1コネクション・1スレッドでもいいような。
スレッドプールにタスクを投入するときにコンテキストスイッチが発生するので、軽いメッセージが大量に届く用途だとコンテキストスイッチの遅延が無視できないことがある。
あとコード量が増える。次のwavy方式と比べるとスレッド数が無駄に増える。
※2009-07-13 追記:
nginx-0.7.x はこの方式(スレッドの代わりにプロセス)。親プロセスがaccept(2)してworkerプロセスにファイルディスクリプタを振り分けるのではなく、workerプロセスが早い者勝ちでaccept(2)する。このためファイルディスクリプタは必ずしも均等には分散されず、タイミングによって1つworkerプロセスに偏ったりする。しかしその方がaccept(2)周辺の負荷や遅延は小さいため、コネクションを細かく切ったり張ったりするケースではおそらく有効。
プロセス間でファイルディスクリプタを転送するのは移植性があまり高くない(Mac OS Xの実装が壊れ気味)という事情もあるかもしれない。
See also:prefork サーバーと thundering herd 問題, Thundering Herd のやつ
マルチスレッド・イベント駆動(wavy方式)
コネクションN:スレッドM。それぞれのスレッドでイベントループを作るが、1つの epoll/kqueue/select を共有する。当店オススメの方式w
プロトコル処理もメッセージ処理もスレッド1本占有して書ける。ただスレッドプールと同様にあまりに長くブロックするとスレッド数が足りなくなるので、基本的にはイベント駆動にした方がスケールする。メッセージ処理のたびにコンテキストスイッチが発生することはない。
イベントループの実装が難しい。実装方法は mp::wavy::coreimpl::operator() を参照。
詳しくは マルチコア時代の高並列性IOアーキテクチャ Wavy(Fiberのくだりは使えないのでスルーで)
Fiber方式
他の方式とは一線を画するダークホース。
Cagra はシングルスレッド・イベント駆動で動作するコア部分で fiber をスケジューリングしていた。他のやり方もあるかもしれない。マルチスレッド・イベント駆動(scatter方式)で fiber をスケジューリングするなど。
実装例は mp::fiber など。たぶん5回くらい読み返さないと分からないほどに難解。複数の fiber にまたがる処理など、複雑な処理を書きたくなるとちょっとヤバい。
UNIX系のOSでは ucontext という関数を使って fiber を作る。しかし ucontext で作った fiber は、それを作ったスレッドでしか実行できないという相当に厳しい制限がある。あと Mac OS X では挙動がアヤシイ(と言うか完全にバグがある)ので、ucontext は直接使わずに Io libcoroutine のようなライブラリを使うと良い。
※2009-07-13 追記:ucontext がスレッドをまたげない件は要検証。
送信側の戦略
経験上、1つの epoll/kqueue/select でread待ちとwrite待ちの両方をサポートすると、コードが複雑になるので避けた方が良い。read待ち専用の epoll/kqueue/select とwrite待ち専用の epoll/kqueue/select を作るとシンプルに書ける。
read待ちとwrite待ちで別のスレッドにしても良いが、epoll/kqueue/eventport(selectはムリ)はネスト(epoll の中に epoll を入れる)ができるので、read待ちのイベントハンドラの1つとしてwrite待ちをする手もある。mp::wavyはその方式。mp::wavy::out がwrite待ち epoll/kqueue のイベントハンドラになっている。
イベント駆動型のプロトコルパーサ
プロトコル処理をイベント駆動で書けるとI/O戦略の選択肢が広がる。
memcachedプロトコルのストリームパーサ で少し書いたが、イベント駆動でプロトコルをパースするには「データを次々に投げ込んでいくと内部の状態が遷移していき、ゴールの状態にたどり着くとパース完了」というタイプのパーサが必要になる。バイナリプロトコルなら「ヘッダ部を受信中」と「データ部を受信中」の2つくらいしか状態がないので手で書いても良いが、テキストプロトコルだと凄まじく面倒なので Ragel を使うと書きやすい*3。
ちなみにプロトコルを自作する場合は MessagePack を使っておくと、バッファリングまで含めて面倒を見てくれるストリームデシリアライザが付いているのでオススメw
:2009-06-24 追記
# 与太話:これら各種のI/O戦略をライブラリとして提供することを目的としてmpioライブラリを開発していたが、最近では mp::wavy を中心にした便利ライブラリになっており、今では ccf の中に取り込まれるに至っている。
分散ストレージの収束する方向
サーバーサイドの分散ストレージについて。広域P2Pとかデータセンター間で同期するとかCDN云々は知らない。
kumofsのアプリケーション-Gateway間のインタフェースは Get(key) だが、Gateway-Server間のインタフェースは実は GetByHash(key, partitioning-id)(とGetByHashIfModified(key, partitioning-id, time))だったりする。(実際の名前は違うけど意味は同じ)
現状ではpartition-idはkeyにハッシュ関数を掛けて自動生成するが、実際には任意の値を指定できる。
つまり関連するkeyには同じpartitioning-idを指定して同じノードに保存されるようにして、partitioning-idが同じkey同士ならトランザクションできるようにすることも、案外に容易にできる。
Consistent Hashing + double-hash-space はわりと強めに一貫性を保証する。kumofsはレプリケーションが失敗したときにロールバックしないのでdouble-hash-spaceを使っていても不整合が発生し得るが、それはレプリケーションの実装がヘタなだけでアルゴリズム上の問題ではない。
id:kazuhookuさんのPacificとそのはてブがソースだが、ユーザーIDとか何かをキーにしてshardingし、異なるキー間ではトランザクションができなくなるのは許容できるらしい。
Pacificはリゾルバがボトルネックになるが、Consistent Hashing + double-hash-space で分散できるのではないか。データの再配置を悲観的ロックではなく楽観的ロックでやるあたりが問題になりそうか…そこはまだ考えてない。
※2009-06-14追記:データの再分配が起こらなければ問題ないので、最初は小さく始めて動的に増やす(動的な運用)を考えなければ、台数固定で分割してやってもリゾルバは分散できそう。
RDBMSをshardingする方面から歩み寄りと、kvsの分散用のhash値をユーザーが指定可能にする + 同じhash値ならバックエンドDBに任意のクエリを発行可能にするという方面からの歩み寄りで、小さなデータを大量に扱うタイプの分散ストレージは Consistent Hashing + double-hash-space + partitioning-idによる分散 + 確実に実装された再配置 に収束するのかな、と思った。
※2009-06-14 追記:Consistent Hashingは局所性が一切失われる問題があるのと、double-hash-spaceは基本的にManagerノードが必要なのでshardingしないとスケーラビリティに限界がある。偏りを妥協してsorted arrayで分割するとか、一貫性を妥協してEventually Consistentにする実装もあるかもしれない。どの程度のスケーラビリティと台数あたりの効率が必要かに依るか。
大きなデータを扱うタイプの分散ストレージは、ノードが増減したときにデータの移動をしていると遅すぎるので、どのノードにデータが保存されているかというインデックス(メタデータ)と、実際のデータを分離するだろう。データは任意のノードに分散して保存しておけるし、インデックスを引くことで目的のデータに1発で到達できる。
メタデータは小さいデータなのでConsistent Hashingで良い。keyの完全一致以上の方法で検索したければ、非同期にインデックスを作る(インデックスの一貫性は保証しない)のが現実的だと思う。多少遅くなってもスケーラビリティを重視するならSkipGraphか。
1MBくらいまでの大きめのデータを扱うタイプは、通信のホップ数よりいかにキャッシュに乗っているノードを見つけるかが重要になる。クライアントのメモリを共有キャッシュとして使い、データサーバーは目的のデータがキャッシュに載っていなければ共有キャッシュにリダイレクトする、共有キャッシュはMOESIプロトコル(あるいはSIプロトコル?)を使って一貫性を維持するなどの手法がありそう。
もっと大きい巨大データを扱うタイプは、どうせキャッシュに乗らないのでレプリカを増やして対応する。人気コンテンツほどレプリカを増やすなど。
あるいはストレージががやたら速くてネットワークのスループットがボトルネックになるのであれば、共有キャッシュと人気コンテンツほどレプリカを増やす方法の両方を使うだろう。
サーバーサイドの分散ストレージはこのあたりに収束するのではないだろうか。
他の機能…たとえばある条件にマッチするデータが保存されたら通知するとか、ロックとか、階層構造(ディレクトリ)を作ってデータ一覧を出すといった機能は、分散ストレージとは疎結合した別のシステムで実装される気がする。
そのあたりはジョブサーバー使って非同期処理しろとか、ディレクトリなんてアプリケーション側でやればいいという気もしなくはないが。
ccfの計画
今実装中のccfの高水準なAPIについてメモ。(前に書いたccfの件はwavyのちょっとしたラッパ程度でしかない)
connection
まず基本のconnection
template <typename IMPL> class rpc_connection : public connection<IMPL> { // ... void process_message(msgobj msg, auto_zone& z); // process_messageから呼ばれる void process_request(method_t method, msgobj param, msgid_t msgid, auto_zone& z); // process_requestから呼ばれる void dispatch_request(method_t method, msgobj param, responder& response, auto_zone& z); // process_messageから呼ばれる void process_response(msgobj result, msgobj error, msgid_t msgid, auto_zone& z); };
(このrpc_connectionだけでもかなり便利で、他のところで使い回せる)
その上に「コネクション」と「セッション」を分離する層をかぶせる。セッションはこちらから送るRPC要求のメッセージIDにコールバック関数を紐づけておいて、応答を受け取ったときに紐づけられたコールバック関数を呼び出す…などなどするクラス。
template <typename IMPL> class managed_connection : public rpc_connection<IMPL> { // 引数にセッションマネージャを取る managed_connection(int fd, basic_session_manager* manager, shared_session session, address locator); // rpc_connection<IMPL>::process_requestをフック void process_request(method_t method, msgobj param, msgid_t msgid, auto_zone& z); // rpc_connection<IMPL>::process_responseをフック void process_response(msgobj result, msgobj error, msgid_t msgid, auto_zone& z); // ...
コンストラクタでsession_managerとsessionを受け取って、process_requestをsession_managerに中継し、process_responseをsessionに中継する。
コネクションとセッションを分離する目的は以下:
- コネクションをRPCセッションの下に隠蔽する
- 明示的に「まず接続して、接続し終わったらコールバック関数を呼んでもらって…」とやらなくてもメッセージが送れる
- ↑メッセージを送ろうとした段階でセッションを作って、後で(非同期に)そのセッションにコネクションを登録する
- 相手先ホスト=セッション(!=コネクション)になる
- 複数のコネクションで負荷分散したり(1つのホストが複数のNICを持っている場合)
- bondingでactive-activeは色々アレなのでアプリケーションレベルで
- ネットワークが瞬断したとき、再接続してそのまま正常に続行できる
- 要求->応答のコンテキストも途切れない
sessionは↓こんな感じで:
class sessoin { // RPC呼び出し template <typename Message> void call(Message& parameter, shared_zone life, callback_t callback, unsigned short timeout = 10); // コネクションを追加・削除 void add_connection(int fd, const address& locator); // 削除はmanaged_connection::~managed_connectionから自動で呼ばれる void remove_connection(int fd); // managed_connection::process_responseから呼ばれる void process_response(msgobj result, msgobj error, msgid_t msgid, auto_zone& z); };
callでRPC呼び出しする。templateのMessageはMessagePackでシリアライズ可能な任意のクラス…なのだが、static const uint32_t Message::idがRPCのメソッドIDになっているなどの制約がある。
で、そのMessageクラスを生成するRPCスタブジェネレータ*2がある(作りかけ)。書き方はこんな感じで:
@message Get # これ自体がMessagePackでシリアライズ可能になる std::string key; # MessagePackでシリアライズ可能な任意のクラス uint32_t flags = 0; # デフォルト引数に対応 @end = 1 # これが static const uint32_t Get::id になる
一方session_managerは↓こんな感じ:
template <typename Identifier, typename IMPL> class session_manager : public basic_session_manager { // identifierに対応するsessionを取り出すか、無ければ作る shared_session get_session(const Identifier& id); // managed_connectionから呼ばれる virtual void dispatch(shared_session from, method_t method, msgobj param, session_responder response, auto_zone& z) = 0; // get_sessionで新しいsessionが作られたら呼ばれる // session_created(const identifer_t& addr, shared_session s); };
session_managerを継承してdispatchを実装してやればRPCサーバーが完成する。引数に session_responder というのが渡されていて、.result(true); などとやるとRPC応答を返送してくれる。
Identifierにはアプリケーションによって独自の「ノード識別子」を定義できる。1つのノード識別子に対して1つのセッションが対応する。アドレスが違っても*3識別子が==なら、複数のコネクションを1つのセッションにまとめられる。
Identiferから相手のIP+ポートが引けるなら、session_createdの中で非同期にccf::core::connectしたりできる…が、それは派生クラスの実装に任される。ここもCuriously Recurring Template Pattern。
しかしフツーに相手に接続するだけでいいなら、Identifierには単なるIP+ポートを使うだろうし、勝手にconnect(2)もしてほしい。というわけで典型的な用途向けにsession_managerを継承したクラスがいくつかある:
class server : public session_manager<address, server>; class client : public session_manager<address, client>; class custer : public session_manager<未定, cluster>;
ここから先は未実装。