1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
# coding: utf-8
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import threading
import logging
import time
import json
import os
from websocket import WebSocketApp
from typing import List, Optional
from azure.messaging.webpubsubservice import WebPubSubServiceClient
logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger()
class WebsocketClientsManager:
'''
This class contains multiple websocket clients which are connected to Azure Web PubSub Services.
'''
def __init__(self) -> None:
self.clients = []
self.connection_ids = []
self.recv_messages = []
def add_client(self, service: WebPubSubServiceClient, user_id: str, groups: Optional[List[str]] = None):
def on_message(websocket_app: WebSocketApp, message: str):
message = json.loads(message)
if message["type"] == "message":
self.recv_messages.append(message["data"])
if message["type"] == "system" and message["event"] == "connected":
self.connection_ids.append(message["connectionId"])
LOG.debug(message)
def on_open(websocket_app: WebSocketApp):
LOG.debug("connected")
token = service.get_client_access_token(groups=groups, user_id=user_id)
client = WebSocketApp(token["url"], subprotocols=['json.webpubsub.azure.v1'], on_open=on_open, on_message=on_message)
self.clients.append(client)
def start_all(self):
for client in self.clients:
wst = threading.Thread(target=client.run_forever, daemon=True)
wst.start()
LOG.debug("Waiting for all clients connected...")
while len(self.connection_ids) != self.client_number:
pass
@property
def client_number(self):
return len(self.clients)
def test_overall_integration(webpubsub_connection_string: str):
# build a service client from the connection string.
service = WebPubSubServiceClient.from_connection_string(webpubsub_connection_string, hub='hub', logging_enable=False)
# build multiple websocket clients connected to the Web PubSub service
clients = WebsocketClientsManager()
for i in range(5):
clients.add_client(service, user_id="User%d" % clients.client_number, groups=["InitGroup"])
clients.start_all()
# test naive send_to_all
service.send_to_all(message='Message_For_All', content_type='text/plain') # N messages
# test if generating token with the initial group is working
service.send_to_group(group="InitGroup", message='Message_For_InitGroup', content_type='text/plain') # N messages
# test if parameter "filter" in send is working
service.send_to_all("Message_Not_For_User0", filter="userId ne 'User0'", content_type='text/plain') # N - 1 messages
# test if remove_connection_from_all_groups works
group_names = ["Group%d" % i for i in range(3)]
for group in group_names:
service.add_connection_to_group(group, clients.connection_ids[0])
service.send_to_group(group, "Message_For_RemoveFromAll", content_type='text/plain')
service.remove_connection_from_all_groups(clients.connection_ids[0])
for group in group_names:
service.send_to_group(group, "Message_For_RemoveFromAll", content_type='text/plain')
# other tests
service.send_to_user("User0", message='Message_For_User0', content_type='text/plain') # 1 messages
time.sleep(5)
LOG.info("Received Message: ", clients.recv_messages)
assert service.group_exists("InitGroup") == True
assert clients.recv_messages.count("Message_For_All") == clients.client_number
assert clients.recv_messages.count("Message_For_InitGroup") == clients.client_number
assert clients.recv_messages.count("Message_Not_For_User0") == clients.client_number - 1
assert clients.recv_messages.count("Message_For_User0") == 1
assert clients.recv_messages.count("Message_For_RemoveFromAll") == 3
LOG.info("Complete All Integration Test Successfully")
if __name__ == "__main__":
try:
connection_string = os.environ['WEBPUBSUB_CONNECTION_STRING']
except KeyError:
LOG.error("Missing environment variable 'WEBPUBSUB_CONNECTION_STRING' - please set if before running the example")
exit()
test_overall_integration(connection_string)
|