# Test of an ESPNow echo server and client transferring encrypted data.
# This test works with ESP32 or ESP8266 as server or client.

# First instance (echo server):
#   Set the shared PMK
#   Set the PEERS global to our MAC addresses
#   Run the echo server
#     First exchange an unencrypted message from the client (so we
#     can get its MAC address) and echo the message back (unencrypted).
#     Then set the peer LMK so all further communications are encrypted.

# Second instance (echo client):
#   Set the shared PMK
#   Send an unencrypted message to the server and wait for the echo response.
#   Set the LMK for the peer so all further communications are encrypted.
#   Send random messages and compare them with the responses from the server.
try:
    import network
    import random
    import time
    import espnow
except ImportError:
    print("SKIP")
    raise SystemExit
# Set read timeout to 5 seconds
timeout_ms = 5000
default_pmk = b"MicroPyth0nRules"
default_lmk = b"0123456789abcdef"
sync = True

def echo_server(e):
    peers = []
    while True:
        # Wait for messages from the client
        peer, msg = e.recv(timeout_ms)
        if peer is None:
            return
        if peer not in peers:
            # If this is the first message, add the peer unencrypted
            e.add_peer(peer)

        # Echo the message back to the sender
        if not e.send(peer, msg, sync):
            print("ERROR: send() failed to", peer)
            return

        if peer not in peers:
            # After the first (unencrypted) echo, re-add the peer with the LMK
            # so that all further messages are encrypted
            peers.append(peer)
            e.del_peer(peer)
            e.add_peer(peer, default_lmk)

        if msg == b"!done":
            return

# Send a message from the client and compare with response from server.
def echo_test(e, peer, msg, sync):
    print("TEST: send/recv(msglen=", len(msg), ",sync=", sync, "): ", end="", sep="")
    try:
        if not e.send(peer, msg, sync):
            print("ERROR: Send failed.")
            return
    except OSError as exc:
        # Don't print exc as it differs between esp32 and esp8266
        print("ERROR: OSError:")
        return

    p2, msg2 = e.recv(timeout_ms)
    if p2 is None:
        print("ERROR: No response from server.")
        raise SystemExit

    print("OK" if msg2 == msg else "ERROR: Received != Sent")

# Send some random messages to server and check the responses
def echo_client(e, peer, msglens):
    for sync in [True, False]:
        for msglen in msglens:
            msg = bytearray(msglen)
            if msglen > 0:
                msg[0] = b"_"[0]  # Random message must not start with '!'
            for i in range(1, msglen):
                msg[i] = random.getrandbits(8)
            echo_test(e, peer, msg, sync)

# Initialise the wifi and espnow hardware and software
def init(sta_active=True, ap_active=False):
    wlans = [network.WLAN(i) for i in [network.WLAN.IF_STA, network.WLAN.IF_AP]]
    e = espnow.ESPNow()
    e.active(True)
    e.set_pmk(default_pmk)
    wlans[0].active(sta_active)
    wlans[1].active(ap_active)
    wlans[0].disconnect()  # Force esp8266 STA interface to disconnect from AP
    return e

# Server
def instance0():
    e = init(True, False)
    macs = [network.WLAN(i).config("mac") for i in (0, 1)]
    print("Server Start")
    multitest.globals(PEERS=macs)
    multitest.next()
    echo_server(e)
    print("Server Done")
    e.active(False)

# Client
def instance1():
    e = init(True, False)
    multitest.next()
    peer = PEERS[0]
    e.add_peer(peer)
    echo_test(e, peer, b"start", True)
    # Wait long enough for the server to set the lmk
    time.sleep(0.1)
    e.del_peer(peer)
    e.add_peer(peer, default_lmk)
    echo_client(e, peer, [250])
    echo_test(e, peer, b"!done", True)
    e.active(False)