1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
#!/usr/bin/env python
import asyncio
import itertools
import json
from websockets.asyncio.server import serve
from connect4 import PLAYER1, PLAYER2, Connect4
async def handler(websocket):
    """Run one Connect Four game over a single WebSocket connection.

    Both players share the connection and take alternate turns. Each
    incoming message is expected to be a JSON object of the form
    ``{"type": "play", "column": <column>}``.

    For every accepted move a ``"play"`` event is sent back, followed by a
    ``"win"`` event when the game reports a winner. Malformed messages and
    moves rejected by the game with ``ValueError`` produce an ``"error"``
    event instead; the connection stays open and the turn does not change.

    Args:
        websocket: Server-side connection; it is iterated for incoming
            messages and used to send events back.
    """
    # Initialize a Connect Four game.
    game = Connect4()

    # Players take alternate turns, using the same browser.
    turns = itertools.cycle([PLAYER1, PLAYER2])
    player = next(turns)

    async for message in websocket:
        # Parse a "play" event from the UI. Client input is untrusted, so it
        # is validated explicitly rather than with ``assert`` (which is
        # stripped under ``python -O`` and would otherwise kill the handler).
        try:
            column = _parse_play_column(message)
        except ValueError as exc:
            await websocket.send(_error_event(str(exc)))
            continue

        try:
            # Play the move.
            row = game.play(player, column)
        except ValueError as exc:
            # Send an "error" event if the move was illegal.
            # NOTE(review): only ValueError is treated as an illegal move;
            # confirm Connect4.play raises nothing else for bad columns.
            await websocket.send(_error_event(str(exc)))
            continue

        # Send a "play" event to update the UI.
        event = {
            "type": "play",
            "player": player,
            "column": column,
            "row": row,
        }
        await websocket.send(json.dumps(event))

        # If move is winning, send a "win" event.
        if game.winner is not None:
            event = {
                "type": "win",
                "player": game.winner,
            }
            await websocket.send(json.dumps(event))

        # Alternate turns.
        player = next(turns)


def _parse_play_column(message):
    """Return the ``column`` of a JSON "play" event, or raise ValueError."""
    try:
        event = json.loads(message)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError, so this covers text and binary frames alike.
        raise ValueError("Message is not valid JSON.") from exc
    if not isinstance(event, dict):
        raise ValueError("Event must be a JSON object.")
    if event.get("type") != "play":
        raise ValueError('Expected an event of type "play".')
    if "column" not in event:
        raise ValueError('"play" event is missing "column".')
    return event["column"]


def _error_event(message):
    """Serialize an "error" event carrying *message* for the UI."""
    return json.dumps({"type": "error", "message": message})
async def main():
    """Serve ``handler`` on port 8001 of every interface until cancelled."""
    # An empty host string binds all available network interfaces.
    listening = serve(handler, "", 8001)
    async with listening as server:
        # Block here for the lifetime of the process; leaving the
        # ``async with`` block shuts the server down.
        await server.serve_forever()
if __name__ == "__main__":
    # Start the event loop only when run as a script, so that importing
    # this module does not launch the server as a side effect.
    asyncio.run(main())
|