March 27, 2012

Adding Acknowledgement Semantics to a Persistent Queue

Persistence alone is not enough to ensure the reliability of message-oriented middleware. Suppose that you retrieve an item from a queue, and the application or thread crashes while processing it. The item, and any work that depends on it, is lost, because the crash occurred after the item was removed from the queue. Acknowledgement semantics prevent this loss: if the application crashes before acknowledging an item, the item remains available to other consumers until an acknowledgement is sent.

This Python code shows how to add acknowledgement to a class derived from the Python Queue class; the earlier article Persisting Native Python Queues only showed how to persist a queue. Note that we have changed the interface of the base Python Queue class by adding the “connect” and “ack” methods. Each application thread must call the “connect” method before using the queue object. The “connect” method returns a unique queue proxy. If the thread crashes, the items that it fetched through this proxy but did not acknowledge are enqueued again. The “ack” method takes the item returned by the “get” method and removes it from the queue for good. In this code ZODB is used for persistence instead of DyBASE. If the entire application crashes, not just a single thread, unacknowledged items are requeued when it restarts.
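To make the connect/get/ack protocol concrete, here is a minimal in-memory sketch. It is not the ZODB-backed code from the repository: the names AckQueue, QueueProxy, and release are our own for this illustration, persistence is left out, and items are matched by identity, as the notes further down assume.

```python
import threading
from collections import deque


class AckQueue(object):
    """In-memory sketch: fetched items stay pending until acknowledged."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def put(self, item):
        with self._lock:
            self._items.append(item)

    def connect(self):
        # Each consumer thread gets its own proxy, which remembers the
        # items that thread has fetched but not yet acknowledged.
        return QueueProxy(self)


class QueueProxy(object):
    def __init__(self, queue):
        self._queue = queue
        self._pending = []  # fetched but unacknowledged, in fetch order

    def get(self):
        with self._queue._lock:
            item = self._queue._items.popleft()  # IndexError if empty
        self._pending.append(item)
        return item

    def ack(self, item):
        # Items are matched by identity, not equality.
        for i, pending in enumerate(self._pending):
            if pending is item:
                del self._pending[i]
                return
        raise ValueError("item was not fetched through this proxy")

    def release(self):
        # Hypothetical recovery hook: put every unacknowledged item back
        # at the front of the queue, preserving the original order.
        with self._queue._lock:
            self._queue._items.extendleft(reversed(self._pending))
        del self._pending[:]


q = AckQueue()
for url in ["a", "b", "c"]:
    q.put(url)

worker = q.connect()
first = worker.get()
worker.ack(first)       # "a" is now gone for good
second = worker.get()   # "b" is fetched but never acknowledged
worker.release()        # simulate recovery after the worker crashed
remaining = list(q._items)
```

After the simulated crash, the unacknowledged item is back at the head of the queue, while the acknowledged one is not. In a persistent implementation the pending list would have to be stored too, so that recovery can also run after a full application restart.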

While acknowledgement semantics increase reliability, they are not infallible. Imagine that after processing an item, the result of the process is also added to the queue. In some web crawling implementations, first a URL is retrieved from a queue and acknowledged, then an HTML page is fetched from that URL, and finally the links on that page are inserted in the queue. Two problems can occur if the application or thread crashes during this process. If items, in this case URLs, are acknowledged and thus eliminated as soon as they are retrieved, they may be eliminated before all of the links on the page have been enqueued. In this case, the remaining links will be lost. If, on the other hand, items are acknowledged only after enqueuing all the links, some links will be duplicated, because the unacknowledged URL is delivered again and the links already enqueued are enqueued a second time. This conflict is solved with queue transaction semantics: if the process or thread crashes, a rollback is performed.
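One way to picture transaction semantics for the crawler case is the sketch below. It is an in-memory illustration under our own assumptions, not code from the repository: TransactionalQueue and crawl_one are hypothetical names, and the rollback is triggered by an exception, whereas a persistent implementation would perform it during recovery after a real crash.

```python
from collections import deque


class TransactionalQueue(object):
    """In-memory sketch: get() and put() are staged until commit()."""

    def __init__(self, items=()):
        self._items = deque(items)
        self._taken = []    # items fetched in the open transaction
        self._staged = []   # items enqueued in the open transaction

    def get(self):
        item = self._items.popleft()
        self._taken.append(item)
        return item

    def put(self, item):
        self._staged.append(item)

    def commit(self):
        # Fetched items disappear and staged items become visible together.
        self._items.extend(self._staged)
        self._taken = []
        self._staged = []

    def rollback(self):
        # Fetched items return to the front; staged items are discarded.
        self._items.extendleft(reversed(self._taken))
        self._taken = []
        self._staged = []


def crawl_one(queue, fetch_links):
    """Process one URL atomically; fetch_links is supplied by the caller."""
    url = queue.get()
    try:
        for link in fetch_links(url):
            queue.put(link)
    except Exception:
        queue.rollback()
        raise
    queue.commit()
```

Either the URL is consumed and all of its links are enqueued, or nothing changes: no link is lost and none is duplicated.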

Notes

  1. This persistent queue with acknowledgement assumes that all objects in the queue have distinct identities: id(obj_i) != id(obj_j) for all i != j. For mutable objects, enqueuing a copy is enough. Immutable objects must be wrapped, because copying them may return the very same object.
  2. The classes of the objects in the queue, including the classes of their members, must inherit from the Persistent class.
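The identity assumption in note 1 can be illustrated with a few lines of plain Python. Wrapped is a hypothetical wrapper class written for this example; following note 2, a wrapper used with the actual queue would also have to derive from Persistent.

```python
import copy


class Wrapped(object):
    """Hypothetical wrapper giving each queued value its own identity."""

    def __init__(self, value):
        self.value = value


a, b = 7, 7
# Equal immutable values may be the very same object (CPython, for
# example, caches small integers), so id() may not tell them apart.
same_identity = a is b

# copy.copy() does not help for immutables: it returns the same object.
copy_is_same = copy.copy(a) is a

wa, wb = Wrapped(a), Wrapped(b)
# Two wrappers are always distinct objects, even around equal values.
distinct_wrappers = wa is not wb

original = ["http://example.com"]
duplicate = copy.copy(original)
# A copy of a mutable object is equal to the original but distinct from it.
distinct_copy = duplicate == original and duplicate is not original
```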

Prerequisites

  1. Python 2.x (x>=6)
  2. ZODB3

Code

The code is available on GitHub and includes a series of unit tests.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Persisting Native Python Queues

Resources

  1. AMQP Acknowledgement
  2. HornetQ Asynchronous Send Acknowledgements
  3. HornetQ Transactions
  4. ZODB In Real Life
  5. Storing Persistent Objects with Persistent Objects as attributes of the Parent PO

Photo taken by Paul Downey

February 23, 2012

Persisting Native Python Queues

Native Python queues do not allow you to stop and resume an application without losing queue items. Adding object persistence to a class derived from the Python Queue partially addresses this issue. We use the DyBASE embedded object-oriented database to persist queue items in files. If needed, you can instantiate multiple queues pointing to different files. Since our PersistentQueue class is derived from the Python Queue, it works in multithreaded environments. Transactional support, such as acknowledging successfully processed queue items, is not currently a feature of this class.
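The general technique, overriding the storage hooks of the Python Queue class, can be tried without installing DyBASE. The sketch below is a stand-in under our own assumptions: PickleQueue is a hypothetical class that rewrites a pickle file on every change, which is far less efficient than a database index and is not crash-safe in the middle of a write. The import fallback lets it run on both Python 2 and Python 3.

```python
import os
import pickle
import tempfile
from collections import deque

try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2


class PickleQueue(Queue):
    """Hypothetical stand-in: rewrites a pickle file on every change."""

    def __init__(self, filename, maxsize=0):
        # Must be set before Queue.__init__, which calls _init().
        self.filename = filename
        Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        # Reload the items left over from a previous run, if any.
        if os.path.exists(self.filename):
            with open(self.filename, "rb") as f:
                self.queue = deque(pickle.load(f))
        else:
            self.queue = deque()

    def _save(self):
        with open(self.filename, "wb") as f:
            pickle.dump(list(self.queue), f)

    def _put(self, item):
        self.queue.append(item)
        self._save()

    def _get(self):
        item = self.queue.popleft()
        self._save()
        return item


path = os.path.join(tempfile.mkdtemp(), "myqueue.pickle")
q = PickleQueue(path)
for s in ["one", "two", "three"]:
    q.put(s)
consumed = q.get()

reopened = PickleQueue(path)  # simulates restarting the application
remaining = [reopened.get() for _ in range(reopened.qsize())]
```

Because only the internal hooks are replaced, the locking that Queue performs around put and get still applies, which is why a derived class keeps working with multiple threads.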

In Using Queues in Web Crawling and Analysis Infrastructure we noted the relevance of queues for connecting heterogeneous technologies. Queues are also used within a single technology to implement the typical producer/consumer pattern. For example, the Python programming language offers FIFO and priority queues, as does .NET. However, neither of these native queues persists. The Microsoft Windows Azure platform incorporates persistent queues but has other limitations, and may also be overkill for your solution.
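For reference, this is what the producer/consumer pattern looks like with the native, non-persistent Python queues. It is a small sketch: the SENTINEL marker and the function names are our own choices, and the import fallback covers both Python 2 and Python 3.

```python
import threading

try:
    from queue import Queue, PriorityQueue  # Python 3
except ImportError:
    from Queue import Queue, PriorityQueue  # Python 2

SENTINEL = None  # hypothetical marker telling the consumer to stop


def producer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)


def consumer(q, results):
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        results.append(item.upper())


fifo = Queue()
results = []
threads = [
    threading.Thread(target=producer, args=(fifo, ["one", "two", "three"])),
    threading.Thread(target=consumer, args=(fifo, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
# results holds the processed items in FIFO order.

prio = PriorityQueue()
for entry in [(2, "low"), (1, "high")]:
    prio.put(entry)
first = prio.get()  # the entry with the smallest priority number
```

Everything here lives in memory only: if the process stops while items are still queued, they are gone when it starts again.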

There are several ways to persist a queue. If the items that you want to persist have a fixed buffer length, then Berkeley DB’s queues or STXXL’s queues work well. You can’t use database managers like GDBM if you need a FIFO queue, since you need to traverse the elements in order and a hash table does not guarantee this order. STXXL and DyBASE use a B+Tree data structure. You may be tempted to use a database engine like SQLite, which can be useful in many scenarios, but an SQL engine adds complexity that is not required for FIFO queues.

Prerequisites

  1. DyBASE: http://www.garret.ru/dybase.html

Code

The code is also available on GitHub.

#!/usr/bin/python

from Queue import Queue
import dybase
import sys

MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1

#DEBUG = True
DEBUG = False

class Root(dybase.Persistent):
	def __init__(self):
		self.start = 0
		self.stop = 0

class SizeOfPersistentQueueExceeded(Exception):
	pass

class incomplete_persistent_deque:
	def __init__(self, filename):
		self._init_db(filename)

	def _init_db(self, filename):
		self.db = dybase.Storage()
		if self.db.open(filename):
			self.root = self.db.getRootObject()
			if self.root is None:
				self.root = Root()
				# createLongIndex can be used on 64-bit systems, but a queue is unlikely to hold 2**32 elements
				self.root.elements = self.db.createIntIndex()
				self.root.pending_elements = self.db.createIntIndex()

				self.db.setRootObject(self.root)
				self.db.commit()
			else:
				if DEBUG:
					print "self.root already exists"

		if DEBUG:
			print "self.root.start =", self.root.start
			print "self.root.stop = ", self.root.stop

	def __len__(self):
		if self.root.stop >= self.root.start:
			return self.root.stop - self.root.start
		else:
			return (MAX_INT - self.root.start + 1) + (self.root.stop - MIN_INT)

	def append(self, item):
		# add element to index
		self.root.elements.insert(self.root.stop, item)
		self.root.stop += 1
		if self.root.stop > MAX_INT:
			# check also if stop touches start
			self.root.stop = MIN_INT

		if self.root.start == self.root.stop:
			raise SizeOfPersistentQueueExceeded

		# persist
		self.root.store()
		self.db.commit()

	def popleft(self):
		# no emptiness check here: the Queue class takes care of that
		# remove element from index
		item = self.root.elements.get(self.root.start)
		self.root.elements.remove(self.root.start)
		self.root.start += 1
		if self.root.start > MAX_INT:
			# check also if start touches stop
			self.root.start = MIN_INT 

		# If the queue is now empty, reset start and stop to 0.
		# This is purely cosmetic and can be removed.
		if self.root.start == self.root.stop:
			self.root.start = 0
			self.root.stop = 0

		# persist
		self.root.store()
		self.db.commit()

		return item

class PersistentQueue(Queue):
	def __init__(self, filename, maxsize = 0):
		self.filename = filename
		Queue.__init__(self, maxsize)

	def _init(self, maxsize):
		# original: self.queue = deque()

		# incomplete_persistent_deque:
		# - incomplete implementation but enough for Queue:
		# - implemented methods:
		# -- __len__
		# -- append
		# -- popleft
		#

		self.queue = incomplete_persistent_deque(self.filename)

	# connect() and ack() are placeholders: acknowledgement is not implemented in this version

	def connect(self): # to handle failovers
		pass

	def ack(self):
		pass

class ElementTest:
	def __init__(self, value):
		self.value = value

	def __repr__(self):
		return self.value

	def __str__(self):
		return self.value

def test1():
	q = PersistentQueue("myqueue.dbs")
	# print the pending items persisted by a previous run
	while not q.empty():
		e = q.get()
		print e

	for s in ['one', 'two', 'three']:
		q.put(ElementTest(s))

def main(): # run this script twice to see the persisted elements
	test1()

if __name__ == '__main__':
	main()
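
The wrap-around arithmetic used by __len__, append, and popleft above is easier to check with a tiny integer range than with sys.maxint. The following sketch replays it on its own; ring_len and advance are hypothetical helper names for this example, and the range -4..3 is chosen only to keep the numbers small.

```python
def ring_len(start, stop, min_int, max_int):
    """Number of items between start (inclusive) and stop (exclusive)."""
    if stop >= start:
        return stop - start
    # stop has wrapped around from max_int to min_int
    return (max_int - start + 1) + (stop - min_int)


def advance(index, min_int, max_int):
    """Increment an index, wrapping past max_int back to min_int."""
    index += 1
    return min_int if index > max_int else index


MIN_INT, MAX_INT = -4, 3   # toy range standing in for the real limits
start = stop = 2
for _ in range(3):         # three appends: stop goes 3, then -4, then -3
    stop = advance(stop, MIN_INT, MAX_INT)
length = ring_len(start, stop, MIN_INT, MAX_INT)
```

With start at 2 and stop wrapped to -3, the queue holds the keys 2, 3, and -4, so the computed length is 3.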

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. [queue persistent site:stackoverflow.com] Google query
  2. bsddb3 Python interface for Berkeley DB
  3. bsddb3 examples
  4. STXXL queue class template