Building a distributed concurrent queue with Apache ZooKeeper
In my first few weeks here at Cloudera, I’ve been tasked with helping out with the Apache ZooKeeper system, part of the umbrella Hadoop project. ZooKeeper is a system for coordinating distributed processes. In a distributed environment, getting processes to act in any kind of synchrony is an extremely hard problem. For example, simply having a set of processes wait until they’ve all reached the same point in their execution – a kind of distributed barrier – is surprisingly difficult to do correctly. ZooKeeper offers an API to facilitate this sort of distributed coordination. For example, it is often used to serve locks to client processes – locks are just another kind of coordination primitive – in the form of small files that ZooKeeper tracks.
In order to be useful, ZooKeeper must be both highly reliable and available as systems will rely upon it as a critical component. For example, if locks cannot be taken, processes cannot make progress and the whole system will grind to a halt. ZooKeeper is built on a suite of reliable distributed systems techniques and protocols, and is typically run on a cluster of machines so that if some should fail, the remaining ones can continue to provide service. Under the hood, ZooKeeper is responsible for ordering calls made by clients so that each request is processed atomically and in a fixed and firm order.
One of my first contributions to the project was a set of bindings to allow programs written in the Python language to act as clients to a ZooKeeper cluster. ZooKeeper was natively written in Java, and there are already C and Perl bindings. Adding Python bindings increases the number of people that can use the system, and brings the strengths of Python, such as rapid prototyping, to bear when designing distributed systems.
The Python ZooKeeper bindings are available from the ZooKeeper SVN repository and should be part of the 3.2 release, planned for the next couple of weeks. To use the bindings now, you can either check out the latest version of the code from the SVN repository, or download a tarball containing a recent snapshot here. The zookeeper module exposes the ZooKeeper API to Python, so to get started all you need do is add import zookeeper to your Python script once the module is installed. Instructions on getting up and running are at the end of this post.
To illustrate some of the ZooKeeper API, I’ve written a distributed FIFO queue in Python – the source code is here – which I wanted to share. The combination of Python and Zookeeper meant that I was able to write the queue in just over 60 lines of code, and most of that deals with local coordination issues between two threads rather than any tricky issues trying to make remote processes behave correctly. I can only give a taste here of how programming with Python and ZooKeeper works. I hope there’s enough here to convince you that ZooKeeper might make a useful component for distributed systems that need a little herding.
ZooKeeper provides a tree abstraction where every node in that tree (or znode, in ZooKeeper parlance) is a file on which a variety of simple operations can be performed. ZooKeeper orders operations on znodes so that they occur atomically. Therefore there is no need to use complex locking protocols to ensure that only one process can access a znode at a time. The tree represents a hierarchical namespace, so that many distinct distributed systems can use a single ZooKeeper instance without worrying about their files having the same name.
Each znode has some associated data – up to a megabyte in current builds – that can be updated atomically. Every update to a znode increases its version number, which allows clients to perform compare-and-swap operations by reading the version and then updating a znode only if the version is still the one that was read.
As a notification mechanism, ZooKeeper provides watches, which are callback methods that are called asynchronously when an event of interest occurs. Watches are attached, typically, to an individual znode. When that znode changes any watcher on the znode will be fired asynchronously on the client. Many methods of the ZooKeeper API have an optional watch argument. Some languages have to work hard to provide callable objects as parameters, but Python makes this easy as callables are first class language constructs. Simply pass any callable, like a method or a lambda expression, to the zookeeper module and when an event of interest occurs, the callable will be executed.
This call comes from a separate thread of execution, so great care must be taken to ensure that unexpected things do not happen due to your watcher being fired at an arbitrary point in the execution of your script. Normally you will use watchers to notify another thread of a state change. It will often be the case that the main thread will be waiting for the watcher to fire before it can continue. An example of this is in the __init__ method of our ZooKeeperQueue when we try to connect to the server. Compared to the time a script takes to execute, connections can take a long time to run. So it’s useful that the ZooKeeper API allows us to connect asynchronously, in case there were any work that we wanted to get done while we were waiting for the connection to be established. However, in our case, we just want to wait until the connection is successful, and so we need a mechanism to wait for the watcher to notify us.
A useful tool for this inter-thread communication is the Condition object in Python, which represents a condition variable, a well-known concurrent programming abstraction. Condition objects may be acquired and released just like locks, but they also expose an API to wait for a notification from another thread and to fire that notification. While a thread is waiting on a Condition it goes to sleep, leaving the operating system with some free CPU to dedicate to other processes. Once a Condition is notified, a thread that is waiting on it is woken up and allowed to continue execution once the notifying thread has released the Condition.
This leads to a simple pattern for communicating between watchers and the main thread. Here’s an excerpt from the connection code:
def watcher(handle,type,state,path): print "Connected" self.cv.acquire() self.connected = True self.cv.notify() self.cv.release() self.cv.acquire() self.handle = zookeeper.init("localhost:2181", watcher, 10000, 0) self.cv.wait(10.0)
First we define our watcher which takes four parameters (if you want to provide more parameters or local state to a watcher, one way to do it is to wrap a function call in a local lambda which captures the state). The next line acquires an exclusive lock on a condition variable cv. Why do this now? Once we set our watcher in place, it could be fired at any time – even before the main thread makes progress to the next line of code. If we don’t prevent it from sending a notification on the condition variable before we’re ready to look for it, the notification could get lost and we could wait forever. Notifications aren’t buffered – if no one is waiting on a condition variable, no one gets woken up.
Then the code initialises ZooKeeper. The zookeeper module gives us an integer handle which we can use to refer to our connection in the future (we can open many connections per client). The next line tells us to wait until we receive a notification on the condition variable that the connection has succeeded. The parameter is a timeout in seconds, after which if we are still not connected we presume that something is wrong and abort.
The ZooKeeper queue
A FIFO queue is a simple data structure where producers put items in, and consumers retrieve them in the order they were put in. There are only two operations on a basic queue: enqueue adds an item and dequeue removes it. Despite their simplicity, queues crop up very often in distributed systems – for example, in job submission systems where clients submit requests to a set of workers which serve the requests on a first-come, first-served basis.
The ZooKeeper queue is structured very simply. All items are stored as znodes under a single top-level znode which represents a queue instance. Consumers can retrieve items by getting and then deleting a child of the top-level znode. The code creates a queue by calling a single create command. If the queue already exists, the Python module will throw an exception which we catch. This is a design decision that is still in review – future versions of the bindings might return integer error codes, and rely on the user to throw an exception if required.
zookeeper.create(self.handle,self.queuename,"queue top level", [ZOO_OPEN_ACL_UNSAFE],0)
The first two arguments to this call identify the connection to the ZooKeeper service and the name of the znode. The third is the data the znode contains. We won’t be accessing the data so we write some placeholder text.
The fourth argument is an access control list of permissions that controls who can access the znode in the future. ZooKeeper provides fairly fine-grained control over access, but the subject is beyond the scope of this post. What we have done here is to create the queue znode so that any client can read or write to it.
Adding and deleting items from the queue
Although I explained how consumers retrieve items from the queue, I said nothing about how they make sure they are retrieving items in FIFO order. What we would like is a way of naming each item such that later items are ordered lexicographically after earlier ones. If we can retrieve items in the same order, we’ll have our queue. Thankfully, ZooKeeper provides a very handy flag for the create call that helps us out. Specifying the zookeeper.CREATE_SEQUENCE flag appends each znode name with an sequence number suffix that increases monotonically with each new znode that is created. ZooKeeper ensures that the sequence numbers are applied in order and are not reused.
Enqueuing an item is therefore a simple one liner. We don’t have to take out any locks to ensure that access to the queue znode is serialised. Items may be queued concurrently, and ZooKeeper takes care of assigning sequence numbers to them in the order they were received.
Dequeuing an item is also straightforward, but a bit more involved. First we retrieve a list of all the items waiting to be queued from ZooKeeper with the get_children procedure call. Then, after sorting the list of items on the client, we get the contents of the znode (i.e. the item’s data) and then try to delete it.
It is possible that this deletion will fail because some other consumer has managed to successfully retrieve the item beforehand. We could ensure that this would never happen by organising for a queue-wide lock – this is easily implemented in ZooKeeper (although left as an exercise for the reader). However, this would severely impact performance by only allowing a single consumer to access the queue at one time. Instead, the client simply deals with the failed delete – again, indicated via an exception – and moves on to the next child znode in the list. If the client reaches the end of the list without successfully deleting an item, it should issue another get_children call to make sure that no items were added while the original list was being scanned. Once the get_children call returns an empty list, the dequeue procedure gives up and returns None.
Sometimes we might want to block until an item is available to retrieve. It would be inefficient to copy exactly the non-blocking approach and simply loop, issuing get_children requests until an item was found. Instead, we can leverage ZooKeeper’s watcher mechanism to provide an asynchronous notification when a new znode is created as a child of the queue znode. The code to accomplish this is a combination of the patterns we’ve seen already in the dequeue and connection code.
def block_dequeue(self): def queue_watcher(handle,event,state,path): self.cv.acquire() self.cv.notify() self.cv.release() while True: self.cv.acquire() children = sorted(zookeeper.get_children(self.handle, self.queuename, queue_watcher)) for child in children: data = self.get_and_delete(self.queuename+"/"+children) if data != None: self.cv.release() return data self.cv.wait() self.cv.release()
First the client acquires a lock to prevent the watcher sending a notification when the client is unready. Then, as in the dequeue method, the client retrieves a list of items, but here a watcher parameter is specified. The watcher will fire whenever any event is seen that is relevant to the queue znode. The watcher acquires the lock – blocking until the client has given it up – and then notifies the client that there may be more items available.
The client only waits for this notification if all the children returned from get_children have already been consumed by others – otherwise it will successfully retrieve an item and return it. Once all possible items have been exhausted, the client waits on the condition variable. After being woken up, it repeats the same list-read-delete-wait loop.
ZooKeeper operations can fail in a number of ways. In order to keep this example simple, most errors are raised as exceptions and the queue aborts. A more robust implementation should catch errors at every ZooKeeper invocation, as many can be recovered from with a little effort.
The zookeeper.CONNECTIONLOSS error condition is particularly worth noting. ZooKeeper may drop a client connection at any time, due to physical link loss, network congestion or other connection problem. This can cause ZooKeeper API invocations to abort before the ZooKeeper cluster is able to inform the client of the operation’s success. This is problematic for our queue, as enqueue operations may or may not have succeeded when we receive a CONNECTIONLOSS error.
There are several approaches we can take to this problem. The first is to blindly retry enqueue when a connection is lost. This could result in an item being queued several times, but for some systems this is not a significant problem. For example, if a web page is crawled twice, apart from the time cost there will be no hardship caused to a indexing engine.
For some applications, duplication of enqueue operations is problematic. The obvious ‘solution’ is to check whether an item is in the queue after it has been queued. However, it is possible that a consumer will have retrieved and deleted the item between the connection loss event and the subsequent reconnection and existence check. Instead, a two-phase protocol is necessary where a producer marks an item as ‘consumable’ only when it is sure it is in the queue, by atomically updating its associated data with a flag. Consumers may only retrieve items for which the flag is set. If a connection loss occurs during the setting of this flag, recovery is easier as the set call may be reissued – if the item is no longer present in the queue, the only possible explanation is that the original flag update succeeded and the item has been consumed. This is not built into the example code, but a production system should implement a similar form of connection loss recovery.
Taking care of failure modes like this one often comprises most of the work of building a distributed system. The key is to understand every exception that API calls can throw, and to know what your code does in every circumstance.
Using the queue
To use the queue, you must first make sure you have built and installed both the C client libraries and the zookeeper Python module. There are two prerequisite packages: the cppunit development package and the Python development package. On yum-based systems, these are named cppunit-devel and python-devel. Both packages are available through standard platform package managers like yum, apt and Darwin ports.
As a prerequisite to building the C client libraries, the Java based-server must be built. This auto-generates some header files that the C libraries rely on. From the root directory of the downloaded distribution:
The C client libraries for ZooKeeper must be installed as the Python module makes use of them to actually communicate with a ZooKeeper cluster. It’s easiest to build these from source. From the src/c directory, type the following:
make && sudo make install
The downloadable package contains the source code for the Python module. To build and install, one command should do the trick from the src/contrib/zkpython directory:
To test the installation, start a Python shell and type import zookeeper. If you don’t see any errors or warnings, the module has been built and installed successfully. The bindings have been tested with Python 2.3, 2.4, 2.5 and 2.6, and are known not to work with 2.2 and earlier. We haven’t yet tested them against Python 3.x – we’d love to hear your feedback about your experiences with the latest versions of Python.
To run the queue example, you must have a ZooKeeper server running on the local machine at port 2181 (to change the location of the server, edit the string passed to zookeeper.__init__). The Java-based server will have been built when you ran ant from the root directory of the distribution earlier. Before the server can run, it needs a configuration file to read:
cat >> conf/zoo.cfg
Now you can run bin/zkServer.sh start to start a standalone server on the local machine. To stop the server in the future, run bin/zkServer.sh stop.
You’re finally ready to run the queue example:
The example is very simple. It queues three items, and then dequeues them.
I hope that I’ve shown you that ZooKeeper is a very useful system, with powerful primitives that makes writing tricky distributed concurrent programs easier. There are many applications that ZooKeeper could help you build – lock servers, name services, metadata stores and even a unique kind of filesystem can be built in a straightforward way using the ZooKeeper API. The project is active and always looking for volunteers. ZooKeeper integration is already being built into HBase, and there are moves to bring greater reliability to Hadoop and HDFS by delegating some server functionality to ZooKeeper. As far as the Python bindings go, the next version will include better documentation, some more Python niceties such as default parameters and docstrings, and a more Pythonic wrapper object to wrap up some of the bookkeeping that ZooKeeper requires.