Esoteric Queue Scheduling Disciplines

New Challenges Requires New Tools

Big Data challenges current message oriented middleware (MOM) applications. MOM usually works with FIFO and priority scheduling disciplines. What happens if there is a large list of URLs ready to be crawled but you want to give URLs at the end of the list a chance of being crawled earlier? This concept comes from genetics, and is used in genetic algorithms selection schemes. The last URLs may contain interesting new resources in spite of their order or priority. Consuming from a FIFO queue takes a long time to crawl these URLs. Priority scheduling is more helpful, but it is not possible to know apriori how useful a URL will end up being in the quest for new Internet resources. Why not add a chance factor to URL selection by using roulette wheel scheduling and an efficient algorithm?

Data flows follow an order of execution based on task dependencies. One task cannot start until the preceding tasks have finished. This is the way a spreadsheet works. A change in a cell triggers a series of processes to be completed in topological order. Why not add task dependency to MOM applications? An item can be consumed from the queue only if its precedent tasks have been completed. We provide some data flow resources at the end of the Egont Part II article. However, a new queue scheduling discipline could be used in place of a separate framework. Ideally, the new queue discipline would include features such as persistence and transactions.

Roulette Wheel Scheduling Algorithm Design

To the best of our knowledge, there are currently no Internet resources about using a roulette wheel scheduling discipline for a queue.

The external interface for a roulette wheel queue is the same as for a typical queue with “get” and ”put” methods except that the “put” method incorporates a new probabilistic parameter. Probabilities can be expressed as integers. When a consumer requests an item, a random number is generated to decide which item is selected. Items with higher probabilities have a greater chance of being retrieved, but even items with low probabilities can be consumed.

The implementation of an efficient roulette wheel queue is not easy. Genetic algorithms use roulette wheel selection to choose between a small set of alternatives. A queue used for crawling can contain a lot of URLs, and the question is how to take these processes into account in order to find, add, and remove URLS efficiently.

Finding an item in a roulette wheel data structure is O(n) for trivial traversing and O(log(n)) using a binary search. Adding an item is trivial, it can be added at the end and the new total is the sum of the previous probability parameters total plus the new item’s probability parameter. Removing an item is more difficult. The trivial, but not the best, solution is to recalculate all the partial sums after the element which is being removed. A better solution is to use a heap tree data structure or one of its variants.

An alternative that merits further study is the use of Fenwick trees. In 1994, Peter M. Fenwick discovered how to improve the finding, adding, and modifying of items and the calculation of their subtotals. Since the Fenwick tree works over a fixed range of items, item keys must be preallocated.

See Also

  1. Using Queues in Web Crawling and Analysis Infrastructure
  2. Persisting Native Python Queues
  3. Adding Acknowledgement Semantics to a Persistent Queue
  4. Ideas: Egont, A Web Orchestration Language

Resources

  1. A New Data Structure for Cumulative Frequency Tables
  2. Select random k elements from a list whose elements have weights and the roulette wheel answer
  3. A comparative analysis of selection schemes used in genetic algorithms
  4. A Framework for Alternate Queueing: Towards Traffic Management by PC-UNIX Based Routers
  5. Stack and Queue Layouts of Directed Acyclic Graphs
  6. Dynamic Data Structures for Taskgraph Scheduling Policies with Applications in OpenCL Accelerators

Photo taken by Kristofer Björkman

Adding Acknowledgement Semantics to a Persistent Queue

Persistence capability is not enough to ensure the reliability of message oriented middleware. Suppose that you retrieve an item from a queue, and the application or thread crashes in the middle of the process. The item and processes depending on it will be lost, since the crash occurred after retrieving the item from the queue. Acknowledgement semantics can prevent this loss If the application crashes before acknowledging an item. This item will continue to be available to other consumers until an acknowledgment is sent.

This Python code shows how to add acknowledgement to a class derived from the Python Queue class. In the article Persisting Native Python Queues we only show how to persist a queue. It is important to note that we have modified the base Python Queue class, adding the “connect” and “ack” methods. Each application thread must call the “connect” method before using the queue object. The “connect” method returns a unique queue proxy. If the thread crashes, the items that have been fetched, but not acknowledged, in this queue are enqueued again. The “ack” method uses the item returned by the “get” method and effectively removes the item from the queue. In this code ZODB is used for persistence instead of DyBASE. If the entire application crashes, not just a single thread, unacknowledged items are requeued when it restarts.

While acknowledgement semantics increases reliability, it is not infallible. Imagine that after processing an acknowledged item, the result of the process is also added to the queue. In some web crawling implementations, first a URL is retrieved from a queue and acknowledged, then an HTML page is fetched from that URL, and finally the links on that page are inserted in the queue. Two problems can occur if the application or thread crashes during this process. If items, in this case URLs, are acknowledged and thus eliminated as soon as they are retrieved, they may be eliminated before enqueuing all of the links on the page. In this case, the remaining links will be lost. If, on the other hand, items are acknowledged only after enqueuing all the links, some links will be duplicated. This conflict is solved with queue transaction semantics. If the process or thread crashes a rollback is performed.

Notes

  1. This persistent queue with acknowledgement assumes that the objects in the queue all have different identities, id(obji) != id(objj) for all i,j. Making a copy of the object works for mutable objects. Immutable objects must be wrapped.
  2. The object classes in the queue must inherit from the Persistent class, including object members.

Prerequisites

  1. Python 2.x (x>=6)
  2. ZODB3

Code

The code is available at github and includes a series of unit tests.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Persisting Native Python Queues

Resources

  1. AMQP Acknowledgement
  2. HornetQ Asynchronous Send Acknowledgements
  3. HornetQ Transactions
  4. ZODB In Real Life
  5. Storing Persistent Objects with Persistent Objects as attributes of the Parent PO

Photo taken by Paul Downey

Persisting Native Python Queues

Native Python queues do not allow you to stop and resume an application without loosing queue items. Adding persistence of objects to a derived Python Queue class partially addresses this issue. We use the DyBASE embedded object oriented database to persist queues items in files. If needed, you can instantiate multiple queues pointing to different files. Since our PersistentQueue class is derived from the Python Queue, it works in multithreading environments. Transactional support such as acknowledging successfully processed queue items is not currently a feature of this class.

In Using Queues in Web Crawling and Analysis Infrastructure we noted the relevancy of queues to connect heterogeneous technologies. Queues are also used in the context of a single technology to follow the typical producer/consumer pattern. For example, the Python programming language offers FIFO and priority queues, as does .NET. However, neither of these native queues persists. The Microsoft Windows Azure platform incorporates persistant queues but has other limitations, and also may be overkill for your solution.

There are several ways to persist a queue. If the items that you want to persist have a fixed buffer length then Berkeley DB’s queues or STXXL’s queues work well. You can’t use database managers like GDBM if you need a FIFO queue since you need to traverse the elements in order and the hash table does not assure this order. STXXL, and DyBASE use a B+Tree data structure. You may be tempted to use a database engine like SQLite which can be useful in many scenarios, but an SQL engine adds complexity that is not required for FIFO queues.

Prerequisites

  1. DyBASE: http://www.garret.ru/dybase.html

Code

The code is also available at github.

#!/usr/bin/python

from Queue import Queue
import dybase
import sys

MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1

#DEBUG = True
DEBUG = False

class Root(dybase.Persistent):
	def __init__(self):
		self.start = 0
		self.stop = 0

class SizeOfPersistentQueueExceeded(Exception):
	pass

class incomplete_persistent_deque:
	def __init__(self, filename):
		self._init_db(filename)

	def _init_db(self, filename):
		self.db = dybase.Storage()
		if self.db.open(filename):
			self.root = self.db.getRootObject()
			if self.root == None:
				self.root = Root()
				self.root.elements = self.db.createIntIndex() # createLongIndex can be used on 64 bits systems but it is strange to pass 2**32 elements in the queue
				self.root.pending_elements = self.db.createIntIndex()

				self.db.setRootObject(self.root)
				self.db.commit()
			else:
				if DEBUG:
					print "self.root already exists"

		if DEBUG:
			print "self.root.start =", self.root.start
			print "self.root.stop = ", self.root.stop

	def __len__(self):
		if self.root.stop >= self.root.start:
			return self.root.stop - self.root.start
		else:
			return (MAX_INT - self.root.start + 1) + (self.root.stop - MIN_INT)

	def append(self, item):
		# add element to index
		self.root.elements.insert(self.root.stop, item)
		self.root.stop += 1
		if self.root.stop > MAX_INT:
			# check also if stop touches start
			self.root.stop = MIN_INT

		if self.root.start == self.root.stop:
			raise SizeOfPersistentQueueExceeded

		# persist
		self.root.store()
		self.db.commit()

	def popleft(self):
		# don't check if empty, Queue class take charge of that
		# remove element from index
		item = self.root.elements.get(self.root.start)
		self.root.elements.remove(self.root.start)
		self.root.start += 1
		if self.root.start > MAX_INT:
			# check also if start touches stop
			self.root.start = MIN_INT 

		if self.root.start == self.root.stop: # if queue is empty resync start & stop to 0. It is for beautifier purposes can be removed.
			self.root.start = 0
			self.root.stop = 0

		# persist
		self.root.store()
		self.db.commit()

		return item

class PersistentQueue(Queue):
	def __init__(self, filename, maxsize = 0):
		self.filename = filename
		Queue.__init__(self, maxsize)

	def _init(self, maxsize):
		# original: self.queue = deque()

		# incomplete_persistent_deque:
		# - incomplete implementation but enough for Queue:
		# - implemented methods:
		# -- __len__
		# -- append
		# -- popleft
		#

		self.queue = incomplete_persistent_deque(self.filename)

	def connect(self): # to handle failovers
		pass

	def ack(self):
		pass

	#def ack(self, item):

class ElementTest:
	def __init__(self, value):
		self.value = value

	def __repr__(self):
		return self.value

	def __str__(self):
		return self.value

def test1():
	q = PersistentQueue("myqueue.dbs")
	if not q.empty(): # get pending items
		while not q.empty():
			e = q.get()
			print e

	for s in ['one', 'two', 'three']:
		q.put(ElementTest(s))

def main(): # run this script twice to see the persisted elements
	test1()

if __name__ == '__main__':
	main()

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. [queue persistent site:stackoverflow.com] Google query
  2. bsddb3 Python interface for Berkeley DB
  3. bsddb3 examples
  4. STXXL queue class template

Using Queues in Web Crawling and Analysis Infrastructure

Message oriented middleware (MOM) is a key technology for implementing a custom pipeline and analyzing unstructured data. The pipeline for going from crawling web pages to part of speech tagging (PoST) and beyond is long. It requires a variety of processes which are implemented in several different programming languages and operating systems. For example, boilerpipe is an excellent Java library for extracting main text content while PoSTs libraries, like NLTK or FreeLing, are implemented in Python.

One might be tempted to integrate different technologies using web services but web services alone have many weak points. If the pipeline has ten processes and, for example, the last one fails, then the intermediate processes can be lost if they are not persisted. There must be a higher level mechanism in place to resume the pipeline processing. MOMs ensure message persistence until a consumer acknowledges that a specific process has finished.

There are a lot of MOMs to choose from, including commercial and free open source variants. Some features are present in almost all of them while others are not. Contention management is an important feature if you are dealing, as is likely, with a high ratio of messages produced to messages consumed at any one time. For example, a web crawler can fetch web pages at an incredibly high speed while processes like content extraction take longer. Running a message queue without contention management under these circumstances will exhaust the machine’s memory.

While MOMs are important for uniting heterogeneous technologies, the different processes must also know which queues to utilize to consume the input and produce the output for the next phases. A new wave of frameworks like NServiceBusResque, Celery, and Octobot has emerged to handle this.

In conclusion, MOMs help to connect heterogeneous technologies and bring robustness, and are very useful in the context of unstructured information like text analysis. Many MOMs are available, but there is not a single one with a complete feature set. However some of these features can be supplied by frameworks such as NServiceBus, Resque, Celery, and Octobot.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Persisting Native Python Queues
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. Message Queues vs Web Services
  2. Message Queue Evaluation Notes
  3. The Hadoop Map-Reduce Capacity Scheduler
  4. Contention Management in the WSA
  5. Message Queuing Architectures