File: zmq_server.py

package info (click to toggle)
python-xrt 1.6.0%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 17,572 kB
  • sloc: python: 59,424; xml: 4,786; lisp: 4,082; sh: 22; javascript: 18; makefile: 17
file content (199 lines) | stat: -rw-r--r-- 7,424 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# -*- coding: utf-8 -*-
"""
.. _oclserver:

OpenCL Server
-------------

This example contains a Python script designed to run on a GPU server,
leveraging ZeroMQ (ZMQ) for efficient data transfer. The script acts as a
remote accelerator device, receiving data from a client Python script,
performing calculations on the GPU, and returning the results for plotting on
a local computer.

Users can seamlessly execute their scripts in their favorite IDE, offloading
resource-intensive calculations to a remote server over the network. The only
trade-off is the potential delay due to data transfer, which is outweighed by
the benefits when local computations take longer than data transfer time.
Furthermore, the local graphical user interface (GUI) remains responsive
without freezes or issues caused by high GPU/CPU loads. This script now
supports all acceleration scenarios:

* synchrotron sources,
* wave propagation,
* bent crystals,
* multilayer reflectivity.

Script Components
~~~~~~~~~~~~~~~~~

The GPU accelerator script consists of two files located at
``tests/raycing/RemoteOpenCLCalculation``:

1. ``zmq_server.py``: The server script is the main component, responsible for
   receiving data and getting kernel names from the client. It listens on a
   predefined port, processes the received package, executes the specified
   kernel on the GPU and sends the computed data back to the client. This
   server script can be executed independently or in conjunction with the
   queue manager.

2. ``queue_device.py``: The queue manager script facilitates the handling of
   multiple user requests and the distribution of computational tasks across
   multiple servers. It provides scalability and load balancing capabilities.
   The queue manager can be executed on the same machine as the server or on a
   dedicated node. However, when running the queue manager on a separate node,
   data transfer times may increase.

Running the Script
~~~~~~~~~~~~~~~~~~

To execute the GPU accelerator script, follow these steps:

Set up the GPU server environment with the necessary dependencies, including
pyzmq, xrt and underlying dependencies (numpy, scipy, matplotlib, pyopencl).
Start the server script, either as a standalone process or in conjunction with
the queue manager, based on your specific requirements.

Ensure that the *client* Python script is configured to connect to the correct
server (or queue manager) address and port:

``targetOpenCL="GPU_SERVER_ADDRESS:15559"``


"""

__author__ = "Roman Chernikov, Konstantin Klementiev"
__date__ = "6 Jul 2023"

import zmq
import sys
import os; sys.path.append(os.path.join('..', '..', '..'))  # analysis:ignore
import numpy as np
import xrt.backends.raycing as raycing
import xrt.backends.raycing.myopencl as mcl
from xrt.backends.raycing.oes import OE
import pickle
from datetime import datetime

# Verbosity of xrt's internal logging (0-100); 80 is quite talkative,
# which is useful on a headless server.
raycing._VERBOSITY_ = 80
# Major Python version. sys.version_info is used rather than parsing
# sys.version[0] as a character, which would break on a two-digit major
# version (e.g. Python 10). The value is identical today.
PYVERSION = sys.version_info[0]

# Please specify here the OpenCL device installed on the server
targetOpenCL = 'GPU'

# Replace 'localhost' with queue manager address if not on the same machine
# if qmAddress is None, the server will work in standalone mode
qmAddress = 'localhost'


def send_zipped_pickle(socket, obj, flags=0, protocol=2):
    """Pickle *obj* and send the bytes over *socket*.

    Despite the historical name, compression is currently disabled (the
    zlib step is commented out below); only the raw pickle is sent, and
    :func:`recv_zipped_pickle` mirrors this by not decompressing.

    Parameters
    ----------
    socket : zmq.Socket
        Anything with a ``send(bytes, flags=...)`` method.
    obj : object
        Any picklable object.
    flags : int
        ZMQ send flags, passed through to ``socket.send``.
    protocol : int
        Pickle protocol; 2 keeps compatibility with Python 2 peers.

    Returns whatever ``socket.send`` returns.
    """
    p = pickle.dumps(obj, protocol)
    # z = zlib.compress(p, 8)  # re-enable together with the recv side
    return socket.send(p, flags=flags)


def recv_zipped_pickle(socket, flags=0, protocol=2):
    """Receive bytes from *socket* and unpickle them.

    Inverse of :func:`send_zipped_pickle`. Decompression is disabled to
    mirror the sender (the zlib step is commented out below).

    Parameters
    ----------
    socket : zmq.Socket
        Anything with a ``recv(flags)`` method returning bytes.
    flags : int
        ZMQ receive flags, passed through to ``socket.recv``.
    protocol : int
        Unused on receive -- a pickle stream encodes its own protocol.
        Kept only for signature symmetry with the sender.

    Returns the unpickled object.

    .. warning:: ``pickle.loads`` executes arbitrary code; only use this
       server on a trusted network.
    """
    z = socket.recv(flags)
    # p = zlib.decompress(z)
    return pickle.loads(z)


# Kernel-name dispatch tables: main() uses these to route an incoming
# request's 'kernelName' to the OpenCL context compiled from the matching
# .cl program (see the XRT_CL instances created in main()).

# Kernels compiled from diffract.cl (wave propagation); float64 only.
diffractKernels = ['integrate_kirchhoff']
# Kernels compiled from materials.cl (reflectivity/amplitudes).
materialsKernels = ['get_amplitude', 'get_amplitude_multilayer',
                    'get_amplitude_graded_multilayer',
                    'get_amplitude_graded_multilayer_tran',
                    'estimate_bent_width', 'get_amplitudes_pytte']
# Kernels compiled from undulator.cl (synchrotron sources).
undulatorKernels = ['undulator', 'undulator_taper', 'undulator_nf',
                    'undulator_full', 'undulator_nf_full',
                    'get_trajectory_filament', 'custom_field_filament',
                    'get_trajectory', 'custom_field']
# Kernels provided by the OE (optical element) class, e.g. bent crystals.
oesKernels = ['reflect_crystal']


def main():
    """Run the OpenCL calculation server loop (never returns).

    Compiles every xrt OpenCL program in both float32 and float64 up
    front, then serves forever over a ZMQ REP socket: receive a pickled
    request dict, choose the compiled context matching its 'kernelName'
    (and the precision inferred from its 'scalarArgs'), run the kernel
    and send the pickled result back. Malformed requests get an
    ("ERROR", message) tuple as the reply instead.
    """
    # Compile each .cl program once at startup so per-request cost is
    # only data transfer plus kernel execution.
    matCL32 = mcl.XRT_CL(
        r'materials.cl', precisionOpenCL='float32', targetOpenCL=targetOpenCL)
    matCL64 = mcl.XRT_CL(
        r'materials.cl', precisionOpenCL='float64', targetOpenCL=targetOpenCL)
    sourceCL32 = mcl.XRT_CL(
        r'undulator.cl', precisionOpenCL='float32', targetOpenCL=targetOpenCL)
    sourceCL64 = mcl.XRT_CL(
        r'undulator.cl', precisionOpenCL='float64', targetOpenCL=targetOpenCL)
    waveCL = mcl.XRT_CL(
        r'diffract.cl', precisionOpenCL='float64', targetOpenCL=targetOpenCL)

    # The OE class owns its own compiled context; instantiate one per
    # precision and borrow the .ucl handle.
    OE32 = OE(precisionOpenCL='float32', targetOpenCL=targetOpenCL)
    OE64 = OE(precisionOpenCL='float64', targetOpenCL=targetOpenCL)
    oeCL32 = OE32.ucl
    oeCL64 = OE64.ucl

    context = zmq.Context()
    socket = context.socket(zmq.REP)

    # Standalone servers listen on 15559 directly; behind a queue manager
    # they connect out to its backend port 15560 instead.
    port = "15559" if qmAddress is None else "15560"

    if qmAddress is None:
        socket.bind("tcp://*:%s" % port)
    else:
        socket.connect("tcp://%s:%s" % (qmAddress, port))

    while True:
        # message = socket.recv_pyobj()  # Python 3 only
        message = recv_zipped_pickle(socket)
        precision = 64  # default unless a float32 scalar arg says otherwise
        reply = None  # stays None only if the request validates
        dtstr = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")

        if 'scalarArgs' in message:
            # Infer precision from the first float scalar argument;
            # ints are precision-neutral, so skip them.
            for arg in message['scalarArgs']:
                if isinstance(arg, np.int32):
                    continue
                elif isinstance(arg, np.float32):
                    precision = 32
                    break
                elif isinstance(arg, np.float64):
                    break
                else:
                    continue
        else:
            print(dtstr, "ERROR: not a kernel")
            reply = ("ERROR", "not a kernel")

        if 'kernelName' in message:
            # Route the kernel name to its pre-compiled OpenCL context.
            kName = message['kernelName']
            if kName in diffractKernels:
                xrtClContext = waveCL
            elif kName in undulatorKernels:
                xrtClContext = sourceCL32 if precision == 32 else sourceCL64
            elif kName in materialsKernels:
                xrtClContext = matCL32 if precision == 32 else matCL64
            elif kName in oesKernels:
                xrtClContext = oeCL32 if precision == 32 else oeCL64
            else:
                print(dtstr, "ERROR: unknown kernel:", kName)
                reply = ("ERROR", "unknown kernel: " + kName)
        else:
            print(dtstr, "ERROR: not a kernel")
            reply = ("ERROR", "not a kernel")

        if reply is None:
            # Request validated: run the kernel; never let an exception
            # kill the loop -- the REP socket must always send a reply.
            try:
                dtstr = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")
                print("{2} Calculating '{0}' in {1}-bit".format(
                    kName, precision, dtstr))
                reply = xrtClContext.run_parallel(**message)
                dtstr = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")
                print(dtstr, "Calculations complete. Sending back results")
            except Exception as e:
                dtstr = datetime.now().strftime("%Y-%m-%d, %H:%M:%S")
                print(dtstr, " ERROR while calculating a kernel:")
                print(e)
                reply = ("ERROR", "error while calculating a kernel")

        send_zipped_pickle(socket, reply, protocol=2)


if __name__ == "__main__":
    main()