1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
import logging
import select
from bluetooth import BluetoothSocket, BluetoothError, RFCOMM
import socket
if __name__ == "__main__":
    # Only when run as a script: select the GTK3-based Twisted reactor.
    # This is done ahead of the twisted.internet imports that follow;
    # NOTE(review): Twisted expects a reactor to be installed before
    # twisted.internet.reactor is first imported -- keep this ordering.
    import gi
    gi.require_version('Gtk', '3.0')
    from twisted.internet import gtk3reactor
    gtk3reactor.install()
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue
if __name__ == "__main__" and __package__ is None:
    # The file was started directly (no package context). Complain, then
    # set things up so that the relative imports below can still resolve.
    logging.getLogger().error("You seem to be trying to execute " +
                              "this script directly which is discouraged. " +
                              "Try python -m instead.")
    import os
    # Directory two levels above this file, i.e. the parent of the directory
    # containing this module.
    parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    # Put that directory and its "monkeysign" subdirectory at the front of
    # the module search path (os.sys is the sys module as seen through os).
    os.sys.path.insert(0, parent_dir)
    os.sys.path.insert(0, os.path.join(parent_dir, 'monkeysign'))
    import keysign
    # Alternative way of loading the package, left disabled:
    #mod = __import__('keysign')
    #sys.modules["keysign"] = mod
    # Declare this module as belonging to the "keysign" package so that the
    # "from .xyz import ..." statements that follow work.
    __package__ = str('keysign')
from .gpgmeh import fingerprint_from_keydata
from .i18n import _
from .util import mac_verify
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class BluetoothReceive:
    """Download key data from a sender over a Bluetooth RFCOMM connection.

    The connection attempt is made non-blocking and polled with ``select``
    in a worker thread, so that a pending attempt can be abandoned by
    calling :meth:`stop`.
    """

    def __init__(self, port=3, size=1024):
        """Store the connection parameters; no socket is opened here.

        Args:
            port: RFCOMM port passed to ``connect``.
            size: Maximum number of bytes requested per ``recv`` call.
        """
        self.port = port
        self.size = size
        self.client_socket = None
        self.stopped = False

    @inlineCallbacks
    def find_key(self, bt_mac, mac):
        """Connect to *bt_mac*, read everything the peer sends and verify it.

        Args:
            bt_mac: Bluetooth address of the sender.
            mac: Expected MAC of the transferred data. If it is falsy no
                verification is performed and the download is reported as
                unsuccessful.

        Returns:
            A Deferred firing with a ``(key_data, success, error)`` tuple:

            * ``(text, True, None)`` when the data was received and the MAC
              verified; ``text`` is the data decoded as UTF-8.
            * ``("", False, None)`` when the MAC check did not pass.
            * ``(None, False, exception)`` when connecting or receiving
              raised an exception.

        The socket is closed before the result is delivered, on the error
        paths as well as on the normal one.
        """
        self.client_socket = BluetoothSocket(RFCOMM)
        message = b""
        try:
            self.client_socket.setblocking(False)
            try:
                log.info("Trying to connect to %s port %s", bt_mac, self.port)
                self.client_socket.connect((bt_mac, self.port))
            except BluetoothError as be:
                # A non-blocking connect reports "in progress" instead of
                # completing; that is expected, anything else is an error.
                if be.args[0] != "(115, 'Operation now in progress')":
                    raise

            success = False
            while not self.stopped and not success:
                # Poll in a worker thread with a short timeout so that
                # self.stopped is re-checked twice per second.
                readable, _writable, _errored = yield threads.deferToThread(
                    select.select, [self.client_socket], [], [], 0.5)
                if not readable:
                    continue

                log.info("Connection established")
                self.client_socket.setblocking(True)
                success = True
                # Try to receive until the sender closes the connection.
                # NOTE(review): recv() is called directly here rather than
                # through deferToThread, so it blocks the calling thread.
                chunks = []
                try:
                    while True:
                        part_message = self.client_socket.recv(self.size)
                        if not part_message:
                            # An empty read means the peer closed the
                            # stream; without this check the loop would
                            # spin forever on empty reads.
                            log.info("Bluetooth connection closed, let's check if we downloaded the key")
                            break
                        log.debug("Read %d bytes: %r", len(part_message), part_message)
                        chunks.append(part_message)
                except BluetoothError as be:
                    if be.args[0] != "(104, 'Connection reset by peer')":
                        raise
                    log.info("Bluetooth connection closed, let's check if we downloaded the key")
                message = b"".join(chunks)

            mac_key = fingerprint_from_keydata(message)
            verified = None
            if mac:
                verified = mac_verify(mac_key.encode('ascii'), message, mac)
            if verified:
                success = True
            else:
                log.info("MAC validation failed: %r", verified)
                success = False
                message = b""
        except BluetoothError as be:
            error_code = be.args[0]
            if error_code == "(16, 'Device or resource busy')":
                log.info("Probably has been provided a partial bt mac")
            elif error_code == "(111, 'Connection refused')":
                log.info("The sender refused our connection attempt")
            elif error_code == "(112, 'Host is down')":
                log.info("The sender's Bluetooth is not available")
            elif error_code == "(113, 'No route to host')":
                log.info("An error occurred with Bluetooth, if present probably the device is not powered")
            else:
                log.info("An unknown bt error occurred: %s", error_code)
            returnValue((None, False, be))
        except Exception as e:
            log.error("An error occurred connecting or receiving: %s", e)
            returnValue((None, False, e))
        finally:
            # returnValue() leaves the generator by raising, so the cleanup
            # has to live in a finally clause to run on the error paths too.
            if self.client_socket:
                try:
                    self.client_socket.close()
                except BluetoothError:
                    log.exception("Closing the Bluetooth socket failed")

        returnValue((message.decode("utf-8"), success, None))

    def stop(self):
        """Abort a running :meth:`find_key` and shut the socket down.

        Sets ``stopped`` so the polling loop in :meth:`find_key` ends, then
        shuts down and closes the socket if one exists. Bluetooth errors
        raised while doing so are logged, not propagated.
        """
        self.stopped = True
        if self.client_socket:
            try:
                self.client_socket.shutdown(socket.SHUT_RDWR)
                self.client_socket.close()
            except BluetoothError as be:
                if be.args[0] == "(9, 'Bad file descriptor')":
                    log.info("The old Bluetooth connection was already closed")
                else:
                    log.exception("An unknown bt error occurred")
def main(args):
    """Download a key over Bluetooth and print it, driving the reactor.

    Args:
        args: Exactly three items: the sender's Bluetooth address, the
            expected HMAC of the key data, and the RFCOMM port (parsed
            with ``int``).

    Raises:
        ValueError: If *args* does not contain exactly three items, or the
            port is not a valid integer.

    The function blocks in ``reactor.run()`` until the download finished,
    successfully or not.
    """
    log.debug('Running main with args: %s', args)
    if len(args) != 3:
        raise ValueError("You must provide three arguments: bluetooth code, hmac and port")

    def _received(result):
        # Print either the key or the error that find_key reported.
        key_data, success, error_message = result
        if success:
            print(key_data)
        else:
            print(error_message)
        # Scheduled via callFromThread so that it also works if the result
        # is already available before reactor.run() has been entered.
        reactor.callFromThread(reactor.stop)

    def _failed(failure):
        # Without an errback an unexpected failure would leave the reactor
        # running forever; log it and shut down instead.
        log.error("Downloading the key failed: %s", failure)
        reactor.callFromThread(reactor.stop)

    print(_("Trying to download the key, please wait"))
    bt_mac = args[0]
    hmac = args[1]
    port = int(args[2])

    receive = BluetoothReceive(port)
    d = receive.find_key(bt_mac, hmac)
    d.addCallback(_received)
    d.addErrback(_failed)
    reactor.run()
if __name__ == "__main__":
    # Script entry point: everything after the program name goes to main().
    import sys
    logging.basicConfig(level=logging.INFO)
    main(sys.argv[1:])
|