File: decorator.py

package info (click to toggle)
jupyter-server 2.15.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,068 kB
  • sloc: python: 21,064; makefile: 186; sh: 25; javascript: 14
file content (142 lines) | stat: -rw-r--r-- 4,381 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Decorator for layering authorization into JupyterHandlers."""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, Union, cast

from jupyter_core.utils import ensure_async
from tornado.log import app_log
from tornado.web import HTTPError

from .utils import HTTP_METHOD_TO_AUTH_ACTION

# Type variable standing for "any callable". The decorators in this module
# use it for both their parameter and return annotations, so a decorated
# method is typed as the same callable type that was passed in.
FuncT = TypeVar("FuncT", bound=Callable[..., Any])


def authorized(
    action: Optional[Union[str, FuncT]] = None,
    resource: Optional[str] = None,
    message: Optional[str] = None,
) -> FuncT:
    """A decorator for tornado.web.RequestHandler methods
    that verifies whether the current user is authorized
    to make the following request.

    Helpful for adding an 'authorization' layer to
    a REST API.

    Can be applied bare (``@authorized``) or with arguments
    (``@authorized("read", resource="contents")``). Any argument left as
    ``None`` is resolved on every request from the handler instance, so a
    single decorated method can safely be shared by several handler classes
    or HTTP verbs.

    .. versionadded:: 2.0

    Parameters
    ----------
    action : str or None
        the type of permission or action to check. When ``None``, it is
        looked up in ``HTTP_METHOD_TO_AUTH_ACTION`` using the request's
        HTTP method. (When the decorator is used without parentheses this
        parameter receives the decorated method itself.)

    resource: str or None
        the name of the resource the action is being authorized
        to access. When ``None``, ``self.auth_resource`` is used.

    message : str or None
        a message for the unauthorized action. When ``None``, a default
        message naming the action and resource is generated.

    Raises
    ------
    tornado.web.HTTPError
        raised (status 403) from the wrapped method when there is no
        current user or when the authorizer rejects the request.
    """

    def wrapper(method):
        @wraps(method)
        async def inner(self, *args, **kwargs):
            # Resolve the defaults into per-request locals. The decorator
            # arguments must NOT be rebound (e.g. via ``nonlocal``): the
            # closure is shared by every request served through this method,
            # so caching the first request's action/resource/message would
            # leak them into all later requests.
            request_action = action
            if request_action is None:
                http_method = self.request.method.upper()
                request_action = HTTP_METHOD_TO_AUTH_ACTION[http_method]

            request_resource = resource
            if request_resource is None:
                request_resource = self.auth_resource

            request_message = message
            if request_message is None:
                request_message = (
                    f"User is not authorized to {request_action} "
                    f"on resource: {request_resource}."
                )

            user = self.current_user
            if not user:
                app_log.warning("Attempting to authorize request without authentication!")
                raise HTTPError(status_code=403, log_message=request_message)

            # The authorizer may be sync or async; ensure_async lets us
            # await either kind of result.
            is_allowed = await ensure_async(
                self.authorizer.is_authorized(self, user, request_action, request_resource)
            )
            if not is_allowed:
                raise HTTPError(status_code=403, log_message=request_message)

            # The user is allowed to do this action: call the method.
            out = method(self, *args, **kwargs)
            # If the method is a coroutine, await it
            if asyncio.iscoroutine(out):
                return await out
            return out

        return inner

    if callable(action):
        # no-arguments `@authorized` decorator called: `action` is really the
        # method being decorated, so fall back to the per-request default.
        method = action
        action = None
        return cast(FuncT, wrapper(method))

    return cast(FuncT, wrapper)


def allow_unauthenticated(method: FuncT) -> FuncT:
    """A decorator for tornado.web.RequestHandler methods
    that allows any user to make the following request.

    Selectively disables the 'authentication' layer of REST API which
    is active when `ServerApp.allow_unauthenticated_access = False`.

    To be used exclusively on endpoints which may be considered public,
    for example the login page handler.

    .. versionadded:: 2.13

    Parameters
    ----------
    method : bound callable
        the endpoint method to remove authentication from.
    """

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        return method(self, *args, **kwargs)

    setattr(wrapper, "__allow_unauthenticated", True)

    return cast(FuncT, wrapper)


def ws_authenticated(method: FuncT) -> FuncT:
    """A decorator for websockets derived from `WebSocketHandler`
    that authenticates user before allowing to proceed.

    Differently from tornado.web.authenticated, does not redirect
    to the login page, which would be meaningless for websockets.

    .. versionadded:: 2.13

    Parameters
    ----------
    method : bound callable
        the endpoint method to add authentication for.
    """

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        user = self.current_user
        if user is None:
            self.log.warning("Couldn't authenticate WebSocket connection")
            raise HTTPError(403)
        return method(self, *args, **kwargs)

    setattr(wrapper, "__allow_unauthenticated", False)

    return cast(FuncT, wrapper)