簡易分散ストレージを実装してみた

Cagraの開発に全然参加できていなくてヘコみつつある今日この頃。発作的に簡易分散ストレージを実装してみた。

  • server.rb
require 'socket'
require 'thread'

require 'rubygems'
require 'json'

class Server

  def initialize(srv)
    @srv = srv
    @db = {}
  end

  def self.open(port)
    self.new(TCPServer.open(port))
  end

  def run
    while true
      Thread.new(@srv.accept) {|sock|
        begin
          req = recv(sock)
          puts "req: #{req.inspect}"
          __send__ "req_#{req['method']}", sock, req
        ensure
          sock.close
        end
      }
    end
  end

  def req_get(sock, req)
    key = req['key']
    if data = @db[key]
      res(sock, {
        'error' => nil,
        'data' => data,
      })
    else
      res(sock, {
        'error' => 'not found',
        'data' => nil,
      })
    end
  end

  def req_put(sock, req)
    key = req['key']
    data = req['data']
    @db[key] = data
    res(sock, {
      'error' => nil,
    })
  end


  private
  def recv(sock)
    JSON.parse(sock.gets)
  end

  def res(sock, res)
    puts "res: #{res.inspect}"
    sock.puts JSON.generate(res)
  end

end

port = ARGV[0]

Server.open(port).run
  • client.rb
require 'socket'
require 'thread'

require 'rubygems'
require 'json'

require 'digest/sha1'



class HashSpace
  def initialize
    @space = []
  end

  def add(hash, addr)
    @space << [hash, addr]
    @space.sort_by {|s| s.first }
  end

  def loop(hash, &block)
    index = nil
    @space.each_with_index {|s, i|
      if s.first > hash
        index = i
        break
      end
    }
    index ||= @space.length

    @space[index..-1].each {|hash, addr|
      yield addr
    }
    @space[0, index].each {|hash, addr|
      yield addr
    }
  end

  def self.hash(str)
    Digest::SHA1.digest(str)
  end
end


class TCC
  def initialize(addrs)
    @addrs = addrs
    @hash_space = HashSpace.new

    addrs.each {|addr|
      hash = HashSpace.hash(Socket.pack_sockaddr_in(*addr.reverse))
      @hash_space.add(hash, addr)
    }
  end

  def get(key)
    res = nil
    @hash_space.loop(HashSpace.hash(key)) {|addr|
      TCPSocket.open(*addr) {|sock|
        res = req(sock, {
          'method' => 'get',
          'key' => key,
        })
      }
      return res
    }
    raise "not found"
  end

  def put(key, data, rep = 1)
    res = nil
    reped = 0
    @hash_space.loop(HashSpace.hash(key)) {|addr|
      TCPSocket.open(*addr) {|sock|
        res = req(sock, {
          'method' => 'put',
          'key' => key,
          'data' => data,
        })
      }
      reped += 1
      if reped >= rep
        return res
      end
    }
    raise "node not found"
  end

  private
  def req(sock, req)
    sock.puts JSON.generate(req)
    JSON.parse sock.gets
  end

end


c = TCC.new [
  ["127.0.0.1", 4500],
  ["127.0.0.1", 4600],
  ["127.0.0.1", 4700],
  ["127.0.0.1", 4800],
]


100.times {|i|
  print "put test#{i} = data-#{i}: "
  p c.put("test#{i}", "data-#{i}", 3)
}

100.times {|i|
  print "get test#{i}: "
  p c.get("test#{i}")
}

クライアントとサーバーが分離しているが、双方を同じプロセス内の別スレッドで動かせばP2P、なんて。

3時間くらいで書けた。分散ストレージはとても簡単なコードからヤバイほど複雑なコードまでスケールするという意味で、Rubyでのネットワークプログラミングの入門に適している。



このプログラムがこれだけ短い理由:

  • 例外処理をしていない
  • ノードの離脱・参加を考えていない
  • サーバーは自分からコネクションを張りに行かない
  • クライアントは常に自分からコネクションを張りに行く
  • データを共有しないマルチスレッド
  • シリアライズを全部JSONに丸投げしている
  • コネクションを再利用していない


サーバーが自分からコネクションを張りに行かないで済んでいる理由:レプリケーションするとき、クライアントが複数のサーバーに書き込んでいるため。
しかし、これは遅延は小さいがスループットは出ない。クライアントがあるサーバーに書き込み、そのサーバーが別のサーバーに書き込む、という数珠つなぎの方がスループットが出る。


ロックも同期も要らないマルチスレッドほど簡単なモノはない。しかし実際にはノードリストの更新、並列処理などが絡み、必然的にロックが必要になる == メンドーになる。
解決策として、シングルスレッドのイベント駆動で状態遷移マシンを回す方法があるが、状態の数が多くなってくるとやってられなくなる。
Cagraではシングルスレッドのイベント駆動でユーザーランドスレッドを回している。これは賢いと思った。しかしwriteやreadなどのブロックする命令をすべて抽象化する必要がある。


データ部分も含めてシリアライズする(RPCっぽくする)のは、シリアライズ・デシリアライズが1行で書ける、RPC部を本流の処理から通信と例外処理を分離・抽象化できるなどの利点があるが、一度バッファメモリを経由する分、遅くなる。


コネクションを再利用しないで毎回張りに行くのは遅い。しかし再利用を考えると、ソケットを使い終わったとき(スコープを外れたとき)に、(1)closeするのか、(2)コネクションプールに戻すのか という2つの選択肢が発生することになる。いつcloseするのかのタイミングが難しくなる。必然的にコネクションマネージャ的なモノが必要になる。コードが複雑になる。