File: bluetoothreceive.py

package info (click to toggle)
gnome-keysign 1.3.0-5.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,460 kB
  • sloc: python: 5,143; xml: 126; makefile: 33; sh: 16
file content (147 lines) | stat: -rw-r--r-- 5,586 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
import select
from bluetooth import BluetoothSocket, BluetoothError, RFCOMM
import socket

if __name__ == "__main__":
    # Only when run as a script: select the GTK 3 based Twisted reactor.
    # The install() call is deliberately placed before the first
    # "from twisted.internet import reactor" so that the name imported
    # below refers to the GTK 3 reactor that was just installed.
    import gi
    gi.require_version('Gtk', '3.0')
    from twisted.internet import gtk3reactor
    gtk3reactor.install()
    from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue

# Fallback for direct execution (e.g. "python bluetoothreceive.py"): in that
# case __package__ is None and the relative imports further down could not be
# resolved.  Put the parent directory (and its "monkeysign" subdirectory) at
# the front of sys.path, import the "keysign" package and set __package__
# accordingly.
if __name__ == "__main__" and __package__ is None:
    logging.getLogger().error("You seem to be trying to execute " +
                              "this script directly which is discouraged. " +
                              "Try python -m instead.")
    import os
    # Directory that contains the package directory this file lives in.
    parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    os.sys.path.insert(0, parent_dir)
    os.sys.path.insert(0, os.path.join(parent_dir, 'monkeysign'))
    import keysign
    #mod = __import__('keysign')
    #sys.modules["keysign"] = mod
    __package__ = str('keysign')

from .gpgmeh import fingerprint_from_keydata
from .i18n import _
from .util import mac_verify

log = logging.getLogger(__name__)


class BluetoothReceive:
    """Download key data over a Bluetooth RFCOMM connection.

    ``find_key`` connects to a sender, reads everything the sender transmits
    until it closes the connection and then checks the data against the
    expected MAC.  ``stop`` aborts a pending attempt.
    """

    # Linux errno values that get dedicated handling below.
    _EBADF = 9
    _EBUSY = 16
    _ECONNRESET = 104
    _ECONNREFUSED = 111
    _EHOSTDOWN = 112
    _EHOSTUNREACH = 113
    _EINPROGRESS = 115

    # Log hints for the connection errors we know how to explain.
    _CONNECT_ERROR_HINTS = {
        _EBUSY: "Probably has been provided a partial bt mac",
        _ECONNREFUSED: "The sender refused our connection attempt",
        _EHOSTDOWN: "The sender's Bluetooth is not available",
        _EHOSTUNREACH: "An error occurred with Bluetooth, if present "
                       "probably the device is not powered",
    }

    def __init__(self, port=3, size=1024):
        """Remember the RFCOMM *port* to use and the *size* of each read."""
        self.port = port
        self.size = size
        self.client_socket = None
        self.stopped = False

    @staticmethod
    def _errno_of(error):
        """Return the numeric errno carried by *error*, or None.

        Accepts both shapes in which the error number may show up: as an
        integer (``errno`` attribute or first argument) or embedded in a
        stringified tuple such as "(115, 'Operation now in progress')".
        Matching on the number instead of the whole string keeps the
        comparison independent of the exact message text.
        """
        code = getattr(error, "errno", None)
        if isinstance(code, int):
            return code
        args = getattr(error, "args", None) or ()
        if not args:
            return None
        first = args[0]
        if isinstance(first, int):
            return first
        text = str(first).strip()
        if text.startswith("("):
            head = text[1:].split(",", 1)[0].strip()
            if head.isdigit():
                return int(head)
        return None

    def _receive_all(self):
        """Read from the connected socket until the sender is done.

        The transfer ends either when ``recv`` returns no data (orderly
        shutdown) or when the connection is reset by the peer.  Any other
        Bluetooth error is propagated to the caller.

        NOTE: ``recv`` blocks the calling thread while waiting for data.
        """
        chunks = []
        try:
            while True:
                part_message = self.client_socket.recv(self.size)
                if not part_message:
                    # An empty read means the peer closed the connection;
                    # without this check the loop would spin forever.
                    log.info("Bluetooth connection closed, let's check if we downloaded the key")
                    break
                log.debug("Read %d bytes: %r", len(part_message), part_message)
                chunks.append(part_message)
        except BluetoothError as be:
            if self._errno_of(be) != self._ECONNRESET:
                raise
            log.info("Bluetooth connection closed, let's check if we downloaded the key")
        return b"".join(chunks)

    def _close_socket(self):
        """Close the client socket, ignoring errors from an already closed one."""
        if not self.client_socket:
            return
        try:
            self.client_socket.close()
        except (BluetoothError, OSError) as error:
            # Must not raise: this runs in a ``finally`` clause and would
            # otherwise replace the result that is being returned.
            log.debug("Ignoring error while closing the Bluetooth socket: %s", error)

    @inlineCallbacks
    def find_key(self, bt_mac, mac):
        """Try to download and verify the key offered by *bt_mac*.

        :param bt_mac: Bluetooth address of the sender.
        :param mac: expected MAC of the key data; if it is empty the data
            is never considered verified.
        :returns: a Deferred firing with ``(key_data, success, error)``.
            On an error ``key_data`` is None, ``success`` is False and
            ``error`` is the exception.  Otherwise ``error`` is None,
            ``success`` tells whether the MAC matched and ``key_data`` is
            the decoded key (an empty string if the MAC did not match).
        """
        self.client_socket = BluetoothSocket(RFCOMM)
        message = b""
        success = False
        error = None
        try:
            self.client_socket.setblocking(False)
            try:
                log.info("Trying to connect to %s port %s", bt_mac, self.port)
                self.client_socket.connect((bt_mac, self.port))
            except BluetoothError as be:
                # A non-blocking connect is expected to report "in progress".
                if self._errno_of(be) != self._EINPROGRESS:
                    raise
            connected = False
            while not self.stopped and not connected:
                # Poll in a worker thread with a short timeout so that a
                # call to stop() is noticed between two polls.
                readable, _writable, _errored = yield threads.deferToThread(
                    select.select, [self.client_socket], [], [], 0.5)
                if not readable:
                    continue
                log.info("Connection established")
                self.client_socket.setblocking(True)
                connected = True
                # try to receive until the sender closes the connection
                message += self._receive_all()
            mac_key = fingerprint_from_keydata(message)
            verified = None
            if mac:
                verified = mac_verify(mac_key.encode('ascii'), message, mac)
            if verified:
                success = True
            else:
                log.info("MAC validation failed: %r", verified)
                message = b""
        except BluetoothError as be:
            hint = self._CONNECT_ERROR_HINTS.get(self._errno_of(be))
            if hint:
                log.info(hint)
            else:
                log.info("An unknown bt error occurred: %s", be)
            error = be
        except Exception as e:
            log.error("An error occurred connecting or receiving: %s", e)
            error = e
        finally:
            # Release the socket on every path, including the error ones.
            self._close_socket()

        if error is not None:
            returnValue((None, False, error))
        returnValue((message.decode("utf-8"), success, None))

    def stop(self):
        """Abort a pending ``find_key`` and shut the connection down."""
        self.stopped = True
        if not self.client_socket:
            return
        try:
            self.client_socket.shutdown(socket.SHUT_RDWR)
            self.client_socket.close()
        except BluetoothError as be:
            if self._errno_of(be) == self._EBADF:
                log.info("The old Bluetooth connection was already closed")
            else:
                log.exception("An unknown bt error occurred")


def main(args):
    """Download a key via Bluetooth and print it.

    :param args: exactly three items: the sender's Bluetooth address, the
        expected HMAC of the key and the RFCOMM port (converted with int()).
    :raises ValueError: if *args* does not contain exactly three items.

    Runs the reactor until the download attempt has finished, whether it
    succeeded or failed.
    """
    log.debug('Running main with args: %s', args)
    if len(args) != 3:
        raise ValueError("You must provide three arguments: bluetooth code, hmac and port")

    def _received(result):
        key_data, success, error_message = result
        if success:
            print(key_data)
        else:
            print(error_message)

    def _failed(failure):
        # Without an errback an unexpected failure would go unnoticed and
        # the reactor would keep running forever.
        log.error("Downloading the key failed unexpectedly: %s",
                  failure.getErrorMessage())

    def _stop_reactor(_result):
        # Scheduled instead of called directly so that it also works if the
        # Deferred has already fired before reactor.run() is reached.
        reactor.callFromThread(reactor.stop)

    print(_("Trying to download the key, please wait"))
    bt_mac = args[0]
    hmac = args[1]
    port = int(args[2])
    receive = BluetoothReceive(port)
    d = receive.find_key(bt_mac, hmac)
    d.addCallback(_received)
    d.addErrback(_failed)
    d.addBoth(_stop_reactor)
    reactor.run()

if __name__ == "__main__":
    # Script entry point: log at INFO level and pass the command line
    # arguments (without the program name) on to main().
    import sys
    logging.basicConfig(level=logging.INFO)
    main(sys.argv[1:])