Writing a distributed datastore in Python

I am writing a distributed datastore in Python for a very specific kind of data, and I wanted to show how you can build a simple distributed system in Python.
In this post we will build a distributed log: a system that lets you store logs from many servers in one big log that is distributed across many machines.
We start with an overview of the components of the system:

[diagram of components]

This system has two major components: the master node and the storage nodes.
Storage nodes are simple, dumb nodes that store data and retrieve it when needed.

The master node is responsible for accepting writes from servers and sending those writes to the storage nodes in a way that distributes the data across all the nodes.
When a server sends its log entries to the master, the following happens:

  1. the master gets the K log entries and selects N (the replication factor) nodes that will store the entries
  2. the master sends a copy of the K entries to the N nodes and waits for them to write it to their local store (be it memory or disk)
  3. the master then registers in its local data structure that those N nodes have those K entries.
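Step 1 above, picking N distinct nodes for a batch of entries, can be sketched in a few lines (the node names and replication factor here are made up for illustration):

```python
import random

def pick_replicas(nodes, replication_factor):
    # choose N distinct nodes that will each hold a copy of the entries
    return random.sample(sorted(nodes), replication_factor)

nodes = {"node-a", "node-b", "node-c", "node-d"}
replicas = pick_replicas(nodes, 2)
print(len(replicas))  # 2
```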

The master also handles client requests. When a client wants to read a portion of the log, the following happens:

  1. the client sends a request to the master with the time range it wants to read.
  2. the master decides which nodes have the information and asks each of those nodes for the entries.
  3. the master combines all the information and sends it back to the client.
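The read path above is a small scatter/gather: group the requested timestamps by the node that holds them, ask each node for its share, and merge. A minimal in-memory sketch, where the `fetch` callable stands in for the RPC to a storage node:

```python
from collections import defaultdict

def read_range(dts, dt_to_server, fetch, start_dt, end_dt):
    # group the in-range timestamps by the server that holds them
    server_dts = defaultdict(list)
    for dt in dts:
        if start_dt <= dt <= end_dt:
            server_dts[dt_to_server[dt]].append(dt)
    # ask each server for its share and merge the answers
    merged = {}
    for server, wanted in server_dts.items():
        merged.update(fetch(server, wanted))
    return merged

store = {"s1": {1: "boot", 3: "login"}, "s2": {2: "error"}}
result = read_range([1, 2, 3], {1: "s1", 2: "s2", 3: "s1"},
                    lambda s, dts: {dt: store[s][dt] for dt in dts},
                    start_dt=1, end_dt=2)
print(result)  # {1: 'boot', 2: 'error'}
```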

This design (single master, multiple slaves) has pros and cons.


Pros:

  • simple design: the master coordinates both reads and writes.
  • complex operations, like garbage collection, can be implemented in the master.
  • the master has a consistent view of the cluster and can make elaborate decisions on data placement.


Cons:

  • SPOF (single point of failure): if the master is down, all logs are inaccessible. This can be mitigated with a shadow master (as is done in the Google File System) or with a multi-master/no-master design (out of the scope of this post).
  • the cluster size is limited by the master's capacity; for example, there is a limit on the amount of RAM (or disk) the master has to keep track of which nodes have each log entry.
  • the read/write throughput is limited by the master, as all writes and reads must go through the master first.

Each component is basically a simple JSON-RPC server using wsgi-jsonrpc.

Let's start by looking at the storage node code (storage_node.py):

from optparse import OptionParser
from wsgiref import simple_server
from wsgi_jsonrpc import json_tools, WSGIJSONRPCApplication

class StorageNode(object):
    def __init__(self):
        self.entries = []
        self.dt_to_entries = {}

    def add_entries(self, data, dt):
        # store the payload and map the timestamp to its position
        self.entries.append(data)
        self.dt_to_entries[dt] = len(self.entries) - 1

    def get_entries(self, dts):
        return [(self.entries[self.dt_to_entries[dt]], dt) for dt in dts]

if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-m", "--master-port", dest="master_port",
                      help="master port", type="int")
    parser.add_option("-p", "--port", dest="port",
                      help="storage port", type="int")
    (options, args) = parser.parse_args()
    node = StorageNode()
    try:
        print "joining master..."
        master = json_tools.ServerProxy(
            "http://localhost:%d" % options.master_port)
        master.join(options.port)
        print "joining completed, serving..."
        server = simple_server.make_server(
            'localhost', options.port,
            WSGIJSONRPCApplication(instance=node))
        server.serve_forever()
    except Exception, e:
        print "not working", e

The storage node accepts two arguments, the master port and its own port. When it starts, it sends a join request to the master with its port so the master can communicate with it.
The storage node has two functions: add_entries, which writes new log entries to the local store (in this case a list), and get_entries, which retrieves entries by timestamps (the dts parameter).
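Stripped of the RPC plumbing, the storage node is just a list plus a timestamp-to-index map, so the same logic can be exercised directly (the class is reproduced from the listing above, minus the server setup):

```python
class StorageNode(object):
    def __init__(self):
        self.entries = []
        self.dt_to_entries = {}

    def add_entries(self, data, dt):
        # store the payload and remember which slot this timestamp maps to
        self.entries.append(data)
        self.dt_to_entries[dt] = len(self.entries) - 1

    def get_entries(self, dts):
        return [(self.entries[self.dt_to_entries[dt]], dt) for dt in dts]

node = StorageNode()
node.add_entries(["line 1", "line 2"], 1000)
node.add_entries(["line 3"], 1001)
print(node.get_entries([1001, 1000]))
# [(['line 3'], 1001), (['line 1', 'line 2'], 1000)]
```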

Now for the master node code:

from optparse import OptionParser
from random import sample
from collections import defaultdict
from wsgiref import simple_server
from wsgi_jsonrpc import json_tools, WSGIJSONRPCApplication

def get_proxy(port):
    return json_tools.ServerProxy("http://localhost:%d" % port)

class MasterNode(object):
    def __init__(self, options):
        self.options = options
        self.servers = set()
        self.dt_to_servers = {}
        self.dts = []

    def join(self, port):
        # called by a storage node on startup
        self.servers.add(port)

    def add_entries(self, data, dt):
        # pick N nodes at random and replicate the entries to them
        servers = sample(self.servers, self.options.replication_factor)
        for server in servers:
            get_proxy(server).add_entries(data, dt)
        self.dt_to_servers[dt] = servers
        self.dts.append(dt)

    def get_range(self, start_dt, end_dt):
        #should use binary search!
        servers_dts = defaultdict(list)
        for dt in self.dts:
            if dt >= start_dt and dt <= end_dt:
                # any replica that holds this timestamp will do
                servers_dts[sample(self.dt_to_servers[dt], 1)[0]].append(dt)

        dts_data = {}
        for server, dts in servers_dts.iteritems():
            response = get_proxy(server).get_entries(dts)
            for data, dt in response['result']:
                dts_data[dt] = data
        return dts_data

if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-p", "--port", dest="port", help="port", type="int")
    parser.add_option("-r", "--replication-factor",
                      dest="replication_factor", type="int",
                      help="the replication factor for entries")
    (options, args) = parser.parse_args()
    print "running..."
    server = simple_server.make_server(
        'localhost', options.port,
        WSGIJSONRPCApplication(instance=MasterNode(options)))
    server.serve_forever()

The master has a join method; this is the method called by the storage nodes when they join the cluster, and it registers the storage nodes' ports in the servers set.
The other two methods are:

  • get_range - returns all the entries between start_dt and end_dt
  • add_entries - adds the entries that happened at dt (timestamp) to the distributed store.

Both of them work in a distributed manner: an added entry gets copied to N (the replication factor) different storage nodes,
and entries in a datetime range are read back from the nodes that hold the data.
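The two paths can be seen working together without any networking by simulating the master's bookkeeping in-process, with plain dictionaries standing in for the storage nodes (the ports and the replication factor are illustrative):

```python
import random

stores = {port: {} for port in (9001, 9002, 9003)}  # port -> {dt: data}
dt_to_servers = {}
dts = []
REPLICATION_FACTOR = 2

def add_entries(data, dt):
    # pick N nodes and give each a copy (stands in for the RPC write)
    servers = random.sample(sorted(stores), REPLICATION_FACTOR)
    for server in servers:
        stores[server][dt] = data
    dt_to_servers[dt] = servers
    dts.append(dt)

def get_range(start_dt, end_dt):
    # any single replica of each timestamp is enough for a read
    result = {}
    for dt in dts:
        if start_dt <= dt <= end_dt:
            server = random.choice(dt_to_servers[dt])
            result[dt] = stores[server][dt]
    return result

add_entries(["disk full"], 100)
add_entries(["disk ok"], 200)
print(get_range(100, 150))  # {100: ['disk full']}
```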

This implementation has some limitations:

  • it assumes that all nodes are on the same machine on different ports (this is done on purpose, since this is a demonstration); it can easily be fixed by storing full addresses instead of ports.
  • it doesn't detect dead nodes, so it will keep sending requests to them; this can be fixed by adding a heartbeat system that checks each node's state.
  • new nodes that join the cluster start getting data written to them immediately, but they should first copy data from other nodes in order to balance the load in the cluster (this is called bootstrapping).
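For the second point, a minimal heartbeat could be a periodic no-op RPC that drops unreachable nodes from the pool; here `ping` is a hypothetical stand-in for such a call:

```python
def prune_dead_nodes(servers, ping):
    # keep only the nodes whose ping succeeds
    alive = set()
    for server in servers:
        try:
            ping(server)
            alive.add(server)
        except Exception:
            pass  # considered dead; a real system would retry first
    return alive

def fake_ping(server):
    # simulate one unreachable node
    if server == 9002:
        raise IOError("connection refused")

print(sorted(prune_dead_nodes({9001, 9002, 9003}, fake_ping)))
# [9001, 9003]
```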

All in all, I think this is a good start for anyone who is looking to build a distributed store.


  1. bob Jan 4

    sample(self.dt_to_servers[dt],1)[0] should be written as random.choice(self.dt_to_servers[dt])

  2. Uriel Katz Jan 5

    Yeah, my mistake; choice is more appropriate for this task