1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Producer, browser and consumer. """
from dirq.queue import Queue, QueueError, QueueLockError
import os
import sys
import tempfile
# Number of elements the producer puts into the queue.
COUNT = 9
# Queue head directory (a fresh temporary directory).
path = tempfile.mkdtemp()
# Max elements per elements directory.
maxelts = 3
# Element schema handed to every Queue created by this script.
schema = {
    'body': 'string',
    'header': 'table?',
}

# ========
# PRODUCER
# Fill the queue with COUNT elements, then check that count() agrees.
print("*** PRODUCER")
dirq_p = Queue(path, maxelts=maxelts, schema=schema)
print("adding %d elements to the queue at %s" % (COUNT, path))
for seq in range(1, COUNT + 1):
    text = 'Élément %i \u263A\n' % seq
    try:
        # Strings that offer .decode() are decoded from UTF-8 ...
        body = text.decode("utf-8")
    except AttributeError:
        # ... strings without .decode() are stored unchanged.
        body = text
    element = {'body': body}
    if seq % 2:
        # Only odd sequence numbers carry a header (a copy of the environment).
        element['header'] = dict(os.environ)
    # NOTE: the original sample states dirq_p.add(element) does the same.
    name = dirq_p.enqueue(element)
    print("added %.2i: %s" % (seq, name))

total_stored = dirq_p.count()
print("total elements in the queue: %d" % total_stored)
assert total_stored == COUNT
del dirq_p
print('=' * 25)
# =======
# BROWSER
# Walk over every element name, reading each element without removing it.
print("*** BROWSER: Python iterator protocol.")
dirq_b = Queue(path, schema=schema)
done = 0
miss = 0
for i, name in enumerate(dirq_b.names()):
    done = i + 1  # number of names seen so far (i is zero-based)
    print("element: %s %s" % (path, name))
    try:
        if i in (1, 4, 5):
            # Artificially lock some elements by creating their 'locked'
            # directory by hand before trying to read them.
            os.mkdir('%s/%s/locked' % (path, name))
        # get_element(name): lock(name), get(name), unlock(name)
        element = dirq_b.get_element(name)
    except (QueueLockError, QueueError) as error:
        # Queue-level failure: report it, count a miss, go on with the
        # next name.
        print(error)
        miss += 1
    except Exception as error:
        # Anything else is unexpected: report it and stop browsing.
        print(error)
        break
    else:
        print("element: %i" % i, element.keys())
print("found %i elements; got %i" % (done, done - miss))
del dirq_b
# Same browsing pass, driven by first()/next() with explicit lock()/unlock().
print("*** BROWSER: lower level iterator protocol.")
dirq_b = Queue(path, schema=schema)
done = 0
miss = 0
name = dirq_b.first()
while name:
    print("element: %s %s" % (path, name))
    if dirq_b.lock(name):
        element = dirq_b.get(name)
        print("element: %i" % done, element.keys())
        dirq_b.unlock(name)
        done += 1
    else:
        # lock() returned a false value: count the element as missed.
        miss += 1
    name = dirq_b.next()
print("found %i elements; got %i" % (done + miss, done))
# Every element produced earlier must have been either read or missed.
assert done == (COUNT - miss)
del dirq_b
print('=' * 25)
# ========
# CONSUMER
# Iterate over the queue object itself and dequeue every element that can
# be obtained; elements that raise QueueError are counted as misses and
# are expected to remain in the queue.
print("*** CONSUMER: Python iterator protocol.")
dirq_c = Queue(path, schema=schema)
miss = 0
# Initialised up front so the summary below still works when the queue
# yields nothing (the loop variable would otherwise be unbound).
seen = 0
for seen, name in enumerate(dirq_c, start=1):
    print("element %i: %s %s" % (seen, path, name))
    try:
        # dequeue(name): lock(name), get(name), remove(name)
        element = dirq_c.dequeue(name)
    except QueueError as error:
        # Queue-level failure: report it, count a miss, go on with the
        # next name.
        print(error)
        miss += 1
    except Exception as error:
        # Anything else is unexpected: report it and stop consuming.
        print(error)
        break
    else:
        print(element.keys())
print("consumed %i elements out of %i seen" % (seen - miss, seen))
total_left = dirq_c.count()
print("elements left in the queue: %d" % total_left)
# Whatever could not be dequeued must still be in the queue.
assert total_left == miss
del dirq_c
# Consume a brand-new queue using first()/next() with explicit lock(),
# get() and remove() calls.
print("*** CONSUMER: lower level iterator protocol.")
path = tempfile.mkdtemp()
print("create new queue and add elements")
print("adding %d elements to the queue at %s" % (COUNT, path))
dirq_p = Queue(path, maxelts=maxelts, schema=schema)
for seq in range(1, COUNT + 1):
    text = 'Élément %i \u263A\n' % seq
    try:
        # Strings that offer .decode() are decoded from UTF-8 ...
        body = text.decode("utf-8")
    except AttributeError:
        # ... strings without .decode() are stored unchanged.
        body = text
    element = {'body': body}
    if seq % 2:
        # Only odd sequence numbers carry a header (a copy of the environment).
        element['header'] = dict(os.environ)
    name = dirq_p.enqueue(element)
    print("added %.2i: %s" % (seq, name))

print("start consuming...")
dirq_c = Queue(path, schema=schema)
done = 0
name = dirq_c.first()
while name:
    print("element: %s %s" % (path, name))
    if dirq_c.lock(name):
        element = dirq_c.get(name)
        print(element.keys())
        dirq_c.remove(name)
        done += 1
    else:
        print("couldn't lock: %s" % name)
    name = dirq_c.next()
print("consumed %i elements" % done)
total_left = dirq_c.count()
print("elements left in the queue: %d" % total_left)
# The new queue must be empty after the consuming pass.
assert total_left == 0
|