Persisting Native Python Queues

Native Python queues do not let you stop and resume an application without losing queue items. Adding object persistence to a class derived from the Python Queue partially addresses this issue. We use the DyBASE embedded object-oriented database to persist queue items in files. If needed, you can instantiate multiple queues pointing to different files. Since our PersistentQueue class is derived from the Python Queue, it works in multithreaded environments. Transactional support, such as acknowledging successfully processed queue items, is not currently a feature of this class.

In Using Queues in Web Crawling and Analysis Infrastructure we noted the relevance of queues for connecting heterogeneous technologies. Queues are also used within a single technology to implement the typical producer/consumer pattern. For example, the Python programming language offers FIFO and priority queues, as does .NET. However, neither of these native queues is persistent. The Microsoft Windows Azure platform incorporates persistent queues but has other limitations, and may also be overkill for your solution.
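To make the producer/consumer pattern concrete, here is a minimal sketch using the in-memory FIFO queue from the standard library. It is written for Python 3, where the module is named `queue` (the listing in this article targets Python 2, where it is `Queue`). The function names and the `_DONE` sentinel are illustrative choices, not part of any library. Everything placed on this queue lives only in process memory, so it is gone if the process stops before the consumer drains it.

```python
import queue
import threading

_DONE = object()  # illustrative sentinel telling the consumer to stop


def producer(q, items):
    """Put every item on the queue, then signal the end of the stream."""
    for item in items:
        q.put(item)
    q.put(_DONE)


def consumer(q, out):
    """Take items off the queue in FIFO order until the sentinel arrives."""
    while True:
        item = q.get()
        if item is _DONE:
            break
        out.append(item)


def run_demo(items):
    q = queue.Queue()  # in-memory only: nothing survives a restart
    out = []
    worker = threading.Thread(target=consumer, args=(q, out))
    worker.start()
    producer(q, items)
    worker.join()
    return out


if __name__ == "__main__":
    print(run_demo(["one", "two", "three"]))
```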

There are several ways to persist a queue. If the items that you want to persist have a fixed length, then Berkeley DB’s queues or STXXL’s queues work well. Hash-based database managers like GDBM are not suitable for a FIFO queue, since you need to traverse the elements in order and a hash table does not guarantee that order. STXXL and DyBASE use a B+Tree data structure. You may be tempted to use a database engine like SQLite, which can be useful in many scenarios, but an SQL engine adds complexity that is not required for FIFO queues.
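For comparison only, the SQLite route could look roughly like the sketch below, written for Python 3 with the standard `sqlite3` module. The class name `SqliteFifo`, the table name and the column names are hypothetical, and the sketch is single-threaded and stores text payloads only. It is not the approach taken in this article. It does show the extra machinery involved: a schema, explicit ordering by key, and a transaction around each operation.

```python
import sqlite3


class SqliteFifo:
    """Hypothetical helper: a FIFO queue of text items stored in SQLite."""

    def __init__(self, path):
        self.conn = sqlite3.connect(path)
        # The autoincrementing integer key records insertion order.
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS items ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
        )
        self.conn.commit()

    def put(self, payload):
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT INTO items (payload) VALUES (?)", (payload,)
            )

    def get(self):
        """Remove and return the oldest payload, or None if empty."""
        with self.conn:
            row = self.conn.execute(
                "SELECT id, payload FROM items ORDER BY id LIMIT 1"
            ).fetchone()
            if row is None:
                return None
            self.conn.execute("DELETE FROM items WHERE id = ?", (row[0],))
            return row[1]

    def __len__(self):
        return self.conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]


if __name__ == "__main__":
    q = SqliteFifo(":memory:")  # pass a file path to persist across runs
    for s in ["one", "two", "three"]:
        q.put(s)
    print(q.get(), len(q))
```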

Prerequisites

  1. DyBASE: http://www.garret.ru/dybase.html

Code

The code is also available on GitHub.

#!/usr/bin/python
 
from Queue import Queue
import dybase
import sys
 
MAX_INT = sys.maxint
MIN_INT = -MAX_INT - 1
 
#DEBUG = True
DEBUG = False
 
class Root(dybase.Persistent):
    def __init__(self):
        self.start = 0
        self.stop = 0
 
class SizeOfPersistentQueueExceeded(Exception):
    pass
 
class incomplete_persistent_deque:
    def __init__(self, filename):
        self._init_db(filename)
 
    def _init_db(self, filename):
        self.db = dybase.Storage()
        if self.db.open(filename):
            self.root = self.db.getRootObject()
            if self.root is None:
                self.root = Root()
                # createLongIndex could be used on 64-bit systems, but it
                # would be unusual to pass 2**32 elements through the queue.
                self.root.elements = self.db.createIntIndex()
                self.root.pending_elements = self.db.createIntIndex()
 
                self.db.setRootObject(self.root)
                self.db.commit()
            else:
                if DEBUG:
                    print "self.root already exists"
 
        if DEBUG:
            print "self.root.start =", self.root.start
            print "self.root.stop = ", self.root.stop
 
    def __len__(self):
        if self.root.stop >= self.root.start:
            return self.root.stop - self.root.start
        else:
            return (MAX_INT - self.root.start + 1) + (self.root.stop - MIN_INT)
 
    def append(self, item):
        # add element to index
        self.root.elements.insert(self.root.stop, item)
        self.root.stop += 1
        if self.root.stop > MAX_INT:
            # check also if stop touches start
            self.root.stop = MIN_INT
 
        if self.root.start == self.root.stop:
            raise SizeOfPersistentQueueExceeded
 
        # persist
        self.root.store()
        self.db.commit()
 
    def popleft(self):
        # no emptiness check here; the Queue class takes care of that
        # remove element from index
        item = self.root.elements.get(self.root.start)
        self.root.elements.remove(self.root.start)
        self.root.start += 1
        if self.root.start > MAX_INT:
            # check also if start touches stop
            self.root.start = MIN_INT
 
        # If the queue is now empty, reset start and stop to 0.
        # This is purely cosmetic and can be removed.
        if self.root.start == self.root.stop:
            self.root.start = 0
            self.root.stop = 0
 
        # persist
        self.root.store()
        self.db.commit()
 
        return item
 
class PersistentQueue(Queue):
    def __init__(self, filename, maxsize = 0):
        self.filename = filename
        Queue.__init__(self, maxsize)
 
    def _init(self, maxsize):
        # original: self.queue = deque()
 
        # incomplete_persistent_deque:
        # - incomplete implementation but enough for Queue:
        # - implemented methods:
        # -- __len__
        # -- append
        # -- popleft
        #
 
        self.queue = incomplete_persistent_deque(self.filename)
 
    def connect(self): # to handle failovers
        pass
 
    def ack(self):
        pass
 
    #def ack(self, item):
 
class ElementTest:
    def __init__(self, value):
        self.value = value
 
    def __repr__(self):
        return self.value
 
    def __str__(self):
        return self.value
 
def test1():
    q = PersistentQueue("myqueue.dbs")
    if not q.empty(): # get pending items
        while not q.empty():
            e = q.get()
            print e
 
    for s in ['one', 'two', 'three']:
        q.put(ElementTest(s))
 
def main(): # run this script twice to see the persisted elements
    test1()
 
if __name__ == '__main__':
    main()
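The listing relies on two ideas that can be tried without DyBASE: `Queue` only needs `__len__`, `append` and `popleft` from its underlying container, and the `start`/`stop` counters wrap around the integer key range. The sketch below illustrates both in Python 3, using a plain dict in place of the DyBASE index, so nothing is persisted. The names `RingIndexDeque` and `RingQueue` and the configurable key bounds are assumptions made for illustration. Unlike the listing, it checks for overflow before storing the item.

```python
import queue


class RingIndexDeque:
    """Illustrative stand-in for incomplete_persistent_deque (dict-backed)."""

    def __init__(self, min_key, max_key):
        self.min_key = min_key
        self.max_key = max_key
        self.elements = {}  # stands in for the DyBASE integer index
        self.start = 0      # key of the oldest item
        self.stop = 0       # key the next item will be stored under

    def _next(self, key):
        # Wrap from the largest key back to the smallest one.
        return self.min_key if key == self.max_key else key + 1

    def __len__(self):
        if self.stop >= self.start:
            return self.stop - self.start
        # stop has wrapped around: count both segments of the key range.
        return (self.max_key - self.start + 1) + (self.stop - self.min_key)

    def append(self, item):
        new_stop = self._next(self.stop)
        if new_stop == self.start:
            raise OverflowError("key range exhausted")
        self.elements[self.stop] = item
        self.stop = new_stop

    def popleft(self):
        item = self.elements.pop(self.start)
        self.start = self._next(self.start)
        return item


class RingQueue(queue.Queue):
    """Queue whose storage is swapped by overriding the _init hook."""

    def __init__(self, min_key, max_key, maxsize=0):
        self._bounds = (min_key, max_key)
        super().__init__(maxsize)

    def _init(self, maxsize):
        # The stock implementation does: self.queue = deque()
        self.queue = RingIndexDeque(*self._bounds)


if __name__ == "__main__":
    q = RingQueue(-2, 1)  # tiny key range so the wrap-around is visible
    for s in ["one", "two", "three"]:
        q.put(s)
    print(q.qsize(), q.get())
```

With key bounds of -2 and 1 there are only four keys, so the queue holds at most three items at a time; the fourth slot is kept free so that `start == stop` always means "empty".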

See Also

  1. Esoteric Queue Scheduling Disciplines
  2. Using Queues in Web Crawling and Analysis Infrastructure
  3. Adding Acknowledgement Semantics to a Persistent Queue

Resources

  1. [queue persistent site:stackoverflow.com] Google query
  2. bsddb3 Python interface for Berkeley DB
  3. bsddb3 examples
  4. STXXL queue class template