File: test_lighthouse_msgstream.py

package info (click to toggle)
laniakea 0.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 15,460 kB
  • sloc: javascript: 38,493; python: 21,153; sh: 196; makefile: 129; ansic: 3
file content (104 lines) | stat: -rw-r--r-- 3,655 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018-2022 Matthias Klumpp <matthias@tenstral.net>
#
# SPDX-License-Identifier: LGPL-3.0+

import json
from uuid import UUID

import zmq
import pytest

from laniakea.msgstream import (
    create_message_tag,
    create_event_message,
    create_submit_socket,
    submit_event_message,
    verify_event_message,
    create_event_listen_socket,
)


class TestLighthouseMsgStream:
    @pytest.fixture(autouse=True)
    def setup(self, localconfig, lighthouse_server, make_curve_trusted_key):
        from laniakea.db import LkModule
        from laniakea.msgstream import keyfile_read_verify_key, keyfile_read_signing_key

        sender_keyfile = make_curve_trusted_key('test-event-submitter')
        self._server_key_fname = localconfig.secret_curve_keyfile_for_module(LkModule.LIGHTHOUSE)

        sender_id, self._sender_verify_key = keyfile_read_verify_key(sender_keyfile)
        assert sender_id == 'test-event-submitter'
        assert self._sender_verify_key

        sender_id, self._sender_signing_key = keyfile_read_signing_key(sender_keyfile)
        assert sender_id == 'test-event-submitter'
        assert self._sender_signing_key

        self._sender_id = sender_id

        self._zctx = zmq.Context()

        # Launch server process
        lighthouse_server.start()

    def test_msg_creation(self):
        '''
        Test creation of new signed messages and check them for validity.
        '''
        from laniakea.db import LkModule

        assert create_message_tag(LkModule.ARCHIVE, 'new-source-packages') == '_lk.archive.new-source-packages'

        m = create_event_message(self._sender_id, '_lk.testsuite.dummy', {'aaa': 'bbb'}, self._sender_signing_key)
        assert m['tag'] == '_lk.testsuite.dummy'
        assert m['format'] == '1.0'
        assert UUID(m['uuid']).version == 1
        assert m['data'] == {'aaa': 'bbb'}

        sigs = m['signatures']
        assert sigs
        assert len(sigs[self._sender_id]['ed25519:0']) > 80

        # this function will throw on error and thereby cause a test failure
        verify_event_message(self._sender_id, m, self._sender_verify_key)

    def test_msg_simple_submit_listen(self):
        # create subscriber socket that is listening to all events emitted by the Lighthouse instance
        sub_socket = create_event_listen_socket(self._zctx)

        # create connection with the Lighthouse server to submit new events
        pub_socket = create_submit_socket(self._zctx)

        submit_event_message(
            pub_socket, self._sender_id, '_lk.testsuite.my-event', {'hello': 'world'}, self._sender_signing_key
        )

        for i in range(1, 4):
            try:
                mparts = sub_socket.recv_multipart()
                break
            except zmq.error.Again as e:
                # the test might be slow, so we retry to get a connection three times
                # before we give up
                sub_socket.close()
                sub_socket = create_event_listen_socket(self._zctx)
                if i == 3:
                    raise e

        assert len(mparts) == 2
        topic = mparts[0]  # pylint: disable=unsubscriptable-object
        msg_b = mparts[1]  # pylint: disable=unsubscriptable-object
        assert topic == b'_lk.testsuite.my-event'

        msg = json.loads(msg_b)
        assert msg['tag'] == '_lk.testsuite.my-event'
        assert msg['format'] == '1.0'
        assert UUID(msg['uuid']).version == 1
        assert msg['data'] == {'hello': 'world'}

        sigs = msg['signatures']
        assert sigs
        assert len(sigs[self._sender_id]['ed25519:0']) > 80