1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
import os
import time
from jobqueue import db, dispatcher, notify, store
db.DEBUG = False
DEBUG = False
path = os.path.abspath(os.path.dirname(__file__))
URI = "sqlite:///%s/test.db" % path
def setupdb(uri):
db.DB_URI = uri
store.ROOT = "/tmp/test_store/%s"
if uri.startswith("sqlite"):
try:
os.unlink(uri[10:])
except:
print("could not unlink", uri[10:])
pass
queue = dispatcher.Scheduler()
return queue
def checkspeed(uri=URI):
# Isolate the cost of database access
store.create = lambda *args: {}
store.get = lambda *args: {}
store.put = lambda *args: {}
notify.notify = lambda *args, **kw: None
queue = setupdb(uri)
testj = {"name": "test1", "notify": "me"}
t = time.time()
for i in range(80):
for j in range(10):
testj["notify"] = "me%d" % j
queue.submit(testj, origin="here%d" % j)
for j in range(10):
request = queue.nextjob(queue="cue")
queue.postjob(1, {"status": "COMPLETE", "result": 0})
print(10 * (i + 1), time.time() - t)
t = time.time()
def test(uri=URI):
queue = setupdb(uri)
def checkqueue(pending=[], active=[], complete=[]):
qpending = queue.jobs("PENDING")
qactive = queue.jobs("ACTIVE")
qcomplete = queue.jobs("COMPLETE")
if DEBUG:
print("pending", qpending, "active", qactive, "complete", qcomplete)
assert pending == qpending
assert active == qactive
assert complete == qcomplete
test1 = {"name": "test1", "notify": "me"}
test2 = {"name": "test2", "notify": "me"}
test3 = {"name": "test3", "notify": "you"}
# No jobs available for running initially
checkqueue([], [], [])
request = queue.nextjob(queue="cue")
if DEBUG:
print("nextjob", request)
assert request["request"] is None
# jobs = queue.jobs()
# print "initially empty job list", jobs
# assert jobs == []
job1 = queue.submit(test1, origin="here")
if DEBUG:
print("first job id", job1)
assert job1 == 1
job2 = queue.submit(test2, origin="here")
assert job2 == 2
checkqueue([1, 2], [], [])
if DEBUG:
print("status(0)", queue.status(1))
assert queue.status(1) == "PENDING"
if DEBUG:
print("status(3)", queue.status(3))
assert queue.status(3) == "UNKNOWN"
if DEBUG:
print("info(0)", queue.info(1))
assert queue.info(1)["name"] == test1["name"]
request = queue.nextjob(queue="cue")
if DEBUG:
print("nextjob", request)
assert request["request"]["name"] == test1["name"]
checkqueue([2], [1], [])
job2 = queue.submit(test3, origin="there")
request = queue.nextjob(queue="cue")
if DEBUG:
print("nextjob", request)
assert request["request"]["name"] == test3["name"]
checkqueue([2], [1, 3], [])
queue.postjob(1, {"status": "COMPLETE", "result": 0})
checkqueue([2], [3], [1])
if __name__ == "__main__":
test()
# checkspeed()
|