Persisting Native Python Queues

Native Python queues do not allow you to stop and resume an application without losing queue items. Adding object persistence to a class derived from the Python Queue class partially addresses this issue. We use the DyBASE embedded object-oriented database to persist queue items in files. If needed, you can instantiate multiple queues pointing to different files. Since our PersistentQueue class is derived from the Python Queue class, it works in multithreaded environments. Transactional support, such as acknowledging successfully processed queue items, is not currently a feature of this class.

In Using Queues in Web Crawling and Analysis Infrastructure we noted the relevance of queues for connecting heterogeneous technologies. Queues are also used in the context of a single technology to follow the typical producer/consumer pattern. For example, the Python programming language offers FIFO and priority queues, as does .NET. However, neither of these native queues is persistent. The Microsoft Windows Azure platform incorporates persistent queues but has other limitations and may be overkill for your solution.
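
As a point of reference, the producer/consumer pattern with a native FIFO queue can be sketched as follows. This is only an illustration, written for Python 3, where the module is named queue (the listing later in this post uses the Python 2 name Queue). The function name run_pipeline and the sentinel object are our own choices for the example. Everything in this queue lives in memory, so any items still queued when the process exits are gone, which is the gap that persistence is meant to close.

```python
import queue
import threading

def run_pipeline(items):
    """Pass items from one producer thread to one consumer thread."""
    q = queue.Queue()      # native FIFO queue, held in memory only
    results = []
    sentinel = object()    # hypothetical end-of-stream marker

    def producer():
        for item in items:
            q.put(item)
        q.put(sentinel)

    def consumer():
        while True:
            item = q.get()  # blocks until an item is available
            if item is sentinel:
                break
            results.append(item.upper())

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(run_pipeline(["one", "two", "three"]))  # ['ONE', 'TWO', 'THREE']
```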

There are several ways to persist a queue. If the items that you want to persist have a fixed length, then Berkeley DB’s queues or STXXL’s queues work well. You can’t use database managers like GDBM if you need a FIFO queue, since you need to traverse the elements in order and a hash table does not guarantee that order. STXXL and DyBASE use a B+Tree data structure. You may be tempted to use a database engine like SQLite, which can be useful in many scenarios, but an SQL engine adds complexity that is not required for FIFO queues.

Prerequisites

  1. DyBASE: http://www.garret.ru/dybase.html

Code

The code is also available on GitHub.

#!/usr/bin/python

from Queue import Queue
import dybase
import sys

MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1

#DEBUG = True
DEBUG = False

class Root(dybase.Persistent):
	def __init__(self):
		self.start = 0
		self.stop = 0

class SizeOfPersistentQueueExceeded(Exception):
	pass

class incomplete_persistent_deque:
	def __init__(self, filename):
		self._init_db(filename)

	def _init_db(self, filename):
		self.db = dybase.Storage()
		if self.db.open(filename):
			self.root = self.db.getRootObject()
			if self.root is None:
				self.root = Root()
				self.root.elements = self.db.createIntIndex() # createLongIndex can be used on 64-bit systems, but it would be unusual to have more than 2**32 elements in the queue
				self.root.pending_elements = self.db.createIntIndex()

				self.db.setRootObject(self.root)
				self.db.commit()
			else:
				if DEBUG:
					print "self.root already exists"

		if DEBUG:
			print "self.root.start =", self.root.start
			print "self.root.stop = ", self.root.stop

	def __len__(self):
		if self.root.stop >= self.root.start:
			return self.root.stop - self.root.start
		else:
			return (MAX_INT - self.root.start + 1) + (self.root.stop - MIN_INT)

	def append(self, item):
		# add element to index
		self.root.elements.insert(self.root.stop, item)
		self.root.stop += 1
		if self.root.stop > MAX_INT:
			# check also if stop touches start
			self.root.stop = MIN_INT

		if self.root.start == self.root.stop:
			raise SizeOfPersistentQueueExceeded

		# persist
		self.root.store()
		self.db.commit()

	def popleft(self):
		# don't check if empty; the Queue class takes care of that
		# remove element from index
		item = self.root.elements.get(self.root.start)
		self.root.elements.remove(self.root.start)
		self.root.start += 1
		if self.root.start > MAX_INT:
			# check also if start touches stop
			self.root.start = MIN_INT 

		if self.root.start == self.root.stop: # if the queue is empty, resync start & stop to 0. This is cosmetic and can be removed.
			self.root.start = 0
			self.root.stop = 0

		# persist
		self.root.store()
		self.db.commit()

		return item

class PersistentQueue(Queue):
	def __init__(self, filename, maxsize = 0):
		self.filename = filename
		Queue.__init__(self, maxsize)

	def _init(self, maxsize):
		# original: self.queue = deque()

		# incomplete_persistent_deque:
		# - incomplete implementation but enough for Queue:
		# - implemented methods:
		# -- __len__
		# -- append
		# -- popleft
		#

		self.queue = incomplete_persistent_deque(self.filename)

	def connect(self): # to handle failovers
		pass

	def ack(self):
		pass

	#def ack(self, item):

class ElementTest:
	def __init__(self, value):
		self.value = value

	def __repr__(self):
		return self.value

	def __str__(self):
		return self.value

def test1():
	q = PersistentQueue("myqueue.dbs")
	if not q.empty(): # get pending items
		while not q.empty():
			e = q.get()
			print e

	for s in ['one', 'two', 'three']:
		q.put(ElementTest(s))

def main(): # run this script twice to see the persisted elements
	test1()

if __name__ == '__main__':
	main()
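
The start and stop counters in the listing wrap around from MAX_INT to MIN_INT, which is why __len__ has two branches. The sketch below reproduces that arithmetic with deliberately tiny bounds so the wraparound is easy to follow. It is written for Python 3; the function names ring_len and advance and the bounds -4 and 3 are hypothetical and exist only for this illustration.

```python
def ring_len(start, stop, min_int, max_int):
    """Number of queued items, using the same two branches as __len__."""
    if stop >= start:
        return stop - start
    # stop has wrapped past max_int: count start..max_int, then min_int..stop-1
    return (max_int - start + 1) + (stop - min_int)

def advance(counter, min_int, max_int):
    """Increment a counter and wrap it to min_int after max_int."""
    counter += 1
    return min_int if counter > max_int else counter

# Tiny illustrative bounds instead of the platform integer limits.
MIN_INT, MAX_INT = -4, 3

start = stop = 2
for _ in range(3):               # three appends use keys 2, 3 and -4
    stop = advance(stop, MIN_INT, MAX_INT)

print(stop)                                      # -3
print(ring_len(start, stop, MIN_INT, MAX_INT))   # 3
```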

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. [queue persistent site:stackoverflow.com] Google query
  2. bsddb3 Python interface for Berkeley DB
  3. bsddb3 examples
  4. STXXL queue class template

Using Queues in Web Crawling and Analysis Infrastructure

Message-oriented middleware (MOM) is a key technology for implementing a custom pipeline and analyzing unstructured data. The pipeline for going from crawling web pages to part-of-speech tagging (PoST) and beyond is long. It requires a variety of processes that are implemented in several different programming languages and run on different operating systems. For example, boilerpipe is an excellent Java library for extracting main text content, while PoST libraries such as NLTK or FreeLing are used from Python.

One might be tempted to integrate different technologies using web services, but web services alone have many weak points. If the pipeline has ten processes and, for example, the last one fails, then the results of the intermediate processes can be lost if they are not persisted. There must be a higher-level mechanism in place to resume the pipeline processing. MOMs ensure message persistence until a consumer acknowledges that a specific process has finished.
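
The acknowledgement idea can be illustrated with a toy queue that holds each delivered message until the consumer confirms it. This is a minimal sketch under our own assumptions, not the behavior of any particular MOM: the class AckQueue and its method names are hypothetical, and the state is kept in memory, whereas a real broker would write it to durable storage.

```python
from collections import deque

class AckQueue:
    """Toy queue: a message is only forgotten once it is acknowledged."""

    def __init__(self):
        self._ready = deque()   # messages waiting for delivery
        self._unacked = {}      # delivery tag -> message in flight
        self._next_tag = 0

    def put(self, message):
        self._ready.append(message)

    def get(self):
        """Deliver the oldest message and remember it until ack()."""
        message = self._ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self._unacked[tag] = message
        return tag, message

    def ack(self, tag):
        """The consumer finished its work; drop the message for good."""
        del self._unacked[tag]

    def requeue_unacked(self):
        """After a consumer failure, put in-flight messages back in order."""
        for tag in sorted(self._unacked, reverse=True):
            self._ready.appendleft(self._unacked.pop(tag))

    def __len__(self):
        return len(self._ready) + len(self._unacked)

q = AckQueue()
q.put("page-1")
q.put("page-2")
tag, message = q.get()      # the consumer takes "page-1" and then fails
q.requeue_unacked()         # nothing was acknowledged, so nothing is lost
tag, message = q.get()
print(message)              # page-1
q.ack(tag)
print(len(q))               # 1
```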

There are a lot of MOMs to choose from, including commercial and free open source variants. Some features are present in almost all of them while others are not. Contention management is an important feature if you are dealing, as is likely, with a high ratio of messages produced to messages consumed at any one time. For example, a web crawler can fetch web pages at an incredibly high speed while processes like content extraction take longer. Running a message queue without contention management under these circumstances will exhaust the machine’s memory.
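
One simple form of flow control, shown here only as an illustration of the problem rather than as what any MOM actually implements, is a bounded queue that refuses new items once it is full, so a fast producer cannot grow memory without limit. The sketch uses the maxsize argument of the Python 3 queue.Queue class. The helper name offer and the sample URLs are hypothetical.

```python
import queue

def offer(q, item):
    """Try to enqueue without blocking; report whether the item was accepted."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False

# At most two fetched pages may wait for content extraction.
pages = queue.Queue(maxsize=2)
accepted = [offer(pages, url) for url in ["/a", "/b", "/c"]]
print(accepted)             # [True, True, False]

pages.get()                 # the slow consumer takes one page
print(offer(pages, "/c"))   # True
```

A producer that calls put() instead of put_nowait() would simply block until the consumer catches up, which is usually the behavior you want for a crawler.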

While MOMs are important for uniting heterogeneous technologies, the different processes must also know which queues to use to consume their input and produce the output for the next phases. A new wave of frameworks like NServiceBus, Resque, Celery, and Octobot has emerged to handle this.

In conclusion, MOMs help to connect heterogeneous technologies and bring robustness, and are very useful when processing unstructured information, as in text analysis. Many MOMs are available, but there is not a single one with a complete feature set. However, some of these features can be supplied by frameworks such as NServiceBus, Resque, Celery, and Octobot.

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Persisting Native Python Queues
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. Message Queues vs Web Services
  2. Message Queue Evaluation Notes
  3. The Hadoop Map-Reduce Capacity Scheduler
  4. Contention Management in the WSA
  5. Message Queuing Architectures

Integrating Google Analytics into your Company Loop with a Microsoft Excel Add-on

 

Introduction

Google Analytics and AdWords are essential marketing and sales tools. They can be integrated with the ubiquitous Microsoft Excel through the Google Data API. Data Big Bang’s Nicolas Papagna has developed an Excel add-on which can be downloaded here. This add-on enables Excel users to quickly retrieve Google Analytics data using the available Google Analytics metrics and dimensions, and the results can be sorted by the user’s criteria. One of the advantages of our solution is that Excel accesses the Google Analytics API directly instead of going through a Data Big Bang server. Other solutions need access to your information, which exposes your private data to third parties.

Installation and Usage

  1. Download GoogleAnalyticsToExcel.AddInSetup_1.0.20.0.exe.
  2. Install it.
  3. Run Microsoft Excel.
  4. Configure your Google credentials by clicking on “Settings” under the “Google Analytics to Excel Addin” ribbon tab.
  5. Customize your query and retrieve your Google Analytics data by clicking the “Query Google Analytics” button.

Development Notes

Data Big Bang’s research team has also developed an OData web service that can be consumed using applications such as PowerPivot, Tableau and LINQPad. This web service doesn’t require any add-ons. Unfortunately, neither PowerPivot nor Tableau offers a query builder to interact with OData providers, so users must know how to craft the OData URL query themselves. The most interesting part of this project was developing a .NET class that bridges the Google Data Protocol and the Open Data Protocol and offers an IQueryable interface to convert LINQ queries to GData. LINQ queries add a lot of expressive power beyond GData.
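
To give an idea of what crafting an OData URL by hand involves, the sketch below assembles a query from the standard OData system query options $filter, $orderby and $top. It is written in Python 3 for illustration only. The service root, the entity set name Pages and the property name Visits are hypothetical and do not describe the actual web service.

```python
from urllib.parse import quote

def build_odata_url(service_root, entity_set,
                    filter_expr=None, orderby=None, top=None):
    """Assemble an OData query URL from a few system query options."""
    options = []
    if filter_expr is not None:
        options.append(("$filter", filter_expr))
    if orderby is not None:
        options.append(("$orderby", orderby))
    if top is not None:
        options.append(("$top", top))

    url = service_root.rstrip("/") + "/" + entity_set
    if options:
        # Percent-encode the values; the option names keep their "$" prefix.
        url += "?" + "&".join(
            "{}={}".format(name, quote(str(value), safe=""))
            for name, value in options)
    return url

# Hypothetical service root, entity set and property names.
print(build_odata_url("https://example.invalid/analytics.svc", "Pages",
                      filter_expr="Visits gt 100",
                      orderby="Visits desc",
                      top=10))
# https://example.invalid/analytics.svc/Pages?$filter=Visits%20gt%20100&$orderby=Visits%20desc&$top=10
```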

See Also

  1. Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to Business Website
  2. Integrating Dropbox with Microsoft Outlook
  3. Exporting StackOverflow users blogs to Excel Hyperlinks