File: test_service_pickle.py

package info (click to toggle)
rpyc 6.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,324 kB
  • sloc: python: 6,442; makefile: 122
file content (99 lines) | stat: -rw-r--r-- 3,524 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from __future__ import print_function
import sys
import pickle  # noqa
import timeit
import rpyc
import unittest
from tests import cfg_tests
try:
    import pandas as pd
    import numpy as np
    _pampy_import_failed = False
except Exception:
    pd = None
    np = None
    _pampy_import_failed = True


DF_ROWS = 2000
DF_COLS = 2500


class MyService(rpyc.Service):
    on_connect_called = False
    on_disconnect_called = False

    def on_connect(self, conn):
        self.on_connect_called = True

    def on_disconnect(self, conn):
        self.on_disconnect_called = True

    def exposed_write_data(self, dataframe):
        rpyc.classic.obtain(dataframe)

    def exposed_get(self):
        return np.random.rand(3, 3)

    def exposed_ping(self):
        return "pong"


@unittest.skipIf(_pampy_import_failed, "Pandas & numpy are not available")
class TestServicePickle(unittest.TestCase):
    """Issues #323 and #329 showed for large objects there is an excessive number of round trips.

    This test case should check the interrelations of
        + MAX_IO_CHUNK
        + min twrite
        + occurrence rate of socket timeout for other clients
    """
    config = {}

    def setUp(self):
        self.cfg = {'allow_pickle': True}
        self.server = rpyc.utils.server.ThreadedServer(MyService, port=0, protocol_config=self.cfg.copy())
        self.server.logger.quiet = False
        self.thd = self.server._start_in_thread()
        self.conn = rpyc.connect("localhost", self.server.port, config=self.cfg)
        self.conn2 = rpyc.connect("localhost", self.server.port, config=self.cfg)
        # globals are made available to timeit, prepare them
        cfg_tests.timeit['conn'] = self.conn
        cfg_tests.timeit['conn2'] = self.conn2
        cfg_tests.timeit['df'] = pd.DataFrame(np.random.rand(DF_ROWS, DF_COLS))

    def tearDown(self):
        self.conn.close()
        self.server.close()
        self.thd.join()
        cfg_tests.timeit.clear()

    def test_dataframe_pickling(self):
        # the proxy will sync w/ the pickle handle and default proto and provide this as the argument to pickle.load
        # By timing how long w/ out any round trips pickle.dumps and picke.loads takes, the overhead of RPyC protocol
        # can be found

        rpyc.core.channel.Channel.COMPRESSION_LEVEL = 1
        # rpyc.core.stream.SocketStream.MAX_IO_CHUNK = 8000
        level = rpyc.core.channel.Channel.COMPRESSION_LEVEL
        max_chunk = rpyc.core.stream.SocketStream.MAX_IO_CHUNK
        repeat = 3
        number = 1
        pickle_stmt = 'pickle.loads(pickle.dumps(cfg_tests.timeit["df"]))'
        write_stmt = 'rpyc.lib.spawn(cfg_tests.timeit["conn"].root.write_data, cfg_tests.timeit["df"]); '
        write_stmt += '[cfg_tests.timeit["conn2"].root.ping() for i in range(30)]'
        # write_stmt = 'cfg_tests.timeit["conn"].root.write_data(cfg_tests.timeit["df"])'
        t = timeit.Timer(pickle_stmt, globals=globals())
        tpickle = min(t.repeat(repeat, number))
        t = timeit.Timer(write_stmt, globals=globals())
        twrite = min(t.repeat(repeat, number))

        headers = ['sample', 'tpickle', 'twrite', 'bytes', 'level', 'max_chunk']  # noqa
        data = [repeat, tpickle, twrite, sys.getsizeof(cfg_tests.timeit['df']), level, max_chunk]
        data = [str(d) for d in data]
        # print(','.join(headers), file=open('/tmp/time.csv', 'a'))
        # print(','.join(data), file=open('/tmp/time.csv', 'a'))


if __name__ == "__main__":
    unittest.main()