簡易分散ストレージを実装してみた
Cagraの開発に全然参加できていなくてヘコみつつある今日この頃。発作的に簡易分散ストレージを実装してみた。
- server.rb
require 'socket' require 'thread' require 'rubygems' require 'json' class Server def initialize(srv) @srv = srv @db = {} end def self.open(port) self.new(TCPServer.open(port)) end def run while true Thread.new(@srv.accept) {|sock| begin req = recv(sock) puts "req: #{req.inspect}" __send__ "req_#{req['method']}", sock, req ensure sock.close end } end end def req_get(sock, req) key = req['key'] if data = @db[key] res(sock, { 'error' => nil, 'data' => data, }) else res(sock, { 'error' => 'not found', 'data' => nil, }) end end def req_put(sock, req) key = req['key'] data = req['data'] @db[key] = data res(sock, { 'error' => nil, }) end private def recv(sock) JSON.parse(sock.gets) end def res(sock, res) puts "res: #{res.inspect}" sock.puts JSON.generate(res) end end port = ARGV[0] Server.open(port).run
- client.rb
require 'socket' require 'thread' require 'rubygems' require 'json' require 'digest/sha1' class HashSpace def initialize @space = [] end def add(hash, addr) @space << [hash, addr] @space.sort_by {|s| s.first } end def loop(hash, &block) index = nil @space.each_with_index {|s, i| if s.first > hash index = i break end } index ||= @space.length @space[index..-1].each {|hash, addr| yield addr } @space[0, index].each {|hash, addr| yield addr } end def self.hash(str) Digest::SHA1.digest(str) end end class TCC def initialize(addrs) @addrs = addrs @hash_space = HashSpace.new addrs.each {|addr| hash = HashSpace.hash(Socket.pack_sockaddr_in(*addr.reverse)) @hash_space.add(hash, addr) } end def get(key) res = nil @hash_space.loop(HashSpace.hash(key)) {|addr| TCPSocket.open(*addr) {|sock| res = req(sock, { 'method' => 'get', 'key' => key, }) } return res } raise "not found" end def put(key, data, rep = 1) res = nil reped = 0 @hash_space.loop(HashSpace.hash(key)) {|addr| TCPSocket.open(*addr) {|sock| res = req(sock, { 'method' => 'put', 'key' => key, 'data' => data, }) } reped += 1 if reped >= rep return res end } raise "node not found" end private def req(sock, req) sock.puts JSON.generate(req) JSON.parse sock.gets end end c = TCC.new [ ["127.0.0.1", 4500], ["127.0.0.1", 4600], ["127.0.0.1", 4700], ["127.0.0.1", 4800], ] 100.times {|i| print "put test#{i} = data-#{i}: " p c.put("test#{i}", "data-#{i}", 3) } 100.times {|i| print "get test#{i}: " p c.get("test#{i}") }
クライアントとサーバーが分離しているが、双方を同じプロセス内の別スレッドで動かせばP2P、なんて。
3時間くらいで書けた。分散ストレージはとても簡単なコードからヤバイほど複雑なコードまでスケールするという意味で、Rubyでのネットワークプログラミングの入門に適している。
このプログラムがこれだけ短い理由:
- 例外処理をしていない
- ノードの離脱・参加を考えていない
- サーバーは自分からコネクションを張りに行かない
- クライアントは常に自分からコネクションを張りに行く
- データを共有しないマルチスレッド
- シリアライズを全部JSONに丸投げしている
- コネクションを再利用していない
サーバーが自分からコネクションを張りに行かないで済んでいる理由:レプリケーションするとき、クライアントが複数のサーバーに書き込んでいるため。
しかし、これは遅延は小さいがスループットは出ない。クライアントがあるサーバーに書き込み、そのサーバーが別のサーバーに書き込む、という数珠つなぎの方がスループットが出る。
ロックも同期も要らないマルチスレッドほど簡単なモノはない。しかし実際にはノードリストの更新、並列処理などが絡み、必然的にロックが必要になる == メンドーになる。
解決策として、シングルスレッドのイベント駆動で状態遷移マシンを回す方法があるが、状態の数が多くなってくるとやってられなくなる。
Cagraではシングルスレッドのイベント駆動でユーザーランドスレッドを回している。これは賢いと思った。しかしwriteやreadなどのブロックする命令をすべて抽象化する必要がある。
データ部分も含めてシリアライズする(RPCっぽくする)のは、シリアライズ・デシリアライズが1行で書ける、RPC部を本流の処理から通信と例外処理を分離・抽象化できるなどの利点があるが、一度バッファメモリを経由する分、遅くなる。
コネクションを再利用しないで毎回張りに行くのは遅い。しかし再利用を考えると、ソケットを使い終わったとき(スコープを外れたとき)に、(1)closeするのか、(2)コネクションプールに戻すのか という2つの選択肢が発生することになる。いつcloseするのかのタイミングが難しくなる。必然的にコネクションマネージャ的なモノが必要になる。コードが複雑になる。