File: server.py

package info (click to toggle)
pyro4 4.82-2
  • links: PTS
  • area: main
  • in suites: bookworm
  • size: 2,528 kB
  • sloc: python: 17,736; makefile: 169; sh: 113; javascript: 62
file content (146 lines) | stat: -rw-r--r-- 5,284 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import print_function
import select
import tempfile
import uuid
import io
import os
import threading
import zlib
import Pyro4
import Pyro4.core
import Pyro4.socketutil


# Prepared file-based blobs: maps a file id (uuid4 string) to an open temporary
# file object positioned at its start. Entries are added by
# FileServer.prepare_file_blob and removed by FileServerDaemon.find_blob_data.
datafiles = {}
# Prepared in-memory blobs: maps a file id (uuid4 string) to a bytes object.
# Entries are added by FileServer.prepare_memory_blob and removed by
# FileServerDaemon.find_blob_data.
datablobs = {}


@Pyro4.expose
class FileServer(object):
    """Pyro-exposed object demonstrating several ways to ship bulk data to a client.

    Data can be returned directly from a call, streamed through an iterator,
    streamed through response annotations, or prepared as a "blob" that the
    client fetches afterwards over the daemon's separate blob socket.
    """

    def get_with_pyro(self, size):
        """Return ``size`` bytes of filler data as a single bytes object."""
        print("sending %d bytes" % size)
        return b"x" * size

    def iterator(self, size):
        """Yield ``size`` bytes of filler data in roughly one hundred chunks.

        The chunk size is ``size // 100`` but never less than one byte, so that
        sizes below 100 still make progress instead of yielding empty chunks
        forever. Nothing is yielded when ``size`` is zero or negative.
        """
        chunksize = max(1, size // 100)
        print("sending %d bytes via iterator, chunks of %d bytes" % (size, chunksize))
        data = b"x" * size
        for start in range(0, size, chunksize):
            yield data[start:start + chunksize]

    def annotation_stream(self, with_checksum=False):
        """Stream a generated temporary file to the caller via response annotations.

        Each iteration stores the next chunk of file data in the ``FDAT``
        response annotation and yields a tuple ``(position, checksum)`` where
        ``position`` is the file offset after the chunk and ``checksum`` is the
        chunk's ``zlib.crc32`` value when ``with_checksum`` is true, else 0.
        """
        # The temporary file is owned by the with-statement from the moment it is
        # created, so it is closed even if filling it fails or the generator is
        # closed before it has been exhausted.
        with tempfile.TemporaryFile() as f:
            # create a large temporary file
            for _ in range(5000):
                f.write(b"1234567890!" * 1000)
            filesize = f.tell()
            f.seek(0, os.SEEK_SET)   # rewind: offset first, then whence
            # return the file data via annotation stream (remote iterator)
            print("transmitting file via annotations stream (%d bytes)..." % filesize)
            annotation_size = 65000  # leave some room for Pyro's internal annotation chunks
            while True:
                chunk = f.read(annotation_size)
                if not chunk:
                    break
                # store the file data chunk in the FDAT response annotation,
                # and return the current file position and checksum (if asked).
                Pyro4.current_context.response_annotations = {"FDAT": chunk}
                yield f.tell(), (zlib.crc32(chunk) if with_checksum else 0)

    def prepare_file_blob(self, size):
        """Create a temporary file of ``size`` filler bytes and register it as a blob.

        Returns a tuple ``(file_id, blobsock_info)``: the id under which the
        open file is stored in ``datafiles`` and the address of the daemon's
        blob socket as reported by ``getsockname()``.
        """
        print("preparing file-based blob of size %d" % size)
        file_id = str(uuid.uuid4())
        f = tempfile.TemporaryFile()
        try:
            chunk = b"x" * 100000
            for _ in range(size // 100000):
                f.write(chunk)
            f.write(b"x" * (size % 100000))
            f.flush()
            f.seek(0, io.SEEK_SET)
        except BaseException:
            # Nobody else holds a reference to the file yet; don't leak it.
            f.close()
            raise
        # The file deliberately stays open: ownership passes to the registry
        # until a blob client picks it up.
        datafiles[file_id] = f
        blobsock_info = self._pyroDaemon.blobsocket.getsockname()  # return the port info for the blob socket as well
        return file_id, blobsock_info

    def prepare_memory_blob(self, size):
        """Create ``size`` filler bytes in memory and register them as a blob.

        Returns a tuple ``(file_id, blobsock_info)``: the id under which the
        data is stored in ``datablobs`` and the address of the daemon's blob
        socket as reported by ``getsockname()``.
        """
        print("preparing in-memory blob of size %d" % size)
        file_id = str(uuid.uuid4())
        datablobs[file_id] = b"x" * size
        blobsock_info = self._pyroDaemon.blobsocket.getsockname()  # return the port info for the blob socket as well
        return file_id, blobsock_info


class FileServerDaemon(Pyro4.core.Daemon):
    """Pyro daemon that also serves prepared blobs over a separate raw socket.

    Besides the regular Pyro server sockets, the daemon listens on
    ``self.blobsocket``. A client connects to it, sends a 36-character file id
    and receives the raw bytes of the blob registered under that id in the
    module-level ``datablobs`` / ``datafiles`` registries.
    """

    def __init__(self, host=None, port=0):
        """Create the daemon and bind the blob socket on the same host, any free port."""
        super(FileServerDaemon, self).__init__(host, port)
        # Only the host part is needed; indexing (rather than 2-tuple unpacking)
        # also copes with the 4-tuple addresses that IPv6 sockets report.
        host = self.transportServer.sock.getsockname()[0]
        self.blobsocket = Pyro4.socketutil.createSocket(bind=(host, 0), timeout=Pyro4.config.COMMTIMEOUT, nodelay=False)
        print("Blob socket available on:", self.blobsocket.getsockname())

    def close(self):
        """Close the blob socket, then the daemon itself."""
        self.blobsocket.close()
        super(FileServerDaemon, self).close()

    def requestLoop(self, loopCondition=lambda: True):
        """Serve Pyro requests and blob connections while ``loopCondition()`` is true.

        ``loopCondition`` is a callable that is evaluated before every
        iteration; the loop ends as soon as it returns a false value. Each
        iteration waits at most 3 seconds for socket activity.
        """
        # Call the condition: testing the callable object itself is always true.
        while loopCondition():
            rs = [self.blobsocket]
            rs.extend(self.sockets)
            rs, _, _ = select.select(rs, [], [], 3)
            daemon_events = []
            for sock in rs:
                if sock in self.sockets:
                    daemon_events.append(sock)
                elif sock is self.blobsocket:
                    self.handle_blob_connect(sock)
            if daemon_events:
                self.events(daemon_events)

    def handle_blob_connect(self, sock):
        """Accept a blob connection on ``sock`` and serve it in a daemon thread."""
        csock, _ = sock.accept()
        thread = threading.Thread(target=self.blob_client, args=(csock,))
        thread.daemon = True
        thread.start()

    def blob_client(self, csock):
        """Read a file id from ``csock`` and send the matching blob back.

        The client socket is always closed when this method finishes, also when
        the id is unknown or sending fails. A file-based blob's temporary file
        is closed once it has been handed to this method and processed.

        Raises:
            KeyError: if no blob is registered under the received id.
        """
        try:
            file_id = Pyro4.socketutil.receiveData(csock, 36).decode()
            print("{0} requesting file id {1}".format(csock.getpeername(), file_id))
            is_file, data = self.find_blob_data(file_id)
            if is_file:
                # The file was removed from the registry, so this is its last
                # user: close it when done.
                with data:
                    if hasattr(os, "sendfile"):
                        print("...from file using sendfile()")
                        out_fn = csock.fileno()
                        in_fn = data.fileno()
                        offset = 0
                        while True:
                            sent = os.sendfile(out_fn, in_fn, offset, 512000)
                            if not sent:
                                break   # nothing more was sent: end of file
                            offset += sent
                    else:
                        print("...from file using plain old read(); your os doesn't have sendfile()")
                        while True:
                            chunk = data.read(512000)
                            if not chunk:
                                break
                            csock.sendall(chunk)
            else:
                print("...from memory")
                csock.sendall(data)
        finally:
            csock.close()

    def find_blob_data(self, file_id):
        """Remove and return the blob registered under ``file_id``.

        Returns a tuple ``(is_file, data)``: ``(False, bytes)`` for an
        in-memory blob, ``(True, fileobject)`` for a file-based blob.

        Raises:
            KeyError: if ``file_id`` is in neither registry.
        """
        # A single pop() per registry avoids the check-then-pop window in which
        # another client thread could take the same entry. Stored values are
        # bytes or file objects, never None, so None is a safe "missing" marker.
        blob = datablobs.pop(file_id, None)
        if blob is not None:
            return False, blob
        fileobj = datafiles.pop(file_id, None)
        if fileobj is not None:
            return True, fileobj
        raise KeyError("no data for given id")


# Start the server: bind to the address returned by getIpAddress(""), register
# the FileServer class under a fixed object name and serve until the loop ends.
server_address = Pyro4.socketutil.getIpAddress("")
with FileServerDaemon(host=server_address) as daemon:
    uri = daemon.register(FileServer, "example.filetransfer")
    print("Filetransfer server URI:", uri)
    daemon.requestLoop()