File: core.py

package info (click to toggle)
pyro5 5.15-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,112 kB
  • sloc: python: 14,291; makefile: 163; sh: 66; javascript: 62
file content (295 lines) | stat: -rw-r--r-- 12,749 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""
Multi purpose stuff used by both clients and servers (URI etc)

Pyro - Python Remote Objects.  Copyright by Irmen de Jong (irmen@razorvine.net).
"""

import re
import logging
import contextlib
import ipaddress
import socket
import random
import serpent
from typing import Union, Optional
from . import config, errors, socketutil, serializers


__all__ = ["URI", "DAEMON_NAME", "NAMESERVER_NAME", "resolve", "locate_ns", "type_meta"]

log = logging.getLogger("Pyro5.core")

# standard object name for the Daemon object
DAEMON_NAME = "Pyro.Daemon"

# standard name for the Name server itself
NAMESERVER_NAME = "Pyro.NameServer"


class URI(object):
    """
    Pyro object URI (universal resource identifier).
    The uri format is like this: ``PYRO:objectid@location`` where location is one of:

    - ``hostname:port`` (tcp/ip socket on given port)
    - ``./u:sockname`` (Unix domain socket on localhost)

    There is also a 'Magic format' for simple name resolution using Name server:
      ``PYRONAME:objectname[@location]``  (optional name server location, can also omit location port)
    And one that looks up things in the name server by metadata:
      ``PYROMETA:meta1,meta2,...[@location]``  (optional name server location, can also omit location port)

    You can write the protocol in lowercase if you like (``pyro:...``) but it will
    automatically be converted to uppercase internally.
    """
    uriRegEx = re.compile(r"(?P<protocol>[Pp][Yy][Rr][Oo][a-zA-Z]*):(?P<object>\S+?)(@(?P<location>.+))?$")

    def __init__(self, uri):
        if isinstance(uri, URI):
            state = uri.__getstate__()
            self.__setstate__(state)
            return
        if not isinstance(uri, str):
            raise TypeError("uri parameter object is of wrong type")
        self.sockname = self.host = self.port = None
        match = self.uriRegEx.match(uri)
        if not match:
            raise errors.PyroError("invalid uri")
        self.protocol = match.group("protocol").upper()
        self.object = match.group("object")
        location = match.group("location")
        if self.protocol == "PYRONAME":
            self._parseLocation(location, config.NS_PORT)
        elif self.protocol == "PYRO":
            if not location:
                raise errors.PyroError("invalid uri")
            self._parseLocation(location, None)
        elif self.protocol == "PYROMETA":
            self.object = set(m.strip() for m in self.object.split(","))
            self._parseLocation(location, config.NS_PORT)
        else:
            raise errors.PyroError("invalid uri (protocol)")

    def _parseLocation(self, location, defaultPort):
        if not location:
            return
        if location.startswith("./u:"):
            self.sockname = location[4:]
            if (not self.sockname) or ':' in self.sockname:
                raise errors.PyroError("invalid uri (location)")
        else:
            if location.startswith("["):  # ipv6
                if location.startswith("[["):  # possible mistake: double-bracketing
                    raise errors.PyroError("invalid ipv6 address: enclosed in too many brackets")
                ipv6locationmatch = re.match(r"\[([0-9a-fA-F:%]+)](:(\d+))?", location)
                if not ipv6locationmatch:
                    raise errors.PyroError("invalid ipv6 address: the part between brackets must be a numeric ipv6 address")
                self.host, _, self.port = ipv6locationmatch.groups()
            else:
                self.host, _, self.port = location.partition(":")
            if not self.port:
                self.port = defaultPort
            try:
                self.port = int(self.port)
            except (ValueError, TypeError):
                raise errors.PyroError("invalid port in uri, port=" + str(self.port))

    @staticmethod
    def isUnixsockLocation(location):
        """determine if a location string is for a Unix domain socket"""
        return location.startswith("./u:")

    @property
    def location(self):
        """property containing the location string, for instance ``"servername.you.com:5555"``"""
        if self.host:
            if ":" in self.host:  # ipv6
                return "[%s]:%d" % (self.host, self.port)
            else:
                return "%s:%d" % (self.host, self.port)
        elif self.sockname:
            return "./u:" + self.sockname
        else:
            return None

    def __str__(self):
        if self.protocol == "PYROMETA":
            result = "PYROMETA:" + ",".join(self.object)
        else:
            result = self.protocol + ":" + self.object
        if self.location:
            return result + "@" + self.location
        return result

    def __repr__(self):
        return "<%s.%s at 0x%x; %s>" % (self.__class__.__module__, self.__class__.__name__, id(self), str(self))

    def __eq__(self, other):
        if not isinstance(other, URI):
            return False
        return self.__getstate__() == other.__getstate__()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__getstate__())

    def __getstate__(self):
        return self.protocol, self.object, self.sockname, self.host, self.port

    def __setstate__(self, state):
        self.protocol, self.object, self.sockname, self.host, self.port = state


class _ExceptionWrapper(object):
    """Class that wraps a remote exception. If this is returned, Pyro will
    re-throw the exception on the receiving side. Usually this is taken care of
    by a special response message flag, but in the case of batched calls this
    flag is useless and another mechanism was needed."""

    def __init__(self, exception):
        self.exception = exception

    def raiseIt(self):
        raise self.exception

    def __serialized_dict__(self):
        """serialized form as a dictionary"""
        return {
            "__class__": "Pyro5.core._ExceptionWrapper",
            "exception": serializers.SerializerBase.class_to_dict(self.exception)
        }


# register the special serializers for the pyro objects with Serpent
serpent.register_class(URI, serializers.pyro_class_serpent_serializer)
serpent.register_class(_ExceptionWrapper, serializers.pyro_class_serpent_serializer)
serializers.SerializerBase.register_class_to_dict(URI, serializers.serialize_pyro_object_to_dict, serpent_too=False)
serializers.SerializerBase.register_class_to_dict(_ExceptionWrapper, _ExceptionWrapper.__serialized_dict__, serpent_too=False)


def resolve(uri: Union[str, URI], delay_time: float = 0.0) -> URI:
    """
    Resolve a 'magic' uri (PYRONAME, PYROMETA) into the direct PYRO uri.
    It finds a name server, and use that to resolve a PYRONAME uri into the direct PYRO uri pointing to the named object.
    If uri is already a PYRO uri, it is returned unmodified.
    You can consider this a shortcut function so that you don't have to locate and use a name server proxy yourself.
    Note: if you need to resolve more than a few names, consider using the name server directly instead of repeatedly
    calling this function, to avoid the name server lookup overhead from each call.
    You can set delay_time to the maximum number of seconds you are prepared to wait until a name registration
    becomes available in the nameserver.
    """
    if isinstance(uri, str):
        uri = URI(uri)
    elif not isinstance(uri, URI):
        raise TypeError("can only resolve Pyro URIs")
    if uri.protocol == "PYRO":
        return uri
    log.debug("resolving %s", uri)
    from . import nameserver   # doing it here to avoid circular import issues
    if uri.protocol == "PYRONAME":
        with locate_ns(uri.host, uri.port) as ns:
            return nameserver.lookup(ns, uri.object, delay_time)
    elif uri.protocol == "PYROMETA":
        with locate_ns(uri.host, uri.port) as ns:
            candidates = nameserver.yplookup(ns, uri.object, None, False, delay_time)
            if candidates:
                candidate = random.choice(list(candidates.values()))
                log.debug("resolved to candidate %s", candidate)
                return URI(candidate)
            raise errors.NamingError("no registrations available with desired metadata properties %s" % uri.object)
    else:
        raise errors.PyroError("invalid uri protocol")


def locate_ns(host: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address] = "",
              port: Optional[int] = None, broadcast: bool = True) -> "client.Proxy":
    """Get a proxy for a name server somewhere in the network."""
    from . import client
    if not host:
        # first try localhost if we have a good chance of finding it there
        if config.NS_HOST in ("localhost", "::1") or config.NS_HOST.startswith("127."):
            if ":" in config.NS_HOST:  # ipv6
                hosts = ["[%s]" % config.NS_HOST]
            else:
                # Some systems have 127.0.1.1 in the hosts file assigned to the hostname,
                # so try this too (only if it's actually used as a valid ip address)
                try:
                    socket.gethostbyaddr("127.0.1.1")
                    hosts = [config.NS_HOST] if config.NS_HOST == "127.0.1.1" else [config.NS_HOST, "127.0.1.1"]
                except socket.error:
                    hosts = [config.NS_HOST]
            for host in hosts:
                uristring = "PYRO:%s@%s:%d" % (NAMESERVER_NAME, host, port or config.NS_PORT)
                log.debug("locating the NS: %s", uristring)
                proxy = client.Proxy(uristring)
                with contextlib.suppress(errors.PyroError):
                    proxy._pyroBind()
                    log.debug("located NS")
                    return proxy
        if config.PREFER_IP_VERSION == 6:
            broadcast = False   # ipv6 doesn't have broadcast. We should probably use multicast....
        if broadcast:
            # broadcast lookup
            if not port:
                port = config.NS_BCPORT
            log.debug("broadcast locate")
            sock = socketutil.create_bc_socket(reuseaddr=config.SOCK_REUSE, timeout=0.7)
            for _ in range(3):
                try:
                    for bcaddr in config.BROADCAST_ADDRS:
                        try:
                            sock.sendto(b"GET_NSURI", 0, (bcaddr, port))
                        except socket.error as x:
                            err = getattr(x, "errno", x.args[0])
                            # handle some errno's that some platforms like to throw:
                            if err not in socketutil.ERRNO_EADDRNOTAVAIL and err not in socketutil.ERRNO_EADDRINUSE:
                                raise
                    data, _ = sock.recvfrom(100)
                    sock.close()
                    text = data.decode("iso-8859-1")
                    log.debug("located NS: %s", text)
                    proxy = client.Proxy(text)
                    return proxy
                except socket.timeout:
                    continue
            with contextlib.suppress(OSError, socket.error):
                sock.shutdown(socket.SHUT_RDWR)
            sock.close()
            log.debug("broadcast locate failed, try direct connection on NS_HOST")
        else:
            log.debug("skipping broadcast lookup")
        # broadcast failed or skipped, try PYRO directly on specific host
        host = config.NS_HOST
        port = config.NS_PORT
    elif not isinstance(host, str):
        host = str(host)    # take care of the occasion where host is an ipaddress.IpAddress
    # pyro direct lookup
    port = config.NS_PORT if not port else port
    if URI.isUnixsockLocation(host):
        uristring = "PYRO:%s@%s" % (NAMESERVER_NAME, host)
    else:
        # if not a unix socket, check for ipv6
        if host and ":" in str(host):
            host = "[%s]" % host
        uristring = "PYRO:%s@%s:%d" % (NAMESERVER_NAME, host, port)
    uri = URI(uristring)
    log.debug("locating the NS: %s", uri)
    proxy = client.Proxy(uri)
    try:
        proxy._pyroBind()
        log.debug("located NS")
        return proxy
    except errors.PyroError as x:
        raise errors.NamingError("Failed to locate the nameserver") from x


def type_meta(class_or_object, prefix="class:"):
    """extracts type metadata from the given class or object, can be used as Name server metadata."""
    if hasattr(class_or_object, "__mro__"):
        return {prefix + c.__module__ + "." + c.__name__
                for c in class_or_object.__mro__ if c.__module__ not in ("builtins", "__builtin__")}
    if hasattr(class_or_object, "__class__"):
        return type_meta(class_or_object.__class__)
    return frozenset()