1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
import logging
import os
import socket
import time
import pytest
import zmq
import parsl
logger = logging.getLogger(__name__)
@parsl.python_app
def this_app():
    """Return the constant 5.

    A deliberately trivial parsl app. The test invokes it before and after
    fuzzing to check that task execution still works.
    """
    answer = 5
    return answer
@pytest.mark.local
def test_row_counts():
    """Check that monitoring survives malformed traffic sent to its hub.

    Steps:
      1. Run two apps with monitoring enabled.
      2. Send two kinds of fuzz at the monitoring hub's ZMQ port: raw bytes
         over a plain TCP connection, and an unpickled frame over a fresh
         ZMQ DEALER socket.
      3. Check that two more apps still complete.
      4. After cleanup, check the monitoring database holds one workflow row,
         plus one task row and one try row per app invocation.
    """
    import sqlalchemy
    from sqlalchemy import text

    from parsl.tests.configs.htex_local_alternate import fresh_config

    db_path = "runinfo/monitoring.db"
    # Expected row counts after four successful app invocations in one workflow.
    expected_rows = {"workflow": 1, "task": 4, "try": 4}

    if os.path.exists(db_path):
        logger.info("Monitoring database already exists - deleting")
        os.remove(db_path)

    logger.info("loading parsl")
    parsl.load(fresh_config())
    # Always tear the DFK down, even if an assertion fails, so a broken run
    # does not leave a loaded DFK behind for later tests.
    try:
        logger.info("invoking apps and waiting for result")
        assert this_app().result() == 5
        assert this_app().result() == 5

        # Now that some apps have run, send fuzz at the monitoring hub's ZMQ
        # port before running more. Kinds of fuzz considered:
        #   - weird ZMQ messages
        #   - random bytes on the raw TCP socket (the author's main suspect
        #     in the investigation that motivated this test)

        # Look up where the monitoring hub is listening.
        hub_address = parsl.dfk().monitoring.hub_address
        hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port

        # Fuzz 1: plain bytes over a brand-new TCP connection, bypassing
        # ZMQ framing entirely.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((hub_address, hub_zmq_port))
            s.sendall(b'fuzzing\r')

        context = zmq.Context()
        try:
            channel_timeout = 10000  # in milliseconds
            hub_channel = context.socket(zmq.DEALER)
            hub_channel.setsockopt(zmq.LINGER, 0)
            hub_channel.set_hwm(0)
            hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
            hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")

            # Fuzz 2: a raw (non-pickled) frame sent over a new ZMQ DEALER
            # connection to the hub.
            hub_channel.send(b'FuzzyByte\rSTREAM')

            # A third attack is deliberately disabled because monitoring is
            # not resilient to it. It sends an unusual Python object down the
            # DFK's existing ZMQ connection.
            # - This doesn't break the router, but it breaks the db_manager
            #   in a way that isn't reported until the very end of the run.
            # - Database writing is then abandoned rather than completed.
            # - In some circumstances multiple unrelated records are lost,
            #   causing ongoing monitoring data loss.
            # It is unclear whether this case should be handled.
            # parsl.dfk().monitoring._dfk_channel.send_pyobj("FUZZ3")

            # Hopefully long enough for any breakage to happen before more
            # tasks are run. This also gives the queued fuzz frame time to
            # be delivered before the socket is closed below.
            time.sleep(5)
        finally:
            # LINGER is 0, so closing discards anything still unsent. Closing
            # only after the sleep avoids dropping the fuzz frame, and
            # destroy() closes the socket and terminates the context so
            # neither leaks.
            context.destroy(linger=0)

        assert this_app().result() == 5
        assert this_app().result() == 5
    finally:
        logger.info("cleaning up parsl")
        parsl.dfk().cleanup()
        # Unregister the DFK so a later parsl.load() in the same process works.
        parsl.clear()

    # After cleanup the database holds the final rows for this workflow.
    logger.info("checking database content")
    engine = sqlalchemy.create_engine(f"sqlite:///{db_path}")
    try:
        with engine.begin() as connection:
            for table, expected in expected_rows.items():
                # Table names come from the fixed dict above, never from input.
                result = connection.execute(text(f"SELECT COUNT(*) FROM {table}"))
                (count, ) = result.first()
                assert count == expected, (
                    f"{table}: expected {expected} rows, found {count}"
                )
    finally:
        engine.dispose()

    logger.info("all done")
|