1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
import logging
import os
import time
import pytest
import parsl
logger = logging.getLogger(__name__)
@parsl.python_app
def this_app():
return 5
@pytest.mark.local
def test_row_counts():
import sqlalchemy
from sqlalchemy import text
from parsl.tests.configs.htex_local_alternate import fresh_config
if os.path.exists("runinfo/monitoring.db"):
logger.info("Monitoring database already exists - deleting")
os.remove("runinfo/monitoring.db")
engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db")
logger.info("loading parsl")
parsl.load(fresh_config())
# parsl.load() returns before all initialisation of monitoring
# is complete, which means it isn't safe to take a read lock on
# the database yet. This delay tries to work around that - some
# better async behaviour might be nice, but what?
#
# Taking a read lock before monitoring is initialized will cause
# a failure in the part of monitoring which creates tables, and
# which is not protected against read locks at the time this test
# was written.
time.sleep(10)
# to get an sqlite3 read lock that is held over a controllable
# long time, create a transaction and perform a SELECT in it.
# The lock will be held until the end of the transaction.
# (see bottom of https://sqlite.org/lockingv3.html)
logger.info("Getting a read lock on the monitoring database")
with engine.begin() as readlock_connection:
readlock_connection.execute(text("BEGIN TRANSACTION"))
result = readlock_connection.execute(text("SELECT COUNT(*) FROM workflow"))
(c, ) = result.first()
assert c == 1
# now readlock_connection should have a read lock that will
# stay locked until the transaction is ended, or the with
# block ends.
logger.info("invoking and waiting for result")
assert this_app().result() == 5
# there is going to be some raciness here making sure that
# the database manager actually tries to write while the
# read lock is held. I'm not sure if there is a better way
# to detect this other than a hopefully long-enough sleep.
time.sleep(10)
logger.info("cleaning up parsl")
parsl.dfk().cleanup()
# at this point, we should find data consistent with executing one
# task in the database.
logger.info("checking database content")
with engine.begin() as connection:
result = connection.execute(text("SELECT COUNT(*) FROM workflow"))
(c, ) = result.first()
assert c == 1
result = connection.execute(text("SELECT COUNT(*) FROM task"))
(c, ) = result.first()
assert c == 1
result = connection.execute(text("SELECT COUNT(*) FROM try"))
(c, ) = result.first()
assert c == 1
logger.info("all done")
|