File: cyzmq.pyx

package info (click to toggle)
pyzmq 20.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,228 kB
  • sloc: python: 14,051; ansic: 941; cpp: 315; makefile: 179; sh: 32
file content (61 lines) | stat: -rw-r--r-- 1,678 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#cython: language_level=3str

"""Using pyzmq from Cython"""


from libc.string cimport memcpy
from cpython cimport array
cimport numpy as np
from zmq cimport libzmq, Socket

import array
import sys
import time
from threading import Thread

import zmq

cpdef cython_sender(str url, int n):
    """Use entirely low-level libzmq APIs to send messages"""
    cdef void* ctx
    cdef void* s
    cdef int rc

    # create context and socket with libzmq
    ctx = libzmq.zmq_ctx_new()
    assert ctx != NULL, zmq.strerror(zmq.zmq_errno())
    s = libzmq.zmq_socket(ctx, libzmq.ZMQ_PUSH)
    assert s != NULL, zmq.strerror(zmq.zmq_errno())
    cdef bytes burl = url.encode("utf8")
    rc = libzmq.zmq_connect(s, burl)
    assert rc >= 0, zmq.strerror(zmq.zmq_errno())

    cdef libzmq.zmq_msg_t msg
    cdef array.array buf = array.array('i', [1])
    cdef int sz = 4
    start = time.perf_counter()
    for i in range(n):
        buf.data.as_ints[0] = i
        libzmq.zmq_msg_init_size(&msg, sz)
        memcpy(libzmq.zmq_msg_data(&msg), buf.data.as_chars, sz)
        libzmq.zmq_msg_send(&msg, s, 0)
        libzmq.zmq_msg_close(&msg)
    stop = time.perf_counter()
    # send a final message with the timer measurement
    buf = array.array('d', [stop - start])
    sz = 8
    libzmq.zmq_msg_init_size(&msg, sz)
    memcpy(libzmq.zmq_msg_data(&msg), buf.data.as_chars, sz)
    libzmq.zmq_msg_send(&msg, s, 0)
    libzmq.zmq_msg_close(&msg)

    # cleanup sockets
    libzmq.zmq_close(s)
    libzmq.zmq_term(ctx)


cpdef mixed_receiver(Socket s, int n):
    """Use mixed Cython APIs to recv messages"""
    cdef void* c_sock = s.handle
    for i in range(n):
        msg = s.recv()