1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package websocket.drawboard;
import java.awt.Color;
import java.awt.Graphics2D;
import java.awt.RenderingHints;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import javax.imageio.ImageIO;
import websocket.drawboard.wsmessages.BinaryWebsocketMessage;
import websocket.drawboard.wsmessages.StringWebsocketMessage;
/**
* A Room represents a drawboard where a number of
* users participate.<br><br>
*
* Note: Instance methods should only be invoked by calling
* {@link #invokeAndWait(Runnable)} to ensure access is correctly synchronized.
*/
public final class Room {
/**
* Specifies the type of a room message that is sent to a client.<br>
* Note: Currently we are sending simple string messages - for production
* apps, a JSON lib should be used for object-level messages.<br><br>
*
* The number (single char) will be prefixed to the string when sending
* the message.
*/
public enum MessageType {
/**
* '0': Error: contains error message.
*/
ERROR('0'),
/**
* '1': DrawMessage: contains serialized DrawMessage(s) prefixed
* with the current Player's {@link Player#lastReceivedMessageId}
* and ",".<br>
* Multiple draw messages are concatenated with "|" as separator.
*/
DRAW_MESSAGE('1'),
/**
* '2': ImageMessage: Contains number of current players in this room.
* After this message a Binary Websocket message will follow,
* containing the current Room image as PNG.<br>
* This is the first message that a Room sends to a new Player.
*/
IMAGE_MESSAGE('2'),
/**
* '3': PlayerChanged: contains "+" or "-" which indicate a player
* was added or removed to this Room.
*/
PLAYER_CHANGED('3');
private final char flag;
MessageType(char flag) {
this.flag = flag;
}
}
/**
* The lock used to synchronize access to this Room.
*/
private final ReentrantLock roomLock = new ReentrantLock();
/**
* Indicates if this room has already been shutdown.
*/
private volatile boolean closed = false;
/**
* If <code>true</code>, outgoing DrawMessages will be buffered until the
* drawmessageBroadcastTimer ticks. Otherwise they will be sent
* immediately.
*/
private static final boolean BUFFER_DRAW_MESSAGES = true;
/**
* A timer which sends buffered drawmessages to the client at once
* at a regular interval, to avoid sending a lot of very small
* messages which would cause TCP overhead and high CPU usage.
*/
private final Timer drawmessageBroadcastTimer = new Timer();
private static final int TIMER_DELAY = 30;
/**
* The current active broadcast timer task. If null, then no Broadcast task is scheduled.
* The Task will be scheduled if the first player enters the Room, and
* cancelled if the last player exits the Room, to avoid unnecessary timer executions.
*/
private TimerTask activeBroadcastTimerTask;
/**
* The current image of the room drawboard. DrawMessages that are
* received from Players will be drawn onto this image.
*/
private final BufferedImage roomImage =
new BufferedImage(900, 600, BufferedImage.TYPE_INT_RGB);
private final Graphics2D roomGraphics = roomImage.createGraphics();
/**
* The maximum number of players that can join this room.
*/
private static final int MAX_PLAYER_COUNT = 100;
/**
* List of all currently joined players.
*/
private final List<Player> players = new ArrayList<>();
public Room() {
roomGraphics.setRenderingHint(RenderingHints.KEY_ANTIALIASING,
RenderingHints.VALUE_ANTIALIAS_ON);
// Clear the image with white background.
roomGraphics.setBackground(Color.WHITE);
roomGraphics.clearRect(0, 0, roomImage.getWidth(),
roomImage.getHeight());
}
private TimerTask createBroadcastTimerTask() {
return new TimerTask() {
@Override
public void run() {
invokeAndWait(new Runnable() {
@Override
public void run() {
broadcastTimerTick();
}
});
}
};
}
/**
* Creates a Player from the given Client and adds it to this room.
*
* @param client the client
*
* @return The newly created player
*/
public Player createAndAddPlayer(Client client) {
if (players.size() >= MAX_PLAYER_COUNT) {
throw new IllegalStateException("Maximum player count ("
+ MAX_PLAYER_COUNT + ") has been reached.");
}
Player p = new Player(this, client);
// Broadcast to the other players that one player joined.
broadcastRoomMessage(MessageType.PLAYER_CHANGED, "+");
// Add the new player to the list.
players.add(p);
// If currently no Broadcast Timer Task is scheduled, then we need to create one.
if (activeBroadcastTimerTask == null) {
activeBroadcastTimerTask = createBroadcastTimerTask();
drawmessageBroadcastTimer.schedule(activeBroadcastTimerTask,
TIMER_DELAY, TIMER_DELAY);
}
// Send the current number of players and the current room image.
String content = String.valueOf(players.size());
p.sendRoomMessage(MessageType.IMAGE_MESSAGE, content);
// Store image as PNG
ByteArrayOutputStream bout = new ByteArrayOutputStream();
try {
ImageIO.write(roomImage, "PNG", bout);
} catch (IOException ignore) {
// Should never happen
}
// Send the image as binary message.
BinaryWebsocketMessage msg = new BinaryWebsocketMessage(
ByteBuffer.wrap(bout.toByteArray()));
p.getClient().sendMessage(msg);
return p;
}
/**
* @see Player#removeFromRoom()
* @param p player to remove
*/
private void internalRemovePlayer(Player p) {
boolean removed = players.remove(p);
assert removed;
// If the last player left the Room, we need to cancel the Broadcast Timer Task.
if (players.size() == 0) {
// Cancel the task.
// Note that it can happen that the TimerTask is just about to execute (from
// the Timer thread) but waits until all players are gone (or even until a new
// player is added to the list), and then executes. This is OK. To prevent it,
// a TimerTask subclass would need to have some boolean "cancel" instance variable and
// query it in the invocation of Room#invokeAndWait.
activeBroadcastTimerTask.cancel();
activeBroadcastTimerTask = null;
}
// Broadcast that one player is removed.
broadcastRoomMessage(MessageType.PLAYER_CHANGED, "-");
}
/**
* @see Player#handleDrawMessage(DrawMessage, long)
* @param p player
* @param msg message containing details of new shapes to draw
* @param msgId message ID
*/
private void internalHandleDrawMessage(Player p, DrawMessage msg,
long msgId) {
p.setLastReceivedMessageId(msgId);
// Draw the RoomMessage onto our Room Image.
msg.draw(roomGraphics);
// Broadcast the Draw Message.
broadcastDrawMessage(msg);
}
/**
* Broadcasts the given drawboard message to all connected players.<br>
* Note: For DrawMessages, please use
* {@link #broadcastDrawMessage(DrawMessage)}
* as this method will buffer them and prefix them with the correct
* last received Message ID.
* @param type message type
* @param content message content
*/
private void broadcastRoomMessage(MessageType type, String content) {
for (Player p : players) {
p.sendRoomMessage(type, content);
}
}
/**
* Broadcast the given DrawMessage. This will buffer the message
* and the {@link #drawmessageBroadcastTimer} will broadcast them
* at a regular interval, prefixing them with the player's current
* {@link Player#lastReceivedMessageId}.
* @param msg message to broadcast
*/
private void broadcastDrawMessage(DrawMessage msg) {
if (!BUFFER_DRAW_MESSAGES) {
String msgStr = msg.toString();
for (Player p : players) {
String s = String.valueOf(p.getLastReceivedMessageId())
+ "," + msgStr;
p.sendRoomMessage(MessageType.DRAW_MESSAGE, s);
}
} else {
for (Player p : players) {
p.getBufferedDrawMessages().add(msg);
}
}
}
/**
* Tick handler for the broadcastTimer.
*/
private void broadcastTimerTick() {
// For each Player, send all per Player buffered
// DrawMessages, prefixing each DrawMessage with the player's
// lastReceivedMessageId.
// Multiple messages are concatenated with "|".
for (Player p : players) {
StringBuilder sb = new StringBuilder();
List<DrawMessage> drawMessages = p.getBufferedDrawMessages();
if (drawMessages.size() > 0) {
for (int i = 0; i < drawMessages.size(); i++) {
DrawMessage msg = drawMessages.get(i);
String s = String.valueOf(p.getLastReceivedMessageId())
+ "," + msg.toString();
if (i > 0) {
sb.append('|');
}
sb.append(s);
}
drawMessages.clear();
p.sendRoomMessage(MessageType.DRAW_MESSAGE, sb.toString());
}
}
}
/**
* A list of cached {@link Runnable}s to prevent recursive invocation of Runnables
* by one thread. This variable is only used by one thread at a time and then
* set to <code>null</code>.
*/
private List<Runnable> cachedRunnables = null;
/**
* Submits the given Runnable to the Room Executor and waits until it
* has been executed. Currently, this simply means that the Runnable
* will be run directly inside of a synchronized() block.<br>
* Note that if a runnable recursively calls invokeAndWait() with another
* runnable on this Room, it will not be executed recursively, but instead
* cached until the original runnable is finished, to keep the behavior of
* using an Executor.
*
* @param task The task to be executed
*/
public void invokeAndWait(Runnable task) {
// Check if the current thread already holds a lock on this room.
// If yes, then we must not directly execute the Runnable but instead
// cache it until the original invokeAndWait() has finished.
if (roomLock.isHeldByCurrentThread()) {
if (cachedRunnables == null) {
cachedRunnables = new ArrayList<>();
}
cachedRunnables.add(task);
} else {
roomLock.lock();
try {
// Explicitly overwrite value to ensure data consistency in
// current thread
cachedRunnables = null;
if (!closed) {
task.run();
}
// Run the cached runnables.
if (cachedRunnables != null) {
for (Runnable cachedRunnable : cachedRunnables) {
if (!closed) {
cachedRunnable.run();
}
}
cachedRunnables = null;
}
} finally {
roomLock.unlock();
}
}
}
/**
* Shuts down the roomExecutor and the drawmessageBroadcastTimer.
*/
public void shutdown() {
invokeAndWait(new Runnable() {
@Override
public void run() {
closed = true;
drawmessageBroadcastTimer.cancel();
roomGraphics.dispose();
}
});
}
/**
* A Player participates in a Room. It is the interface between the
* {@link Room} and the {@link Client}.<br><br>
*
* Note: This means a player object is actually a join between Room and
* Client.
*/
public static final class Player {
/**
* The room to which this player belongs.
*/
private Room room;
/**
* The room buffers the last draw message ID that was received from
* this player.
*/
private long lastReceivedMessageId = 0;
private final Client client;
/**
* Buffered DrawMessages that will be sent by a Timer.
*/
private final List<DrawMessage> bufferedDrawMessages =
new ArrayList<>();
private List<DrawMessage> getBufferedDrawMessages() {
return bufferedDrawMessages;
}
private Player(Room room, Client client) {
this.room = room;
this.client = client;
}
public Room getRoom() {
return room;
}
public Client getClient() {
return client;
}
/**
* Removes this player from its room, e.g. when
* the client disconnects.
*/
public void removeFromRoom() {
if (room != null) {
room.internalRemovePlayer(this);
room = null;
}
}
private long getLastReceivedMessageId() {
return lastReceivedMessageId;
}
private void setLastReceivedMessageId(long value) {
lastReceivedMessageId = value;
}
/**
* Handles the given DrawMessage by drawing it onto this Room's
* image and by broadcasting it to the connected players.
*
* @param msg The draw message received
* @param msgId The ID for the draw message received
*/
public void handleDrawMessage(DrawMessage msg, long msgId) {
room.internalHandleDrawMessage(this, msg, msgId);
}
/**
* Sends the given room message.
* @param type message type
* @param content message content
*/
private void sendRoomMessage(MessageType type, String content) {
Objects.requireNonNull(content);
Objects.requireNonNull(type);
String completeMsg = String.valueOf(type.flag) + content;
client.sendMessage(new StringWebsocketMessage(completeMsg));
}
}
}
|