File: send_test_pki.py

package info (click to toggle)
gtk-meshtastic-client 1.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,056 kB
  • sloc: python: 4,794; xml: 64; makefile: 3
file content (97 lines) | stat: -rw-r--r-- 3,090 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import meshtastic.serial_interface
from pubsub import pub
from meshtastic.protobuf import mesh_pb2, storeforward_pb2, paxcount_pb2, portnums_pb2
from meshtastic import BROADCAST_NUM
import time
import sys
import signal
import sqlite3
import os
import base64
from enum import Enum
from gi.repository import GLib
from datetime import datetime, timezone

class MsgDirection(Enum):
    """Direction of a message relative to this client."""

    # Incoming message.
    In = 0
    # Outgoing message.
    Out = 1

def onDisconnection(interface):
    """Report that the connection was lost.

    Subscribed to the 'meshtastic.connection.lost' pubsub topic.

    Args:
        interface: The interface that was disconnected (unused).
    """
    # Nothing to clean up here: a connection that is already
    # disconnected does not need to be closed.
    message = "Awknowledged disconnection"
    print(message)

def onConnection(interface):
    """Send one test text message once the connection is established.

    Subscribed to the 'meshtastic.connection.established' pubsub topic.

    The destination and the recipient's key are read from two optional
    module-level settings:

    * ``send_address`` -- numeric node number of the recipient. When it is
      not defined, nothing is sent and a message is printed instead.
    * ``publicKey`` -- base64 text of the recipient's public key. Defaults
      to "MA==", which indicates there is no public key.

    Depending on ``publicKey`` and the local ``no_key`` switch, the message
    is sent unencrypted, PKI-encrypted with an explicit key, or
    PKI-encrypted without passing a key.

    Args:
        interface: The connected interface; its ``sendData`` method is used
            to send the message.
    """
    print("Awknowledged connection")

    # Example settings (define them at module level to enable sending):
    #   send_address = 2733239644
    #   publicKey = "gQ5GJwFP50lqwz09yRt5GemztnjKkkyPSg2qFvUNvHI="
    settings = globals()
    destination = settings.get("send_address")
    if destination is None:
        # Without this guard the callback died with a NameError, because
        # neither setting is defined anywhere in this file.
        print("send_address is not set; not sending test message")
        return
    # "MA==" is the default, indicating there is no public key
    public_key = settings.get("publicKey", "MA==")

    channel_index = 0  # primary channel

    # It looks like you don't need the PKI key to send the message.
    no_key = True

    print(idToHex(destination))

    # Only the text and the encryption-related arguments differ per case.
    encryption_kwargs = {}
    if public_key == "MA==":
        text = "send test Data no encryption"
        print("No encryption")
    elif not no_key:
        print("Encrypted with key")
        text = "send test Data encrypted"
        encryption_kwargs = {
            "pkiEncrypted": True,
            "publicKey": base64.b64decode(public_key),
        }
    else:
        print("Encrypted dont try with key")
        text = "send test Data encrypted no key"
        encryption_kwargs = {"pkiEncrypted": True}

    interface.sendData(
        data=text.encode("utf-8"),
        destinationId=destination,
        portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
        wantAck=True,
        channelIndex=channel_index,
        **encryption_kwargs,
    )


def onReceive(packet, interface):
    """Print the text of a received text message.

    Subscribed to the 'meshtastic.receive' pubsub topic. Packets without a
    'decoded' entry, or whose 'portnum' is not 'TEXT_MESSAGE_APP', are
    ignored.

    Args:
        packet: Mapping describing the received packet.
        interface: The interface the packet arrived on (unused).
    """
    if 'decoded' not in packet:
        return

    decoded = packet['decoded']
    if decoded.get('portnum') != 'TEXT_MESSAGE_APP':
        return

    print("Received Message")
    print(decoded["text"])


def idToHex(nodeId):
    """Return the '!'-prefixed hex string for a numeric node number.

    The number is rendered as lowercase hex, zero-padded to eight digits,
    so small node numbers still yield a full-width ID
    (e.g. 0x0abc1234 -> '!0abc1234' rather than '!abc1234').

    Args:
        nodeId: Node number as a non-negative int.

    Returns:
        The ID string, e.g. '!a2e9ed5c'.
    """
    return '!' + format(nodeId, '08x')


# You need to close any active connection when you exit.
# If you don't, the interface will continue to be active,
# any stored messages will stay stored on the device,
# and they will be sent on the stale connection.
def signal_handler(sig, frame):
    """Close the module-level interface, if any, and exit with status 0.

    Installed as the SIGINT handler.

    Args:
        sig: Signal number (unused).
        frame: Current stack frame (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print("closing")
    # The handler is installed before the module-level ``interface`` is
    # assigned, so Ctrl+C while the connection is still being opened must
    # not fail with a NameError.
    active = globals().get("interface")
    if active is not None:
        active.close()
    sys.exit(0)

# Install the Ctrl+C handler; it reads the module-level `interface`
# assigned further down.
signal.signal(signal.SIGINT, signal_handler)

# Register the pubsub listeners before the interface is created.
for _topic, _listener in (
    ('meshtastic.receive', onReceive),
    ('meshtastic.connection.lost', onDisconnection),
    ('meshtastic.connection.established', onConnection),
):
    pub.subscribe(_listener, _topic)

# Open the serial connection to the device.
interface = meshtastic.serial_interface.SerialInterface()

# Keep the main thread alive until the SIGINT handler exits the process.
while True:
    time.sleep(1)