File: test_fuzz_zmq.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (108 lines) | stat: -rw-r--r-- 3,436 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
import os
import socket
import time

import pytest
import zmq

import parsl

logger = logging.getLogger(__name__)


@parsl.python_app
def this_app():
    return 5


@pytest.mark.local
def test_row_counts():
    import sqlalchemy
    from sqlalchemy import text

    from parsl.tests.configs.htex_local_alternate import fresh_config

    if os.path.exists("runinfo/monitoring.db"):
        logger.info("Monitoring database already exists - deleting")
        os.remove("runinfo/monitoring.db")

    logger.info("loading parsl")
    parsl.load(fresh_config())

    logger.info("invoking apps and waiting for result")

    assert this_app().result() == 5
    assert this_app().result() == 5

    # now we've run some apps, send fuzz into the monitoring ZMQ
    # socket, before trying to run some more tests.

    # there are different kinds of fuzz:
    # could send ZMQ messages that are weird
    # could send random bytes to the TCP socket
    #   the latter is what i'm most suspicious of in my present investigation

    # dig out the interchange port...
    hub_address = parsl.dfk().monitoring.hub_address
    hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port

    # this will send a string to a new socket connection
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((hub_address, hub_zmq_port))
        s.sendall(b'fuzzing\r')

    context = zmq.Context()
    channel_timeout = 10000  # in milliseconds
    hub_channel = context.socket(zmq.DEALER)
    hub_channel.setsockopt(zmq.LINGER, 0)
    hub_channel.set_hwm(0)
    hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
    hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port))

    # this will send a non-object down the DFK's existing ZMQ connection
    hub_channel.send(b'FuzzyByte\rSTREAM')

    # This following attack is commented out, because monitoring is not resilient
    # to this.
    # In practice, it works some of the time but in some circumstances,
    # it would still abandon writing multiple unrelated records to the database,
    # causing ongoing monitoring data loss.

    # This will send an unusual python object down the
    # DFK's existing ZMQ connection. this doesn't break the router,
    # but breaks the db_manager in a way that isn't reported until
    # the very end of the run, and database writing is abandoned
    # rather than completing, in this case.
    # I'm unclear if this is a case we should be trying to handle.
    # parsl.dfk().monitoring._dfk_channel.send_pyobj("FUZZ3")

    # hopefully long enough for any breakage to happen
    # before attempting to run more tasks
    time.sleep(5)

    assert this_app().result() == 5
    assert this_app().result() == 5

    logger.info("cleaning up parsl")
    parsl.dfk().cleanup()

    # at this point, we should find one row in the monitoring database.

    logger.info("checking database content")
    engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db")
    with engine.begin() as connection:

        result = connection.execute(text("SELECT COUNT(*) FROM workflow"))
        (c, ) = result.first()
        assert c == 1

        result = connection.execute(text("SELECT COUNT(*) FROM task"))
        (c, ) = result.first()
        assert c == 4

        result = connection.execute(text("SELECT COUNT(*) FROM try"))
        (c, ) = result.first()
        assert c == 4

    logger.info("all done")