イベント駆動型プログラムのエラー処理
イベント駆動型のプログラムでは、エラー処理の記述が面倒になることがある。これを何とかしたい。
例えば、keyからidを引き、idからdata引くプログラムを書きたいとする。
手続き型で書くと以下のようになる:
def doit(key) id = get_id(key) data = get_data(id) return data end
ここで、get_id と get_data は長くブロックするので、イベント駆動型にしたいとする。
そうするとプログラムの正常系の処理は↓このようになる。
def doit(key, &callback) get_id(key) {|id| get_data(id) {|data| callback.call(data) } } # futureを返すのは良いアイディア end
ここまでが前提条件。
このプログラムのエラー処理を愚直に書くと、↓このように非常に冗長になってしまうのを何とかしたい:
def doit(key, error_callback, &result_callback) begin get_id(key) {|id,error| if error error_callback.call("error!") else begin get_data(id) {|data,error| if error error_callback.call("error!") else result_callback.call(data) end } rescue error_callback.call("error!") end end } rescue error_callback.call("error!") end end
方法1:Fiberを使う
実装が複雑になるので、とりあえず使わないことにする。
方法2:エラーハンドラを伝播させていく
get_id や get_data にエラーハンドラを引数として渡せるようにする。get_id や get_data は決して例外を投げず、エラーが起こった場合にはエラーハンドラを呼ぶことにする。
def doit(key, error_callback, &result_callback) get_id(key, error_callback) {|id| get_data(id, error_callback) {|data| result_callback.call(data) } } end
get_idの実装は↓こんな感じになる。
def get_id(key, error_callback, &result_callback) @rpcClient.callback(:get_id, key) {|result, error| if error error_callback.call(error) else id = result result_callback.call(id) end } end
これでだいたい良いのだが、エラーの返し方をエラーの種類によって変えたかったりする。
例えば、get_idは成功したがget_dataが失敗した場合は、エラーを返すのではなくnilを返すようにしたい。そういう場合には↓こう書く。
def doit(key, error_callback, &result_callback) error_callback_nil = Proc.new { result_callback.call(nil) } get_id(key, error_callback) {|id| get_data(id, error_callback_nil) {|data| result_callback.call(data) } } end
方法3?
方法2は、引数にいちいちエラーハンドラが出現するのがわずらわしい。もっとうまく書けないだろうか。
スレッドは使い回すので、TLSは使えなさそう。
Lazyレプリケーションはpush型とpull型のどちらが良いのか
更新ログをマスタからスレーブに送ることでデータをレプリケーションする、いわゆるログシッピングを、分散データストアに実装する方法について。
ログシッピングは、操作を同時に複数箇所に送信するレプリケーションと比べると、次のような実装上の利点がある:
- オリジナルに対する変更とレプリカに対する変更の適用順序が一致する
- オリジナルとレプリカの整合性を維持しやすくなる。設計が簡単になるので嬉しい。
- ダブってレプリケーションされない
- at-most-once。どこまで更新ログを受け取ったかをスレーブ側で管理している前提。
- CPUにやさしい
- バースト的に更新クエリが発生した時でも、レプリケーションクエリの数は(激しくは)増えない。その代わりログに溜まって一度に送信される。バッチ処理はCPUにやさしく、スループットが高い。
一方、更新ログを管理しなければならない欠点があるが、ストレージのデータ構造自体がログであれば不要にできるだろう。
ここで、このレプリケーションを
- push型
- マスタがスレーブに更新ログを投げつける。
- pull型
- スレーブがマスタから更新ログを取り寄せる。
のどちらで実装するのが良いのか。
push型を実装するには、次に送るべきデータをマスタが知っている必要がある。
pull型を実装するには、次に受け取るべきデータをスレーブが知っている必要がある。
前者の場合、マスタは "スレーブが持っていないデータ" を送れば良い。マスタは "スレーブがどこまでデータを持っているのか" を知っている必要がある。
後者の場合、スレーブは "自分が持っていないデータ" を受け取れば良い。スレーブは "自分がどこまでデータを持っているのか" を知っている必要がある。
後者ならステートレスなプロトコルで実装できるが、前者はステートフルになってしまう。
その上に、分散データストアでは、マスタに対するスレーブが変化することがある:
- スレーブが故障したあと復旧した
- マスタが故障したあと復旧した
- 新しいスレーブを追加した
これらの変化を人間に対処させるのは良い割り切りだが、MySQLになるので避けたい。
しかし、こういう変則的な変化をステートフルに管理するコードは、たいていバグるので良くない。
以上の理由で、pull型の方が良いのではないかと思っているところ。
バグを生み出さない最善の方法はプログラムを書かないことであって、そもそもレプリケーションなんて自分で書かないのが一番良いに決まっている。
非同期レプリケーションで単調読み出し整合性を保つ
レプリケーション*1は分散トランザクションと紙一重で、複数のサーバに分散された複数のデータを同時に書き換えたい。
もしこれが同時でないと、クライアントによって異なるデータを読んでしまう可能性がある:
しかし実際には同時でなくても良い。サーバ2からデータを読み出される前に、サーバ2にデータを同期してしまえば「まったく問題ない」:
もう一点、サーバ1から新しいデータが読み出される前ならば、サーバ2から古いデータを読み込んでも「まったく問題ない」:
つまり、整合性のない(それぞれのサーバが異なるデータを持っている)状態が発生したとしても、それが観測されなければ、実質的にすべてのデータは同時に更新されたように見える。
そこで、次の2つのルールを設定する:
- ルール1:クライアントは、整合性がない状態を観測したら、すべてのクライアントが古いデータを読み出さなくなるまで、データを読み出さない
- ルール2:整合性のない状態は、整合性のある状態に自然に(神=人が手を加えなくても)収束していく
ルール1が保たれていれば、整合性のない状態でも新しいデータしか読み出されない。
ルール2が保たれていれば、データを読み出せない状態が無限に続かない。
ルール1の実装
N>R+Wを使う。
ルール2の実装
Paxosを使う。
*1:読み込みの負荷を分散させたい場合。耐障害性だけが問題の場合は話が違うかもしれない。
HTTPをMessagePack-RPCに変換する方法
HTTPは、クライアントは楽だけどサーバは大変。完全でセキュアで堅牢なHTTPのサーバに書くのは異常に難しい。
MessagePack-RPCは、クライアントで使えないかもしれないけど、サーバは書きやすい。それからRPCの弊害として、クライアントとサーバが簡単に密結合化してしまう。
サーバもクライアントも楽したい => HTTPをMessagePack-RPCに変換すればいい。
案1:URLに埋め込む
ぱっと思いついた方法。
"/method?param1=value1¶m2=value" のように、URLにRPCのメソッド名と引数を埋め込む。返り値はJSONに変換して返す。
利点
クライアントはお気楽。
欠点
RPCではメソッドの引数は配列だが、この案では連想配列なので、どうにかする必要がある。
引数に型がない。
返り値はどうしよう。
案2:JSON-RPC over HTTP
クライアントはJSON-RPC over HTTPを実装する
リクエストは全部POSTで、URLはRPCサーバの位置を表し、bodyにJSONを埋め込む。JSONの形式はJSON-RPCに従う。
http://json-rpc.org/wiki/specification
利点
引数に型がある。
JSON-RPCにはシーケンスIDを埋め込む仕様があるので、クライアントでとってもがんばれば、非同期RPCが可能。
どんなメソッドでも呼べる。
どんなエラーでも返せる。
案3:REST
GETは、"/path?param1=value"のように、URLの中にパスとパラメータを埋め込み、GET("path", params)というRPC呼び出しに変換する。返り値はJSONに変換して返す。エラーはHTTPのコードで返す。
POSTは、URLを第一引数、bodyをJSONとして、POST("path", params)というRPC呼び出しに変換する。返り値はJSONに変換して返す。エラーはHTTPのコードで返す。
PUTは、URLを第一引数、bodyを第二引数として、PUT("path", body)というRPC呼び出しに変換する。エラーはHTTPのコードで返す。
DELETEは、URLを第一引数として、DELETE("path")というRPC呼び出しに変換する。エラーはHTTPのコードで返す。
利点
GET,POST,PUT,DELETE以外のメソッドを呼べないので、疎結合化を強制できる。
リソース(パス)に対する操作(GET/POST/PUT/DELETE)という概念。
案4:Webアプリケーションフレームワーク風
POSTと、なんとなくGETでは、URLはリソースではなくサーバーのコマンドを示すそうだ。
GETは、"/method/arg1/arg2?param1=value"のように、URLの中にメソッドと引数とパラメータを埋め込み、method("arg1", "arg2", params)というRPC呼び出しに変換する。返り値とエラーはRESTと同じ。
POSTは、"/method/arg1/arg2"のように、URLの中にメソッドと引数を埋め込み、bodyをJSONとして、method("arg1", "arg2", params)というRPC呼び出しに変換する。返り値とエラーはRESTと同じ。
PUTは、RESTと同じだが、どうせ使わないんじゃないか。
DELETEは、RESTと同じだが、きっと使わないだろう。
利点
よくあるWebアプリケーションのように見える。
kumofsに10MBのvalueを入れるとどうなるか実験してみた
kumofsは、本来小さいサイズのvalueを大量に入れることを想定した分散KVSで、高解像度の画像など、サイズの大きいvalueを入れることは想定されていない。と言うかテストされていない。
でも、実は入れてみたら案外うまく動くんじゃないか?というわけで試してみた。
結論
- データ総量30GB、物理ホスト1台で試した限りでは、実は問題は無さそう
- 物理ホスト1台というのが不十分なので微妙…
- ノードを追加したり復旧したりするとき、数時間の間、速度が半分くらいに劣化する
- データ量1TB、サーバ4台の構成で、2時間くらいかかる推定
- データの再配置がタイムアウトしてしまう可能性があるが確認できていない。3台以上の構成で追試する必要あり
- 速度は、サーバ1台につき、Get 6.7 req/sec、Set 3.8 req/sec くらい
- ほぼ線形にスケールアウトすると仮定すれば、4台投入すれば Get 26.8 req/sec、Set 15.2 req/sec
環境
実験内容
結果
通常時のSet速度
通常時のGet速度
- 結果そのまま:6.7 req/sec
再配置時間
- 結果そのまま:
- データを全件スキャンして移動するべきデータを取り出すのにかかった時間:210秒
- データの転送にかかった時間:80秒
- スループットは 1000件 / (210秒 + 80秒)= 3.44件/sec くらい
- MB/secに直すと 10MB × 1000件 / (210秒 + 80秒) = 34.4MB/sec くらい
- データの転送のスループット:10MB×1000件 / 80秒 = 125MB/sec
- MB/secに直すと 10MB × 1000件 / (210秒 + 80秒) = 34.4MB/sec くらい
- 1台構成、データ量1TBで、1TB / 34.4 = 8時間 くらい
- 4台構成、データ量1TBで、2時間 くらい
データの再配置はストリーム全体を圧縮して行っているが、今回使ったデータは全部 'v' で埋められたデータなので、圧縮率がやたら効いている。
実際にはデータの転送にもっと時間がかかるハズ…だが、実際にはHDDの方がボトルネックになっているはずなので、おそらく結果には関係ない。遅延は若干増えるはずだが、ミリ秒レベル。
再配置中のGet速度
- 結果そのまま:3.4 req/sec
- 通常時は 6.7 req/sec だったので、半分くらいに落ちている
RDBに代わるスケーラブルなデータモデルの必要性
このあたりの内容を卒業研究にする予定で、中間報告書まで書いたけど、整理と裏付けが全然追いつかなくて卒論なんて書けそうにないので、とりあえずテキトーにブログに書いておくなど。
データストアには、状態を永続化して共有する機能と、データモデル(状態を操作する意味論)を規定する機能の、2つの機能がある。この2つの機能を、より使いやすく、より高速に、よりスケーラブルに提供することが求められる。そうでないとシステム全体が成り立たない。
冗長化とか負荷分散とか、ハードの質に頼らない高性能なシステムを構築したいときは、「状態を持たないようにする」のが定石になる。同じ状態を2台のホストで同期し続けたり、状態を分割しながら整合性を保ち続けるのは、非常に難しい。このため、状態は共有データストアに保存しておくのがもっとも簡単で、現実的な解になる。
MVCアーキテクチャにおけるViewとControllerはModelの上に成り立っており、そのModelの実装は、データストアの機能に依存するところが大きい。データストアがアトミックな操作を一切サポートしていなければ、いくらModelでがんばっても、高速なトランザクションを実現するのは難しい。データの検索や絞り込みについても同じで、データの物理的な分布を知らないアプリケーションがいくらがんばっても、高速な実装は難しい。
従来は「高級」なデータモデルを保ったまま、状態を共有する機能を高めてきたが、ここにきて困ったことに、物理的な限界が見えてきてしまった。
…と言うと正しくないかも知れない。物理的な限界には達していないのだが、(高級なデータモデルを保ったまま、状態を共有する機能を高めるのは、高級なデータモデルを諦めることに比べて)コスト的に高くなってしまう、と言う方が正確かも知れない。
@frsyuki データモデルが関係代数などが規定する論理的な構造、サーバへの保存はストレージのモデルで物理的な構造。この2つの組み合わせでいいのでは。
http://twitter.com/masayh/status/6654253395
経済原理はこの意味では、論理か物理かのいずれを優先するかを選ぶといえます。保守による長期的コスト低下より、短期開発による売り上げの増大や開発コスト低下を選んだのでしょう。
http://twitter.com/masayh/status/6655773843
同様にコスト低下はクラウドを生みだして、サーバ集約やスケールによる物理レベルでの解決も併用するようになった。これは理性レベルでの生産性、保守性だけでの追及では飽き足らなくなった貪欲な経済原理の進展を意味します。
http://twitter.com/masayh/status/6655822982
ここで言うデータモデルの「高級」さとは、「アトミックに更新できる単位が大きい」ことを意味している。
アトミックに更新できるとは、あるデータを書き換えるときに、すべて更新されるか、まったく更新されないかの、どちらかにしかならない、ということである。一部のデータだけが更新されたような中途半端な状態にはならない。
例えばRDBMSはとても高級なデータモデルを提供していて、すべてのデータをアトミックに更新できる(アトミックに更新できる単位が全データ)。どこのデータを選んできても、アトミックに更新できる。
この特性がどれだけ重要かは、あまり理解できていないが、普段RDBMSを使っている方々にしてみれば当然のことの様に重要なのではないかーと予想している。
一方でKVS(key-value store)はとても貧弱なデータモデルしか提供していなくて、1つのデータしかアトミックに更新できない(アトミックに更新できる単位がvalue)。あるvalueを更新するとき、そのvalueの前半の数バイトだけ更新されたような中途半端な状態にはならないが、複数のvalueをアトミックに更新することはできない。
なぜ今になってkey-valueなる貧弱なデータモデルが(一部で)流行し始めたかと言えば、アトミックに更新できる単位を限定した方が、スケーラブルな実装が作りやすいのである。
ここで、CAP定理という(誤解されやすくて誤用しやすい)法則がある。
複数のサーバの分割して保存されたデータを、必ず Atomic に更新できるように保証しようとすると、応答を返せなくなってサービスが止まってしまうことがあるので、サービスの可用性を向上させるには、Atomic に更新するデータは複数のサーバーに分割しないようにするか、Atomic に更新することを諦めるしかない(と読めたけど違うかも)。
- CAP定理の分かりやすい解説:Brewer's CAP Theorem
- CAP定理の分かりやすい解説の和訳:ブリュワーの CAP 定理
- CAP定理のより厳密な説明:Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services
そこで、CAPのどれを捨てるかと言う議論がある。
例えば、複数のデータを更新するときに、Atomicでなくても良いとすれば(CAPのCを捨てる)、Availability と Partition Tolerance を満足できる。つまり、一応は複数のデータを非同期に(Atomicでなく)更新できる特性を保ちながら、いつでも高速に応答できて、複数のサーバーにデータを分割してスケールアウトできる。
一方で、たまにデータがロックされっぱなしになって応答を返すのに時間がかかるような状況になっても良いとすれば(CAPのAを捨てる)、分散トランザクションなどを使って、Atomic と Partition Tolerance を同時に保証できる。
あるいは、データを複数のサーバーに分割してスケールしなくても良いとすれば(CAPのPを捨てる)、データをすべて1台のサーバーにまとめて Atomic と Availability を同時に保証できる。
…が、詳しくは前の参考文献にお任せして、分かりやすい観点だと、Atomic に更新可能な範囲を広くすると、その範囲では(物理法則的に)スケールアウトできないんだーというのがポイントだと思う。
つまり、スケールアウトが必要なら、すべてのデータをAtomicに更新できるというRDBMSのデータモデルに代わる、新しいモデルを考える必要がある。RDBに代わるスケーラブルなデータモデルが必要だ。
ときに、本当にスケールアウトする必要があるのか、という話がある。根本的だけど、実はここが一番紛糾しているんじゃないかと思う。
「クラウド」はマルチテナントシステムなシステムで、1つの企業だけで使う物ではない。だから1つに企業にとっては、スケールアウトする必要は無いとも言える。しかし「クラウド」がアノ価格で使えるのは、スケールアウトするシステムがあるからだろう。クラウドの価格が魅力的に映るのであれば、ウチにはスケールアウトは必要ないとは言えないハズだ。
それは規模の経済性が働いているからアノ価格になるのであって、巨大企業でもない組織が、ちまちまとスケールアウトするシステムを作っても意味がない、という話もあるだろう。しかし技術的に巨大企業に依存してしまうのは、また別の問題があると思う。
今はまだ、その新しいデータモデルが確立されていない時期だと思うので、そのあたりの知見を独占されずに収集しておくという意味でも、データモデルについてはよく考えておきたいところ。
データを分割して分散しなきゃならんねぇと考え始めると、次にそのデータを扱って何か処理を行いたいときに、「データを移動させるよりプログラムを移動させた方が速い」という話が出てくる。そこでMapReduceが出てきたりする(分散データストアのデータモデルは、それ以前の話になる)。分散されたデータの保存と処理も合わせて、分散システム全体を統合して考えられるようなデータモデルを確立できないかなぁ、と思う。
分散KVSの使い方
今流行のkey-value storageの利点と欠点など。小さいデータをたくさん扱うタイプで、単純なkey-value型のデータモデルを持つ分散KVSについて。
Webアプリをとりまく最近のKVS事情、雑感を読んで、ちゃんと整理して把握しておかないといけないな、と思ったので少し整理。
それは違うぞーという事があったらコメントくださいm(_ _)m
※2009-11-17 追記:現在、KVSという用語の意味は定義されておらず、使う人によって揺れています。ここで言うところの分散KVSは、Dynamo や kumofs や ROMA など を想定しています。
分散KVSの利点
- スケールアウトできる
- 簡単にサーバーを追加して性能を上げられる
- 単体の性能が高い
- スキーマレス
最初は少ない台数で安く、後からサーバーを足してスケールアウト!という運用ができる。アプリケーションに影響せずに、ストレージ側の都合だけで性能を向上させることができる。
負荷が想定していた以上に高くなってもサーバーを足すだけで何とかなるし、何より高いサーバーを買う必要がない。分散KVSを特徴付けるポイントは スケーラビリティが高い という点だと思う。
それから単体の性能が高いのもポイント。
RDBでもスケールするようにスキーマを設計していくと、JOINは使わないとか、shardingした全部のサーバーを横断するクエリは発行しないとか、だんだん貧弱なクエリしか使えなくなってくる*1。
するとKVSに比べてサーバー1台の性能が低いのが気になってくる。RDBで1万QPS程度の性能しか出ないところを、KVSだと10万QPSくらい処理できたりするので、ただひたすらにkeyとvalueを低遅延かつ高スループットで取り出したいユースケースだと、KVSの方が嬉しい*2。
こうして見ると、RDBをチューニングすれば足りるとか、あるいは高いハードやソフトを買うお金もあるし、最初から性能が予測しやすいからスケーラビリティもそれほど重要じゃないという場合は、分散KVSは不要。高いハードウェアを買ってきて、コア数に併せてOracle RACのライセンス料を払えば済む。
分散KVSは、負荷がサッパリ予測できない上にお金も無いWebサービスやオンラインゲーム向けだと思う。
分散KVSの欠点
分散KVSはデータモデルが貧弱すぎる。範囲検索もできなければ絞り込みもできない。
本当に連想配列としてしか使わない用途(キャッシュとか)なら何も考えずに分散KVSを使えば良いのだけど、そうでなければ基本的にはRDBと組み合わせて使うことになる。
具体的には、インデックスをRDBに置いて、データ本体を分散KVSに置く。スキーマ不定のデータをRDBに永続化する方法の比較の「friendfeedアプローチ」の図を見るとイメージが湧く。「データテーブル」の部分を分散KVSに置き換える*3。
ここで、この方法でデータを格納しているときに、例えばRDBからインデックスを引いたら100件ヒットしたとする。次にその100件分のデータ本体を分散KVSから取り出そうとすると、全部のサーバーにクエリが飛ぶ(サーバーの台数が100台よりも十分少ないとき)。
これは分散KVSのほとんどの実装は、ハッシュ関数を使ってデータを分散させていることに起因する。できるだけすべてのサーバーにまんべんなく散らばるように設計されているため、多くのデータを一度に取り出そうとすると、多くのサーバーに散らばったデータを集めてくる必要がある。
1台のRDBサーバーにデータが全部集約されていれば、100件分のデータを取り出そうとしても1台のサーバーから取り出せばいい。データがソートされていればHDDをシークする必要もないため、高速に取り出せる。
つまり、関係性のある複数のデータを同時に取り出したい場合は、分散KVSはRDBに比べてサーバー1台あたりの効率が悪い。
分散KVSでも効率を上げたいなら、必要になるデータをあらかじめ作っておくのが基本で、例えば新しいデータを保存するときに検索済みの100件分のデータが入ったkeyも作っておく。ただこれが有効なのは検索する条件があらかじめ分かっている場合で、条件が不定だと難しい。それからトランザクションが使えないので、検索済みのkeyは非同期に作ることになる。厳密な一貫性が必要になるケースでは使えない。
分散KVSはこの効率の問題を、スケーラビリティに任せて力押しで解決する。100台でダメなら1,000台にすればいいじゃない!
そろそろ消費電力やラックの容量の問題が語られてもいいのかな、と思う。
分散KVSの中でも単体性能のチューニングは大事で、それによってサーバーの台数が10台になるか20台になるかに直接影響してくる。
ここで言うところの分散KVSには、BigTableやCassandraなどの、いわゆるMulti dimensional sorted storeは含めていない。
これらの分散データストアはkey-valueよりも高級なデータモデルを持ち、単純なKVSの効率上の問題を解決しようとしている。