1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040
|
import base64
import bisect
import functools
import hashlib
import json
import logging
import os
import psycopg2
import random
import socket
import struct
import selectors
import threading
import time
from collections import defaultdict, deque
from contextlib import closing, suppress
from enum import IntEnum
from psycopg2.pool import PoolError
from urllib.parse import urlparse
from weakref import WeakSet
from werkzeug.local import LocalStack
from werkzeug.datastructures import ImmutableMultiDict, MultiDict
from werkzeug.exceptions import BadRequest, HTTPException, ServiceUnavailable
import odoo
from odoo import api, modules
from .models.bus import dispatch
from odoo.http import root, Request, Response, SessionExpiredException, get_default_session
from odoo.modules.registry import Registry
from odoo.service import model as service_model
from odoo.service.server import CommonServer
from odoo.service.security import check_session
from odoo.tools import config, lazy_property
_logger = logging.getLogger(__name__)
MAX_TRY_ON_POOL_ERROR = 10
DELAY_ON_POOL_ERROR = 0.03
def acquire_cursor(db):
""" Try to acquire a cursor up to `MAX_TRY_ON_POOL_ERROR` """
for tryno in range(1, MAX_TRY_ON_POOL_ERROR + 1):
with suppress(PoolError):
return Registry(db).cursor()
time.sleep(random.uniform(DELAY_ON_POOL_ERROR, DELAY_ON_POOL_ERROR * tryno))
raise PoolError('Failed to acquire cursor after %s retries' % MAX_TRY_ON_POOL_ERROR)
# ------------------------------------------------------
# EXCEPTIONS
# ------------------------------------------------------
class UpgradeRequired(HTTPException):
code = 426
description = "Wrong websocket version was given during the handshake"
def get_headers(self, environ=None):
headers = super().get_headers(environ)
headers.append((
'Sec-WebSocket-Version',
'; '.join(WebsocketConnectionHandler.SUPPORTED_VERSIONS)
))
return headers
class WebsocketException(Exception):
""" Base class for all websockets exceptions """
class ConnectionClosed(WebsocketException):
"""
Raised when the other end closes the socket without performing
the closing handshake.
"""
class InvalidCloseCodeException(WebsocketException):
def __init__(self, code):
super().__init__(f"Invalid close code: {code}")
class InvalidDatabaseException(WebsocketException):
"""
When raised: the database probably does not exists anymore, the
database is corrupted or the database version doesn't match the
server version.
"""
class InvalidStateException(WebsocketException):
"""
Raised when an operation is forbidden in the current state.
"""
class InvalidWebsocketRequest(WebsocketException):
"""
Raised when a websocket request is invalid (format, wrong args).
"""
class PayloadTooLargeException(WebsocketException):
"""
Raised when a websocket message is too large.
"""
class ProtocolError(WebsocketException):
"""
Raised when a frame format doesn't match expectations.
"""
class RateLimitExceededException(Exception):
"""
Raised when a client exceeds the number of request in a given
time.
"""
# ------------------------------------------------------
# WEBSOCKET LIFECYCLE
# ------------------------------------------------------
class LifecycleEvent(IntEnum):
OPEN = 0
CLOSE = 1
# ------------------------------------------------------
# WEBSOCKET
# ------------------------------------------------------
class Opcode(IntEnum):
CONTINUE = 0x00
TEXT = 0x01
BINARY = 0x02
CLOSE = 0x08
PING = 0x09
PONG = 0x0A
class CloseCode(IntEnum):
CLEAN = 1000
GOING_AWAY = 1001
PROTOCOL_ERROR = 1002
INCORRECT_DATA = 1003
ABNORMAL_CLOSURE = 1006
INCONSISTENT_DATA = 1007
MESSAGE_VIOLATING_POLICY = 1008
MESSAGE_TOO_BIG = 1009
EXTENSION_NEGOTIATION_FAILED = 1010
SERVER_ERROR = 1011
RESTART = 1012
TRY_LATER = 1013
BAD_GATEWAY = 1014
SESSION_EXPIRED = 4001
KEEP_ALIVE_TIMEOUT = 4002
class ConnectionState(IntEnum):
OPEN = 0
CLOSING = 1
CLOSED = 2
DATA_OP = {Opcode.TEXT, Opcode.BINARY}
CTRL_OP = {Opcode.CLOSE, Opcode.PING, Opcode.PONG}
HEARTBEAT_OP = {Opcode.PING, Opcode.PONG}
VALID_CLOSE_CODES = {
code for code in CloseCode if code is not CloseCode.ABNORMAL_CLOSURE
}
CLEAN_CLOSE_CODES = {CloseCode.CLEAN, CloseCode.GOING_AWAY, CloseCode.RESTART}
RESERVED_CLOSE_CODES = range(3000, 5000)
_XOR_TABLE = [bytes(a ^ b for a in range(256)) for b in range(256)]
class Frame:
def __init__(
self,
opcode,
payload=b'',
fin=True,
rsv1=False,
rsv2=False,
rsv3=False
):
self.opcode = opcode
self.payload = payload
self.fin = fin
self.rsv1 = rsv1
self.rsv2 = rsv2
self.rsv3 = rsv3
class CloseFrame(Frame):
def __init__(self, code, reason):
if code not in VALID_CLOSE_CODES and code not in RESERVED_CLOSE_CODES:
raise InvalidCloseCodeException(code)
payload = struct.pack('!H', code)
if reason:
payload += reason.encode('utf-8')
self.code = code
self.reason = reason
super().__init__(Opcode.CLOSE, payload)
_websocket_instances = WeakSet()
class Websocket:
__event_callbacks = defaultdict(set)
# Maximum size for a message in bytes, whether it is sent as one
# frame or many fragmented ones.
MESSAGE_MAX_SIZE = 2 ** 20
# Proxies usually close a connection after 1 minute of inactivity.
# Therefore, a PING frame have to be sent if no frame is either sent
# or received within CONNECTION_TIMEOUT - 15 seconds.
CONNECTION_TIMEOUT = 60
INACTIVITY_TIMEOUT = CONNECTION_TIMEOUT - 15
# How much time (in second) the history of last dispatched notifications is
# kept in memory for each websocket.
# To avoid duplicate notifications, we fetch them based on their ids.
# However during parallel transactions, ids are assigned immediately (when
# they are requested), but the notifications are dispatched at the time of
# the commit. This means lower id notifications might be dispatched after
# higher id notifications.
# Simply incrementing the last id is sufficient to guarantee no duplicates,
# but it is not sufficient to guarantee all notifications are dispatched,
# and in particular not sufficient for those with a lower id coming after a
# higher id was dispatched.
# To solve the issue of missed notifications, the lowest id, stored in
# ``_last_notif_sent_id``, is held back by a few seconds to give time for
# concurrent transactions to finish. To avoid dispatching duplicate
# notifications, the history of already dispatched notifications during this
# period is kept in memory in ``_notif_history`` and the corresponding
# notifications are discarded from subsequent dispatching even if their id
# is higher than ``_last_notif_sent_id``.
# In practice, what is important functionally is the time between the create
# of the notification and the commit of the transaction in business code.
# If this time exceeds this threshold, the notification will never be
# dispatched if the target user receive any other notification in the
# meantime.
# Transactions known to be long should therefore create their notifications
# at the end, as close as possible to their commit.
MAX_NOTIFICATION_HISTORY_SEC = 10
# How many requests can be made in excess of the given rate.
RL_BURST = int(config['websocket_rate_limit_burst'])
# How many seconds between each request.
RL_DELAY = float(config['websocket_rate_limit_delay'])
def __init__(self, sock, session, cookies):
# Session linked to the current websocket connection.
self._session = session
# Cookies linked to the current websocket connection.
self._cookies = cookies
self._db = session.db
self.__socket = sock
self._close_sent = False
self._close_received = False
self._timeout_manager = TimeoutManager()
# Used for rate limiting.
self._incoming_frame_timestamps = deque(maxlen=self.RL_BURST)
# Used to notify the websocket that bus notifications are
# available.
self.__notif_sock_w, self.__notif_sock_r = socket.socketpair()
self._channels = set()
# For ``_last_notif_sent_id and ``_notif_history``, see
# ``MAX_NOTIFICATION_HISTORY_SEC`` for more details.
# id of the last sent notification that is no longer in _notif_history
self._last_notif_sent_id = 0
# history of last sent notifications in the format (notif_id, send_time)
# always sorted by notif_id ASC
self._notif_history = []
# Websocket start up
self.__selector = (
selectors.PollSelector()
if odoo.evented and hasattr(selectors, 'PollSelector')
else selectors.DefaultSelector()
)
self.__selector.register(self.__socket, selectors.EVENT_READ)
self.__selector.register(self.__notif_sock_r, selectors.EVENT_READ)
self.state = ConnectionState.OPEN
_websocket_instances.add(self)
self._trigger_lifecycle_event(LifecycleEvent.OPEN)
# ------------------------------------------------------
# PUBLIC METHODS
# ------------------------------------------------------
def get_messages(self):
while self.state is not ConnectionState.CLOSED:
try:
readables = {
selector_key[0].fileobj for selector_key in
self.__selector.select(self.INACTIVITY_TIMEOUT)
}
if self._timeout_manager.has_timed_out() and self.state is ConnectionState.OPEN:
self.disconnect(
CloseCode.ABNORMAL_CLOSURE
if self._timeout_manager.timeout_reason is TimeoutReason.NO_RESPONSE
else CloseCode.KEEP_ALIVE_TIMEOUT
)
continue
if not readables:
self._send_ping_frame()
continue
if self.__notif_sock_r in readables:
self._dispatch_bus_notifications()
if self.__socket in readables:
message = self._process_next_message()
if message is not None:
yield message
except Exception as exc:
self._handle_transport_error(exc)
def disconnect(self, code, reason=None):
"""
Initiate the closing handshake that is, send a close frame
to the other end which will then send us back an
acknowledgment. Upon the reception of this acknowledgment,
the `_terminate` method will be called to perform an
orderly shutdown. Note that we don't need to wait for the
acknowledgment if the connection was failed beforewards.
"""
if code is not CloseCode.ABNORMAL_CLOSURE:
self._send_close_frame(code, reason)
else:
self._terminate()
@classmethod
def onopen(cls, func):
cls.__event_callbacks[LifecycleEvent.OPEN].add(func)
return func
@classmethod
def onclose(cls, func):
cls.__event_callbacks[LifecycleEvent.CLOSE].add(func)
return func
def subscribe(self, channels, last):
""" Subscribe to bus channels. """
self._channels = channels
# Only assign the last id according to the client once: the server is
# more reliable later on, see ``MAX_NOTIFICATION_HISTORY_SEC``.
if self._last_notif_sent_id == 0:
self._last_notif_sent_id = last
# Dispatch past notifications if there are any.
self.trigger_notification_dispatching()
def trigger_notification_dispatching(self):
"""
Warn the socket that notifications are available. Ignore if a
dispatch is already planned or if the socket is already in the
closing state.
"""
if self.state is not ConnectionState.OPEN:
return
readables = {
selector_key[0].fileobj for selector_key in
self.__selector.select(0)
}
if self.__notif_sock_r not in readables:
# Send a random bit to mark the socket as readable.
self.__notif_sock_w.send(b'x')
# ------------------------------------------------------
# PRIVATE METHODS
# ------------------------------------------------------
def _get_next_frame(self):
# 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
# +-+-+-+-+-------+-+-------------+-------------------------------+
# |F|R|R|R| opcode|M| Payload len | Extended payload length |
# |I|S|S|S| (4) |A| (7) | (16/64) |
# |N|V|V|V| |S| | (if payload len==126/127) |
# | |1|2|3| |K| | |
# +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
# | Extended payload length continued, if payload len == 127 |
# + - - - - - - - - - - - - - - - +-------------------------------+
# | |Masking-key, if MASK set to 1 |
# +-------------------------------+-------------------------------+
# | Masking-key (continued) | Payload Data |
# +-------------------------------- - - - - - - - - - - - - - - - +
# : Payload Data continued ... :
# + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
# | Payload Data continued ... |
# +---------------------------------------------------------------+
def recv_bytes(n):
""" Pull n bytes from the socket """
data = bytearray()
while len(data) < n:
received_data = self.__socket.recv(n - len(data))
if not received_data:
raise ConnectionClosed()
data.extend(received_data)
return data
def is_bit_set(byte, n):
"""
Check whether nth bit of byte is set or not (from left
to right).
"""
return byte & (1 << (7 - n))
def apply_mask(payload, mask):
# see: https://www.willmcgugan.com/blog/tech/post/speeding-up-websockets-60x/
a, b, c, d = (_XOR_TABLE[n] for n in mask)
payload[::4] = payload[::4].translate(a)
payload[1::4] = payload[1::4].translate(b)
payload[2::4] = payload[2::4].translate(c)
payload[3::4] = payload[3::4].translate(d)
return payload
self._limit_rate()
first_byte, second_byte = recv_bytes(2)
fin, rsv1, rsv2, rsv3 = (is_bit_set(first_byte, n) for n in range(4))
try:
opcode = Opcode(first_byte & 0b00001111)
except ValueError as exc:
raise ProtocolError(exc)
payload_length = second_byte & 0b01111111
if rsv1 or rsv2 or rsv3:
raise ProtocolError("Reserved bits must be unset")
if not is_bit_set(second_byte, 0):
raise ProtocolError("Frame must be masked")
if opcode in CTRL_OP:
if not fin:
raise ProtocolError("Control frames cannot be fragmented")
if payload_length > 125:
raise ProtocolError(
"Control frames payload must be smaller than 126"
)
if payload_length == 126:
payload_length = struct.unpack('!H', recv_bytes(2))[0]
elif payload_length == 127:
payload_length = struct.unpack('!Q', recv_bytes(8))[0]
if payload_length > self.MESSAGE_MAX_SIZE:
raise PayloadTooLargeException()
mask = recv_bytes(4)
payload = apply_mask(recv_bytes(payload_length), mask)
frame = Frame(opcode, bytes(payload), fin, rsv1, rsv2, rsv3)
self._timeout_manager.acknowledge_frame_receipt(frame)
return frame
def _process_next_message(self):
"""
Process the next message coming throught the socket. If a
data message can be extracted, return its decoded payload.
As per the RFC, only control frames will be processed once
the connection reaches the closing state.
"""
frame = self._get_next_frame()
if frame.opcode in CTRL_OP:
self._handle_control_frame(frame)
return
if self.state is not ConnectionState.OPEN:
# After receiving a control frame indicating the connection
# should be closed, a peer discards any further data
# received.
return
if frame.opcode is Opcode.CONTINUE:
raise ProtocolError("Unexpected continuation frame")
message = frame.payload
if not frame.fin:
message = self._recover_fragmented_message(frame)
return (
message.decode('utf-8')
if message is not None and frame.opcode is Opcode.TEXT else message
)
def _recover_fragmented_message(self, initial_frame):
message_fragments = bytearray(initial_frame.payload)
while True:
frame = self._get_next_frame()
if frame.opcode in CTRL_OP:
# Control frames can be received in the middle of a
# fragmented message, process them as soon as possible.
self._handle_control_frame(frame)
if self.state is not ConnectionState.OPEN:
return
continue
if frame.opcode is not Opcode.CONTINUE:
raise ProtocolError("A continuation frame was expected")
message_fragments.extend(frame.payload)
if len(message_fragments) > self.MESSAGE_MAX_SIZE:
raise PayloadTooLargeException()
if frame.fin:
return bytes(message_fragments)
def _send(self, message):
if self.state is not ConnectionState.OPEN:
raise InvalidStateException(
"Trying to send a frame on a closed socket"
)
opcode = Opcode.BINARY
if not isinstance(message, (bytes, bytearray)):
opcode = Opcode.TEXT
self._send_frame(Frame(opcode, message))
def _send_frame(self, frame):
if frame.opcode in CTRL_OP and len(frame.payload) > 125:
raise ProtocolError(
"Control frames should have a payload length smaller than 126"
)
if isinstance(frame.payload, str):
frame.payload = frame.payload.encode('utf-8')
elif not isinstance(frame.payload, (bytes, bytearray)):
frame.payload = json.dumps(frame.payload).encode('utf-8')
output = bytearray()
first_byte = (
(0b10000000 if frame.fin else 0)
| (0b01000000 if frame.rsv1 else 0)
| (0b00100000 if frame.rsv2 else 0)
| (0b00010000 if frame.rsv3 else 0)
| frame.opcode
)
payload_length = len(frame.payload)
if payload_length < 126:
output.extend(
struct.pack('!BB', first_byte, payload_length)
)
elif payload_length < 65536:
output.extend(
struct.pack('!BBH', first_byte, 126, payload_length)
)
else:
output.extend(
struct.pack('!BBQ', first_byte, 127, payload_length)
)
output.extend(frame.payload)
self.__socket.sendall(output)
self._timeout_manager.acknowledge_frame_sent(frame)
if not isinstance(frame, CloseFrame):
return
self.state = ConnectionState.CLOSING
self._close_sent = True
if frame.code not in CLEAN_CLOSE_CODES or self._close_received:
return self._terminate()
# After sending a control frame indicating the connection
# should be closed, a peer does not send any further data.
self.__selector.unregister(self.__notif_sock_r)
def _send_close_frame(self, code, reason=None):
""" Send a close frame. """
self._send_frame(CloseFrame(code, reason))
def _send_ping_frame(self):
""" Send a ping frame """
self._send_frame(Frame(Opcode.PING))
def _send_pong_frame(self, payload):
""" Send a pong frame """
self._send_frame(Frame(Opcode.PONG, payload))
def _terminate(self):
""" Close the underlying TCP socket. """
with suppress(OSError, TimeoutError):
self.__socket.shutdown(socket.SHUT_WR)
# Call recv until obtaining a return value of 0 indicating
# the other end has performed an orderly shutdown. A timeout
# is set to ensure the connection will be closed even if
# the other end does not close the socket properly.
self.__socket.settimeout(1)
while self.__socket.recv(4096):
pass
self.__selector.unregister(self.__socket)
self.__selector.close()
self.__socket.close()
self.state = ConnectionState.CLOSED
dispatch.unsubscribe(self)
self._trigger_lifecycle_event(LifecycleEvent.CLOSE)
with acquire_cursor(self._db) as cr:
env = api.Environment(cr, self._session.uid, self._session.context)
env["ir.websocket"]._on_websocket_closed(self._cookies)
def _handle_control_frame(self, frame):
if frame.opcode is Opcode.PING:
self._send_pong_frame(frame.payload)
elif frame.opcode is Opcode.CLOSE:
self.state = ConnectionState.CLOSING
self._close_received = True
code, reason = CloseCode.CLEAN, None
if len(frame.payload) >= 2:
code = struct.unpack('!H', frame.payload[:2])[0]
reason = frame.payload[2:].decode('utf-8')
elif frame.payload:
raise ProtocolError("Malformed closing frame")
if not self._close_sent:
self._send_close_frame(code, reason)
else:
self._terminate()
def _handle_transport_error(self, exc):
"""
Find out which close code should be sent according to given
exception and call `self.disconnect` in order to close the
connection cleanly.
"""
code, reason = CloseCode.SERVER_ERROR, str(exc)
if isinstance(exc, (ConnectionClosed, OSError)):
code = CloseCode.ABNORMAL_CLOSURE
elif isinstance(exc, (ProtocolError, InvalidCloseCodeException)):
code = CloseCode.PROTOCOL_ERROR
elif isinstance(exc, UnicodeDecodeError):
code = CloseCode.INCONSISTENT_DATA
elif isinstance(exc, PayloadTooLargeException):
code = CloseCode.MESSAGE_TOO_BIG
elif isinstance(exc, (PoolError, RateLimitExceededException)):
code = CloseCode.TRY_LATER
elif isinstance(exc, SessionExpiredException):
code = CloseCode.SESSION_EXPIRED
if code is CloseCode.SERVER_ERROR:
reason = None
registry = Registry(self._session.db)
sequence = registry.registry_sequence
registry = registry.check_signaling()
if sequence != registry.registry_sequence:
_logger.warning("Bus operation aborted; registry has been reloaded")
else:
_logger.error(exc, exc_info=True)
self.disconnect(code, reason)
def _limit_rate(self):
"""
This method is a simple rate limiter designed not to allow
more than one request by `RL_DELAY` seconds. `RL_BURST` specify
how many requests can be made in excess of the given rate at the
begining. When requests are received too fast, raises the
`RateLimitExceededException`.
"""
now = time.time()
if len(self._incoming_frame_timestamps) >= self.RL_BURST:
elapsed_time = now - self._incoming_frame_timestamps[0]
if elapsed_time < self.RL_DELAY * self.RL_BURST:
raise RateLimitExceededException()
self._incoming_frame_timestamps.append(now)
def _trigger_lifecycle_event(self, event_type):
"""
Trigger a lifecycle event that is, call every function
registered for this event type. Every callback is given both the
environment and the related websocket.
"""
if not self.__event_callbacks[event_type]:
return
with closing(acquire_cursor(self._db)) as cr:
lang = api.Environment(cr, self._session.uid, {})['res.lang']._get_code(self._session.context.get('lang'))
env = api.Environment(cr, self._session.uid, dict(self._session.context, lang=lang))
for callback in self.__event_callbacks[event_type]:
try:
service_model.retrying(functools.partial(callback, env, self), env)
except Exception:
_logger.warning(
'Error during Websocket %s callback',
LifecycleEvent(event_type).name,
exc_info=True
)
def _dispatch_bus_notifications(self):
"""
Dispatch notifications related to the registered channels. If
the session is expired, close the connection with the
`SESSION_EXPIRED` close code. If no cursor can be acquired,
close the connection with the `TRY_LATER` close code.
"""
session = root.session_store.get(self._session.sid)
if not session:
raise SessionExpiredException()
with acquire_cursor(session.db) as cr:
env = api.Environment(cr, session.uid, dict(session.context, lang=None))
if session.uid is not None and not check_session(session, env):
raise SessionExpiredException()
# Mark the notification request as processed.
self.__notif_sock_r.recv(1)
notifications = env["bus.bus"]._poll(
self._channels, self._last_notif_sent_id, [n[0] for n in self._notif_history]
)
if not notifications:
return
for notif in notifications:
bisect.insort(self._notif_history, (notif["id"], time.time()), key=lambda x: x[0])
# Discard all the smallest notification ids that have expired and
# increment the last id accordingly. History can only be trimmed of ids
# that are below the new last id otherwise some notifications might be
# dispatched again.
# For example, if the theshold is 10s, and the state is:
# last id 2, history [(3, 8s), (6, 10s), (7, 7s)]
# If 6 is removed because it is above the threshold, the next query will
# be (id > 2 AND id NOT IN (3, 7)) which will fetch 6 again.
# 6 can only be removed after 3 reaches the threshold and is removed as
# well, and if 4 appears in the meantime, 3 can be removed but 6 will
# have to wait for 4 to reach the threshold as well.
last_index = -1
for i, notif in enumerate(self._notif_history):
if time.time() - notif[1] > self.MAX_NOTIFICATION_HISTORY_SEC:
last_index = i
else:
break
if last_index != -1:
self._last_notif_sent_id = self._notif_history[last_index][0]
self._notif_history = self._notif_history[last_index + 1 :]
self._send(notifications)
class TimeoutReason(IntEnum):
KEEP_ALIVE = 0
NO_RESPONSE = 1
class TimeoutManager:
"""
This class handles the Websocket timeouts. If no response to a
PING/CLOSE frame is received after `TIMEOUT` seconds or if the
connection is opened for more than `self._keep_alive_timeout` seconds,
the connection is considered to have timed out. To determine if the
connection has timed out, use the `has_timed_out` method.
"""
TIMEOUT = 15
# Timeout specifying how many seconds the connection should be kept
# alive.
KEEP_ALIVE_TIMEOUT = int(config['websocket_keep_alive_timeout'])
def __init__(self):
super().__init__()
self._awaited_opcode = None
# Time in which the connection was opened.
self._opened_at = time.time()
# Custom keep alive timeout for each TimeoutManager to avoid multiple
# connections timing out at the same time.
self._keep_alive_timeout = (
self.KEEP_ALIVE_TIMEOUT + random.uniform(0, self.KEEP_ALIVE_TIMEOUT / 2)
)
self.timeout_reason = None
# Start time recorded when we started awaiting an answer to a
# PING/CLOSE frame.
self._waiting_start_time = None
def acknowledge_frame_receipt(self, frame):
if self._awaited_opcode is frame.opcode:
self._awaited_opcode = None
self._waiting_start_time = None
def acknowledge_frame_sent(self, frame):
"""
Acknowledge a frame was sent. If this frame is a PING/CLOSE
frame, start waiting for an answer.
"""
if self.has_timed_out():
return
if frame.opcode is Opcode.PING:
self._awaited_opcode = Opcode.PONG
elif frame.opcode is Opcode.CLOSE:
self._awaited_opcode = Opcode.CLOSE
if self._awaited_opcode is not None:
self._waiting_start_time = time.time()
def has_timed_out(self):
"""
Determine whether the connection has timed out or not. The
connection times out when the answer to a CLOSE/PING frame
is not received within `TIMEOUT` seconds or if the connection
is opened for more than `self._keep_alive_timeout` seconds.
"""
now = time.time()
if now - self._opened_at >= self._keep_alive_timeout:
self.timeout_reason = TimeoutReason.KEEP_ALIVE
return True
if self._awaited_opcode and now - self._waiting_start_time >= self.TIMEOUT:
self.timeout_reason = TimeoutReason.NO_RESPONSE
return True
return False
# ------------------------------------------------------
# WEBSOCKET SERVING
# ------------------------------------------------------
_wsrequest_stack = LocalStack()
wsrequest = _wsrequest_stack()
class WebsocketRequest:
def __init__(self, db, httprequest, websocket):
self.db = db
self.httprequest = httprequest
self.session = None
self.ws = websocket
def __enter__(self):
_wsrequest_stack.push(self)
return self
def __exit__(self, *args):
_wsrequest_stack.pop()
def serve_websocket_message(self, message):
try:
jsonrequest = json.loads(message)
event_name = jsonrequest['event_name'] # mandatory
except KeyError as exc:
raise InvalidWebsocketRequest(
f'Key {exc.args[0]!r} is missing from request'
) from exc
except ValueError as exc:
raise InvalidWebsocketRequest(
f'Invalid JSON data, {exc.args[0]}'
) from exc
data = jsonrequest.get('data')
self.session = self._get_session()
try:
self.registry = Registry(self.db)
self.registry.check_signaling()
except (
AttributeError, psycopg2.OperationalError, psycopg2.ProgrammingError
) as exc:
raise InvalidDatabaseException() from exc
with closing(acquire_cursor(self.db)) as cr:
lang = api.Environment(cr, self.session.uid, {})['res.lang']._get_code(self.session.context.get('lang'))
self.env = api.Environment(cr, self.session.uid, dict(self.session.context, lang=lang))
threading.current_thread().uid = self.env.uid
service_model.retrying(
functools.partial(self._serve_ir_websocket, event_name, data),
self.env,
)
def _serve_ir_websocket(self, event_name, data):
"""
Delegate most of the processing to the ir.websocket model
which is extensible by applications. Directly call the
appropriate ir.websocket method since only two events are
tolerated: `subscribe` and `update_presence`.
"""
self.env['ir.websocket']._authenticate()
if event_name == 'subscribe':
self.env['ir.websocket']._subscribe(data)
if event_name == 'update_presence':
self.env['ir.websocket']._update_bus_presence(**data)
def _get_session(self):
session = root.session_store.get(self.ws._session.sid)
if not session:
raise SessionExpiredException()
return session
def update_env(self, user=None, context=None, su=None):
"""
Update the environment of the current websocket request.
"""
Request.update_env(self, user, context, su)
def update_context(self, **overrides):
"""
Override the environment context of the current request with the
values of ``overrides``. To replace the entire context, please
use :meth:`~update_env` instead.
"""
self.update_env(context=dict(self.env.context, **overrides))
@lazy_property
def cookies(self):
cookies = MultiDict(self.httprequest.cookies)
if self.registry:
self.registry['ir.http']._sanitize_cookies(cookies)
return ImmutableMultiDict(cookies)
class WebsocketConnectionHandler:
SUPPORTED_VERSIONS = {'13'}
# Given by the RFC in order to generate Sec-WebSocket-Accept from
# Sec-WebSocket-Key value.
_HANDSHAKE_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
_REQUIRED_HANDSHAKE_HEADERS = {
'connection', 'host', 'sec-websocket-key',
'sec-websocket-version', 'upgrade', 'origin',
}
# Latest version of the websocket worker. This version should be incremented
# every time `websocket_worker.js` is modified to force the browser to fetch
# the new worker bundle.
_VERSION = "18.0-2"
@classmethod
def websocket_allowed(cls, request):
return not modules.module.current_test
@classmethod
def open_connection(cls, request, version):
"""
Open a websocket connection if the handshake is successfull.
:return: Response indicating the server performed a connection
upgrade.
:raise: UpgradeRequired if there is no intersection between the
versions the client supports and those we support.
:raise: BadRequest if the handshake data is incorrect.
"""
if not cls.websocket_allowed(request):
raise ServiceUnavailable("Websocket is disabled in test mode")
public_session = cls._handle_public_configuration(request)
try:
response = cls._get_handshake_response(request.httprequest.headers)
socket = request.httprequest._HTTPRequest__environ['socket']
session, db, httprequest = (public_session or request.session), request.db, request.httprequest
response.call_on_close(lambda: cls._serve_forever(
Websocket(socket, session, httprequest.cookies),
db,
httprequest,
version
))
# Force save the session. Session must be persisted to handle
# WebSocket authentication.
request.session.is_dirty = True
return response
except KeyError as exc:
raise RuntimeError(
f"Couldn't bind the websocket. Is the connection opened on the evented port ({config['gevent_port']})?"
) from exc
except HTTPException as exc:
# The HTTP stack does not log exceptions derivated from the
# HTTPException class since they are valid responses.
_logger.error(exc)
raise
@classmethod
def _get_handshake_response(cls, headers):
"""
:return: Response indicating the server performed a connection
upgrade.
:raise: BadRequest
:raise: UpgradeRequired
"""
cls._assert_handshake_validity(headers)
# sha-1 is used as it is required by
# https://datatracker.ietf.org/doc/html/rfc6455#page-7
accept_header = hashlib.sha1(
(headers['sec-websocket-key'] + cls._HANDSHAKE_GUID).encode()).digest()
accept_header = base64.b64encode(accept_header)
return Response(status=101, headers={
'Upgrade': 'websocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Accept': accept_header.decode(),
})
@classmethod
def _handle_public_configuration(cls, request):
if not os.getenv('ODOO_BUS_PUBLIC_SAMESITE_WS'):
return
headers = request.httprequest.headers
origin_url = urlparse(headers.get('origin'))
if origin_url.netloc != headers.get('host') or origin_url.scheme != request.httprequest.scheme:
session = root.session_store.new()
session.update(get_default_session(), db=request.session.db)
root.session_store.save(session)
return session
return None
@classmethod
def _assert_handshake_validity(cls, headers):
"""
:raise: UpgradeRequired if there is no intersection between
the version the client supports and those we support.
:raise: BadRequest in case of invalid handshake.
"""
missing_or_empty_headers = {
header for header in cls._REQUIRED_HANDSHAKE_HEADERS
if header not in headers
}
if missing_or_empty_headers:
raise BadRequest(
f"""Empty or missing header(s): {', '.join(missing_or_empty_headers)}"""
)
if headers['upgrade'].lower() != 'websocket':
raise BadRequest('Invalid upgrade header')
if 'upgrade' not in headers['connection'].lower():
raise BadRequest('Invalid connection header')
if headers['sec-websocket-version'] not in cls.SUPPORTED_VERSIONS:
raise UpgradeRequired()
key = headers['sec-websocket-key']
try:
decoded_key = base64.b64decode(key, validate=True)
except ValueError:
raise BadRequest("Sec-WebSocket-Key should be b64 encoded")
if len(decoded_key) != 16:
raise BadRequest(
"Sec-WebSocket-Key should be of length 16 once decoded"
)
@classmethod
def _serve_forever(cls, websocket, db, httprequest, version):
"""
Process incoming messages and dispatch them to the application.
"""
current_thread = threading.current_thread()
current_thread.type = 'websocket'
if httprequest.user_agent and version != cls._VERSION:
# Close the connection from an outdated worker. We can't use a
# custom close code because the connection is considered successful,
# preventing exponential reconnect backoff. This would cause old
# workers to reconnect frequently, putting pressure on the server.
# Clean closes don't trigger reconnections, assuming they are
# intentional. The reason indicates to the origin worker not to
# reconnect, preventing old workers from lingering after updates.
# Non browsers are ignored since IOT devices do not provide the
# worker version.
websocket.disconnect(CloseCode.CLEAN, "OUTDATED_VERSION")
for message in websocket.get_messages():
with WebsocketRequest(db, httprequest, websocket) as req:
try:
req.serve_websocket_message(message)
except SessionExpiredException:
websocket.disconnect(CloseCode.SESSION_EXPIRED)
except PoolError:
websocket.disconnect(CloseCode.TRY_LATER)
except Exception:
_logger.exception("Exception occurred during websocket request handling")
def _kick_all():
""" Disconnect all the websocket instances. """
for websocket in _websocket_instances:
if websocket.state is ConnectionState.OPEN:
websocket.disconnect(CloseCode.GOING_AWAY)
CommonServer.on_stop(_kick_all)
|