File: server.py

package info (click to toggle)
pyro5 5.16-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,124 kB
  • sloc: python: 14,328; makefile: 161; sh: 66; javascript: 62
file content (144 lines) | stat: -rw-r--r-- 5,219 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import select
import tempfile
import uuid
import io
import os
import threading
import zlib
from Pyro5.api import expose, current_context, Daemon, config
import Pyro5.socketutil


datafiles = {}      # temporary files
datablobs = {}      # in-memory


@expose
class FileServer(object):
    def get_with_pyro(self, size):
        print("sending %d bytes" % size)
        data = b"x" * size
        return data

    def iterator(self, size):
        chunksize = size//100
        print("sending %d bytes via iterator, chunks of %d bytes" % (size, chunksize))
        data = b"x" * size
        i = 0
        while i < size:
            yield data[i:i+chunksize]
            i += chunksize

    def annotation_stream(self, with_checksum=False):
        # create a large temporary file
        f = tempfile.TemporaryFile()
        for _ in range(5000):
            f.write(b"1234567890!" * 1000)
        filesize = f.tell()
        f.seek(os.SEEK_SET, 0)
        # return the file data via annotation stream (remote iterator)
        annotation_size = 500000
        print("transmitting file via annotations stream (%d bytes in chunks of %d)..." % (filesize, annotation_size))
        with f:
            while True:
                chunk = f.read(annotation_size)
                if not chunk:
                    break
                # store the file data chunk in the FDAT response annotation,
                # and return the current file position and checksum (if asked).
                current_context.response_annotations = {"FDAT": chunk}
                yield f.tell(), zlib.crc32(chunk) if with_checksum else 0

    def prepare_file_blob(self, size):
        print("preparing file-based blob of size %d" % size)
        file_id = str(uuid.uuid4())
        f = tempfile.TemporaryFile()
        chunk = b"x" * 100000
        for _ in range(size//100000):
            f.write(chunk)
        f.write(b"x"*(size % 100000))
        f.flush()
        f.seek(0, io.SEEK_SET)
        # os.fsync(f)
        datafiles[file_id] = f
        blobsock_info = self._pyroDaemon.blobsocket.getsockname()  # return the port info for the blob socket as well
        return file_id, blobsock_info

    def prepare_memory_blob(self, size):
        print("preparing in-memory blob of size %d" % size)
        file_id = str(uuid.uuid4())
        datablobs[file_id] = b"x" * size
        blobsock_info = self._pyroDaemon.blobsocket.getsockname()  # return the port info for the blob socket as well
        return file_id, blobsock_info


class FileServerDaemon(Daemon):
    def __init__(self, host=None, port=0):
        super(FileServerDaemon, self).__init__(host, port)
        host = self.transportServer.sock.getsockname()[0]
        self.blobsocket = Pyro5.socketutil.create_socket(bind=(host, 0), timeout=config.COMMTIMEOUT, nodelay=False)
        print("Blob socket available on:", self.blobsocket.getsockname())

    def close(self):
        self.blobsocket.close()
        super(FileServerDaemon, self).close()

    def requestLoop(self, loopCondition=lambda: True):
        while loopCondition:
            rs = [self.blobsocket]
            rs.extend(self.sockets)
            rs, _, _ = select.select(rs, [], [], 3)
            daemon_events = []
            for sock in rs:
                if sock in self.sockets:
                    daemon_events.append(sock)
                elif sock is self.blobsocket:
                    self.handle_blob_connect(sock)
            if daemon_events:
                self.events(daemon_events)

    def handle_blob_connect(self, sock):
        csock, caddr = sock.accept()
        thread = threading.Thread(target=self.blob_client, args=(csock,))
        thread.daemon = True
        thread.start()

    def blob_client(self, csock):
        file_id = Pyro5.socketutil.receive_data(csock, 36).decode()
        print("{0} requesting file id {1}".format(csock.getpeername(), file_id))
        is_file, data = self.find_blob_data(file_id)
        if is_file:
            if hasattr(os, "sendfile"):
                print("...from file using sendfile()")
                out_fn = csock.fileno()
                in_fn = data.fileno()
                sent = 1
                offset = 0
                while sent:
                    sent = os.sendfile(out_fn, in_fn, offset, 512000)
                    offset += sent
            else:
                print("...from file using plain old read(); your os doesn't have sendfile()")
                while True:
                    chunk = data.read(512000)
                    if not chunk:
                        break
                    csock.sendall(chunk)
        else:
            print("...from memory")
            csock.sendall(data)
        csock.close()

    def find_blob_data(self, file_id):
        if file_id in datablobs:
            return False, datablobs.pop(file_id)
        elif file_id in datafiles:
            return True, datafiles.pop(file_id)
        else:
            raise KeyError("no data for given id")


with FileServerDaemon(host=Pyro5.socketutil.get_ip_address("")) as daemon:
    uri = daemon.register(FileServer, "example.filetransfer")
    print("Filetransfer server URI:", uri)
    daemon.requestLoop()