File: test_bluetooth.py

package info (click to toggle)
gnome-keysign 1.3.0-5.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,460 kB
  • sloc: python: 5,143; xml: 126; makefile: 33; sh: 16
file content (169 lines) | stat: -rw-r--r-- 6,503 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os
import logging
import select
import socket
from subprocess import check_call
import tempfile
import unittest
import gi
gi.require_version('Gtk', '3.0')

from twisted.internet import threads
from pytest_twisted import inlineCallbacks

try:
    from keysign.bluetoothoffer import BluetoothOffer
    from keysign.bluetoothreceive import BluetoothReceive, BluetoothError
    HAVE_BT = True
except ImportError:
    HAVE_BT = False
from keysign.gpgmeh import get_public_key_data, openpgpkey_from_data
from keysign.util import mac_generate


log = logging.getLogger(__name__)
thisdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.join(thisdir, "..")


@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def get_fixture_dir(fixture=""):
    dname = os.path.join(thisdir, "fixtures", fixture)
    return dname


@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def get_fixture_file(fixture):
    fname = os.path.join(get_fixture_dir(), fixture)
    return fname


@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def import_key_from_file(fixture, homedir):
    fname = get_fixture_file(fixture)
    original = open(fname, 'rb').read()
    gpgcmd = ["gpg", "--homedir={}".format(homedir)]
    # Now we import a single key
    check_call(gpgcmd + ["--import", fname])

    return openpgpkey_from_data(original)


@inlineCallbacks
@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def test_bt():
    """This test requires two working Bluetooth devices"""
    # This should be a new, empty directory
    homedir = tempfile.mkdtemp()
    os.environ["GNUPGHOME"] = homedir
    key = import_key_from_file("seckey-no-pw-1.asc", homedir)
    file_key_data = get_public_key_data(key.fingerprint)
    log.info("Running with key %r", key)
    hmac = mac_generate(key.fingerprint.encode('ascii'), file_key_data)
    # Start offering the key
    offer = BluetoothOffer(key)
    data = yield offer.allocate_code()
    if data is None:
        log.warning("No code allocated (%r), probably no Bluetooth adapter!", data)
    # getting the code from "BT=code;...."
    code = data.split("=", 1)[1]
    code = code.split(";", 1)[0]
    port = int(data.rsplit("=", 1)[1])
    offer.start()
    receive = BluetoothReceive(port)
    msg_tuple = yield receive.find_key(code, hmac)
    log.debug("Receiving yielded: %r", msg_tuple)
    downloaded_key_data, success, _ = msg_tuple
    assert success, "No success :( %r" % msg_tuple
    log.info("Checking with key: %r", downloaded_key_data)
    assert downloaded_key_data.encode("utf-8") == file_key_data


@inlineCallbacks
@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def test_bt_wrong_hmac():
    """This test requires two working Bluetooth devices"""
    # This should be a new, empty directory
    homedir = tempfile.mkdtemp()
    os.environ["GNUPGHOME"] = homedir
    key = import_key_from_file("seckey-no-pw-1.asc", homedir)
    log.info("Running with key %r", key)
    hmac = "wrong_hmac_eg_tampered_key"
    # Start offering the key
    offer = BluetoothOffer(key)
    data = yield offer.allocate_code()
    # getting the code from "BT=code;...."
    code = data.split("=", 1)[1]
    code = code.split(";", 1)[0]
    port = int(data.rsplit("=", 1)[1])
    offer.start()
    receive = BluetoothReceive(port)
    msg_tuple = yield receive.find_key(code, hmac)
    downloaded_key_data, success, err = msg_tuple
    assert not success
    assert not isinstance(err, BluetoothError), "We expect a ValidationError or something else, anyway, than a Bluetooth error, because the transfer ought to have worked. How else do we distinguish a broken Bluetooth (won't work at all) from a tampered connection (may work next time)"


@inlineCallbacks
@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def test_bt_wrong_mac():
    """This test requires one working Bluetooth device"""
    receive = BluetoothReceive()
    msg_tuple = yield receive.find_key("01:23:45:67:89:AB", "hmac")
    downloaded_key_data, success, error = msg_tuple
    assert downloaded_key_data is None
    assert not success
    assert error.args[0] == "(112, 'Host is down')"


@inlineCallbacks
@unittest.skipUnless(HAVE_BT, "requires bluetooth module")
def test_bt_corrupted_key():
    """This test requires two working Bluetooth devices"""

    @inlineCallbacks
    def start(bo):
        success = False
        try:
            while not success:
                # server_socket.accept() is not stoppable. So with select we can call accept()
                # only when we are sure that there is already a waiting connection
                ready_to_read, ready_to_write, in_error = yield threads.deferToThread(
                    select.select, [bo.server_socket], [], [], 0.5)
                if ready_to_read:
                    # We are sure that a connection is available, so we can call
                    # accept() without deferring it to a thread
                    client_socket, address = bo.server_socket.accept()
                    key_data = get_public_key_data(bo.key.fingerprint)
                    kd_decoded = key_data.decode('utf-8')
                    # We send only a part of the key. In this way we can simulate the case
                    # where the connection has been lost
                    half = len(kd_decoded)/2
                    kd_corrupted = kd_decoded[:half]
                    yield threads.deferToThread(client_socket.sendall, kd_corrupted)
                    client_socket.shutdown(socket.SHUT_RDWR)
                    client_socket.close()
                    success = True
        except Exception as e:
            log.error("An error occurred: %s" % e)

    # This should be a new, empty directory
    homedir = tempfile.mkdtemp()
    os.environ["GNUPGHOME"] = homedir
    key = import_key_from_file("seckey-no-pw-1.asc", homedir)
    log.info("Running with key %r", key)
    file_key_data = get_public_key_data(key.fingerprint)
    hmac = mac_generate(key.fingerprint.encode('ascii'), file_key_data)
    # Start offering the key
    offer = BluetoothOffer(key)
    data = yield offer.allocate_code()
    # getting the code from "BT=code;...."
    code = data.split("=", 1)[1]
    code = code.split(";", 1)[0]
    port = int(data.rsplit("=", 1)[1])
    start(offer)
    receive = BluetoothReceive(port)
    msg_tuple = yield receive.find_key(code, hmac)
    downloaded_key_data, result, error = msg_tuple
    assert not result
    assert isinstance(error, ValueError)