# File: dqst.py
# Package: python-dirq 1.8-3
#   links: PTS, VCS
#   area: main
#   in suites: bookworm, forky, sid, trixie
#   size: 336 kB
#   sloc: python: 2,200; makefile: 166
# File content: 212 lines, 6,356 bytes, mode -rw-r--r--
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test program for testing the dirq.QueueSet module.
"""

import os
import re
import shutil
import sys
import time

from optparse import OptionParser

sys.path.insert(1, re.sub(r'/\w*$', '', os.getcwd()))

import dirq  # noqa E402
from dirq import queue  # noqa E402
from dirq.QueueSimple import QueueSimple  # noqa E402

# Parsed command-line options; assigned by init().
opts = None
# Name of the test to run (first positional argument); assigned by init().
TEST = ''
# Program name as invoked; used as the prefix of fatal error messages.
ProgramName = sys.argv[0]


def init():
    """Parse the command line and publish the result in module globals.

    Stores the parsed options in ``opts`` and the first positional argument
    in ``TEST``.  Terminates through ``_die`` when -p/--path is missing; when
    no positional argument is given, prints the help text and calls
    ``sys.exit()``.
    """
    global opts, TEST
    option_parser = OptionParser(usage="%prog [OPTIONS] [--] TEST",
                                 version=("%%prog %s" % dirq.VERSION))
    # (flags, settings) pairs, registered in the order shown by --help.
    option_specs = (
        (('-d', '--debug'),
         dict(dest='debug', action="store_true", default=False,
              help="show debugging information")),
        (('-p', '--path'),
         dict(dest='path', type='string', default='',
              help="set the queue path")),
        (('-c', '--count'),
         dict(dest='count', type='int', default=0,
              help="set the elements count")),
        (("-s", "--size"),
         dict(dest="size", type='int', default=0,
              help="set the body size for added elements")),
        (("-r", "--random"),
         dict(dest="random", action="store_true", default=False,
              help="randomize the body size")),
        (("--granularity",),
         dict(dest="granularity", type="int", default=None,
              help="time granularity for intermediate "
                   "directories (QueueSimple)")),
        (("--header",),
         dict(dest="header", action="store_true", default=False,
              help="set header for added elements")),
        (("--maxelts",),
         dict(dest="maxelts", type='int', default=0,
              help="set the maximum number of elements per directory")),
        (("--type",),
         dict(dest="type", type="string", default="simple",
              help="set the type of dirq (simple|normal)")),
    )
    for flags, settings in option_specs:
        option_parser.add_option(*flags, **settings)
    opts, positional = option_parser.parse_args()
    if not opts.path:
        _die("%s: mandatory option not set: -p/--path", ProgramName)
    if not positional:
        # No test name given: show the usage text and stop.
        option_parser.print_help()
        sys.exit()
    TEST = positional[0]


def debug(fmt, *arguments):
    """Write a timestamped debugging line to stderr.

    Nothing is written unless the ``debug`` option is set.  ``fmt`` is
    %-formatted with ``arguments``; a run of trailing whitespace in the
    result is replaced by a single '.'.
    """
    if opts.debug:
        text = re.sub(r'\s+$', '.', fmt % arguments)
        stamp = time.strftime("%Y/%m/%d-%H:%M:%S",
                              time.localtime(time.time()))
        program = os.path.basename(sys.argv[0])
        line = "# %s %s[%d]: %s\n" % (stamp, program, os.getpid(), text)
        sys.stderr.write(line)


def _die(fmt, *arguments):
    """Print ``fmt % arguments`` to stderr and exit with status 1."""
    text = fmt % arguments + "\n"
    stream = sys.stderr
    stream.write(text)
    stream.flush()
    sys.exit(1)


def new_dirq(path, _schema):
    """Build one queue object rooted at ``path``.

    With ``--type simple`` a ``QueueSimple`` is returned, passing the
    granularity option only when it was given.  Any other type yields a
    ``queue.Queue``; a body/header schema is attached when ``_schema`` is
    true, and ``maxelts`` is passed when that option is non-zero.
    """
    if opts.type == "simple":
        simple_args = {}
        if opts.granularity is not None:
            simple_args['granularity'] = opts.granularity
        return QueueSimple(path, **simple_args)

    normal_args = {}
    if _schema:
        normal_args['schema'] = {'body': 'string',
                                 'header': 'table?'}
    if opts.maxelts:
        normal_args['maxelts'] = opts.maxelts
    return queue.Queue(path, **normal_args)


def new_dirqs():
    """Build a QueueSet from the comma-separated paths in ``opts.path``.

    One queue is created per path (without schema) and added to the set;
    the time taken is reported through ``debug``.  Returns the set.
    """
    started = time.time()
    queue_set = queue.QueueSet()
    for queue_path in opts.path.split(','):
        member = new_dirq(queue_path, 0)
        queue_set.add(member)
    elapsed = time.time() - started
    debug("created queue set in %.4f seconds", elapsed)
    return queue_set


def test_count():
    """Count the elements of the queue set and report the timing.
    """
    queue_set = new_dirqs()
    started = time.time()
    total = queue_set.count()
    elapsed = time.time() - started
    debug("queue set has %d elements", total)
    debug("done in %.4f seconds", elapsed)


def test_add():
    """Placeholder for the "add" test; it currently performs no work.
    """
    return None


def test_complex():
    """Exercise a QueueSet built from several freshly created queues.

    Creates the working directory ``opts.path`` (it must not exist yet) with
    six queues ``q0`` .. ``q5`` below it and adds ``opts.count`` elements
    (1000 when the option is unset) to each of them.  Then:

    * builds a QueueSet from the first three created queues,
    * adds the remaining queues to the set,
    * removes the first three queues again,

    reporting element counts and timings through ``debug`` along the way.
    Finally the working directory, including all queues, is deleted so the
    test can be run again with the same path.
    """
    wd = opts.path
    os.mkdir(wd)
    qn = 6
    paths = [os.path.join(wd, 'q%i' % i) for i in range(qn)]
    # opts.count defaults to 0, which means "use 1000 elements".
    count = opts.count or 1000
    debug("creating %i initial queues. adding %i elements into each.",
          qn, count)
    queues = []
    t1 = time.time()
    # Queues are created from the last path to the first, so queues[0]
    # is the queue stored under q5.
    for path in reversed(paths):
        dq = new_dirq(path, 1)
        debug("adding %d elements to the queue...", count)
        element = {}
        time1 = time.time()
        for done in range(1, count + 1):
            element['body'] = 'Element %i \u263A\n' % done
            dq.add(element)
        debug("done in %.4f seconds", time.time() - time1)
        queues.append(dq)
    debug("total done in %.4f seconds", time.time() - t1)

    time1 = time.time()
    i = 3
    qs = queue.QueueSet(queues[0:i])
    debug("created queue set in %.4f seconds", time.time() - time1)
    debug("elements in %i queues: %i", i, qs.count())

    debug("adding remaining queues to the set.")
    t1 = time.time()
    qs.add(queues[i:])
    debug("done in %.4f sec.", time.time() - t1)
    debug("total element with added queues: %i", qs.count())

    debug("removing %i first queues.", i)
    t1 = time.time()
    for dq in queues[0:i]:
        qs.remove(dq)
    debug("done in %.4f sec.", time.time() - t1)

    debug("number of elements left: %i", qs.count())

    debug("deleting queues from disk...")
    # wd was created by the os.mkdir() call above, so it is removed together
    # with the queues; leaving it behind would make the next run fail in
    # os.mkdir().
    shutil.rmtree(wd, ignore_errors=True)
    debug("done.")


def test_iterate():
    """Walk once over every element of the queue set, locking and then
    unlocking each one; elements that cannot be locked are skipped.
    """
    debug("iterating all elements in the set of queues (one pass)...")
    queue_set = new_dirqs()
    processed = 0
    started = time.time()
    current, element_name = queue_set.first()
    while current:
        # Only elements whose lock was obtained are unlocked and counted.
        if current.lock(element_name):
            current.unlock(element_name)
            processed += 1
        current, element_name = queue_set.next()
    finished = time.time()
    debug("done in %.4f seconds (%d elements)", finished - started, processed)


if __name__ == "__main__":
    init()
    # Map each supported test name to the function that runs it.
    handlers = {
        "count": test_count,
        "add": test_add,
        "complex": test_complex,
        "iterate": test_iterate,
    }
    handler = handlers.get(TEST)
    if handler is None:
        _die("%s: unsupported test: %s", ProgramName, TEST)
    else:
        handler()