File: test_security.py

package info (click to toggle)
pyzmq 27.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,984 kB
  • sloc: python: 15,189; ansic: 285; makefile: 169; sh: 85
file content (238 lines) | stat: -rw-r--r-- 7,773 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Test libzmq security (libzmq >= 3.3.0)"""

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

import contextlib
import os
import time
from threading import Thread

import zmq
from zmq.utils import z85
from zmq_test_utils import PYPY, BaseZMQTestCase, SkipTest

USER = b"admin"
PASS = b"password"


class TestSecurity(BaseZMQTestCase):
    def setUp(self):
        if zmq.zmq_version_info() < (4, 0):
            raise SkipTest("security is new in libzmq 4.0")
        try:
            zmq.curve_keypair()
        except zmq.ZMQError:
            raise SkipTest("security requires libzmq to be built with CURVE support")
        super().setUp()

    def zap_handler(self):
        socket = self.context.socket(zmq.REP)
        socket.bind("inproc://zeromq.zap.01")
        try:
            msg = self.recv_multipart(socket)

            version, sequence, domain, address, identity, mechanism = msg[:6]
            if mechanism == b'PLAIN':
                username, password = msg[6:]
            elif mechanism == b'CURVE':
                msg[6]

            assert version == b"1.0"
            assert identity == b"IDENT"
            reply = [version, sequence]
            if (
                mechanism == b'CURVE'
                or (mechanism == b'PLAIN' and username == USER and password == PASS)
                or (mechanism == b'NULL')
            ):
                reply.extend(
                    [
                        b"200",
                        b"OK",
                        b"anonymous",
                        b"\5Hello\0\0\0\5World",
                    ]
                )
            else:
                reply.extend(
                    [
                        b"400",
                        b"Invalid username or password",
                        b"",
                        b"",
                    ]
                )
            socket.send_multipart(reply)
        finally:
            socket.close()

    @contextlib.contextmanager
    def zap(self):
        self.start_zap()
        time.sleep(0.5)  # allow time for the Thread to start
        try:
            yield
        finally:
            self.stop_zap()

    def start_zap(self):
        self.zap_thread = Thread(target=self.zap_handler)
        self.zap_thread.start()

    def stop_zap(self):
        self.zap_thread.join()

    def bounce(self, server, client, test_metadata=True):
        msg = [os.urandom(64), os.urandom(64)]
        client.send_multipart(msg)
        frames = self.recv_multipart(server, copy=False)
        recvd = list(map(lambda x: x.bytes, frames))

        try:
            if test_metadata and not PYPY:
                for frame in frames:
                    assert frame.get('User-Id') == 'anonymous'
                    assert frame.get('Hello') == 'World'
                    assert frame['Socket-Type'] == 'DEALER'
        except zmq.ZMQVersionError:
            pass

        assert recvd == msg
        server.send_multipart(recvd)
        msg2 = self.recv_multipart(client)
        assert msg2 == msg

    def test_null(self):
        """test NULL (default) security"""
        server = self.socket(zmq.DEALER)
        client = self.socket(zmq.DEALER)
        assert client.MECHANISM == zmq.NULL
        assert server.mechanism == zmq.NULL
        assert client.plain_server == 0
        assert server.plain_server == 0
        iface = 'tcp://127.0.0.1'
        port = server.bind_to_random_port(iface)
        client.connect(f"{iface}:{port}")
        self.bounce(server, client, False)

    def test_plain(self):
        """test PLAIN authentication"""
        server = self.socket(zmq.DEALER)
        server.identity = b'IDENT'
        client = self.socket(zmq.DEALER)
        assert client.plain_username == b''
        assert client.plain_password == b''
        client.plain_username = USER
        client.plain_password = PASS
        assert client.getsockopt(zmq.PLAIN_USERNAME) == USER
        assert client.getsockopt(zmq.PLAIN_PASSWORD) == PASS
        assert client.plain_server == 0
        assert server.plain_server == 0
        server.plain_server = True
        assert server.mechanism == zmq.PLAIN
        assert client.mechanism == zmq.PLAIN

        assert not client.plain_server
        assert server.plain_server

        with self.zap():
            iface = 'tcp://127.0.0.1'
            port = server.bind_to_random_port(iface)
            client.connect(f"{iface}:{port}")
            self.bounce(server, client)

    def skip_plain_inauth(self):
        """test PLAIN failed authentication"""
        server = self.socket(zmq.DEALER)
        server.identity = b'IDENT'
        client = self.socket(zmq.DEALER)
        self.sockets.extend([server, client])
        client.plain_username = USER
        client.plain_password = b'incorrect'
        server.plain_server = True
        assert server.mechanism == zmq.PLAIN
        assert client.mechanism == zmq.PLAIN

        with self.zap():
            iface = 'tcp://127.0.0.1'
            port = server.bind_to_random_port(iface)
            client.connect(f"{iface}:{port}")
            client.send(b'ping')
            server.rcvtimeo = 250
            self.assertRaisesErrno(zmq.EAGAIN, server.recv)

    def test_keypair(self):
        """test curve_keypair"""
        try:
            public, secret = zmq.curve_keypair()
        except zmq.ZMQError:
            raise SkipTest("CURVE unsupported")

        assert type(secret) == bytes
        assert type(public) == bytes
        assert len(secret) == 40
        assert len(public) == 40

        # verify that it is indeed Z85
        bsecret, bpublic = (z85.decode(key) for key in (public, secret))
        assert type(bsecret) == bytes
        assert type(bpublic) == bytes
        assert len(bsecret) == 32
        assert len(bpublic) == 32

    def test_curve_public(self):
        """test curve_public"""
        try:
            public, secret = zmq.curve_keypair()
        except zmq.ZMQError:
            raise SkipTest("CURVE unsupported")
        if zmq.zmq_version_info() < (4, 2):
            raise SkipTest("curve_public is new in libzmq 4.2")

        derived_public = zmq.curve_public(secret)

        assert type(derived_public) == bytes
        assert len(derived_public) == 40

        # verify that it is indeed Z85
        bpublic = z85.decode(derived_public)
        assert type(bpublic) == bytes
        assert len(bpublic) == 32

        # verify that it is equal to the known public key
        assert derived_public == public

    def test_curve(self):
        """test CURVE encryption"""
        server = self.socket(zmq.DEALER)
        server.identity = b'IDENT'
        client = self.socket(zmq.DEALER)
        self.sockets.extend([server, client])
        try:
            server.curve_server = True
        except zmq.ZMQError as e:
            # will raise EINVAL if no CURVE support
            if e.errno == zmq.EINVAL:
                raise SkipTest("CURVE unsupported")

        server_public, server_secret = zmq.curve_keypair()
        client_public, client_secret = zmq.curve_keypair()

        server.curve_secretkey = server_secret
        server.curve_publickey = server_public
        client.curve_serverkey = server_public
        client.curve_publickey = client_public
        client.curve_secretkey = client_secret

        assert server.mechanism == zmq.CURVE
        assert client.mechanism == zmq.CURVE

        assert server.get(zmq.CURVE_SERVER)
        assert not client.get(zmq.CURVE_SERVER)

        with self.zap():
            iface = 'tcp://127.0.0.1'
            port = server.bind_to_random_port(iface)
            client.connect(f"{iface}:{port}")
            self.bounce(server, client)