File: zeromq_queue.py

package info (click to toggle)
plaso 20241006-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 673,228 kB
  • sloc: python: 91,831; sh: 557; xml: 97; makefile: 17; sql: 14; vhdl: 11
file content (156 lines) | stat: -rw-r--r-- 5,788 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the the zeromq queue."""

import unittest

from plaso.lib import errors
from plaso.multi_process import zeromq_queue

from tests import test_lib as shared_test_lib


class ZeroMQPullBindQueue(zeromq_queue.ZeroMQPullQueue):
  """A Plaso queue backed by a ZeroMQ PULL socket that binds to a port.

  Defined in this test module so that testPushPullQueues can pair a binding
  PULL queue with a connecting PUSH queue.

  This queue may only be used to pop items, not to push.
  """
  # Select the "bind" connection type for this PULL queue.
  SOCKET_CONNECTION_TYPE = zeromq_queue.ZeroMQQueue.SOCKET_CONNECTION_BIND


class ZeroMQPushConnectQueue(zeromq_queue.ZeroMQPushQueue):
  """A Plaso queue backed by a ZeroMQ PUSH socket that connects to a port.

  Defined in this test module so that testPushPullQueues can pair a connecting
  PUSH queue with a binding PULL queue.

  This queue may only be used to push items, not to pop.
  """
  # Select the "connect" connection type for this PUSH queue.
  SOCKET_CONNECTION_TYPE = zeromq_queue.ZeroMQQueue.SOCKET_CONNECTION_CONNECT


class ZeroMQRequestBindQueue(zeromq_queue.ZeroMQRequestQueue):
  """A Plaso queue backed by a ZeroMQ REQ socket that binds to a port.

  Defined in this test module so that testRequestAndBufferedReplyQueues can
  pair a binding REQ queue with a connecting buffered REP queue.

  This queue may only be used to pop items, not to push.
  """
  # Select the "bind" connection type for this REQ queue.
  SOCKET_CONNECTION_TYPE = zeromq_queue.ZeroMQQueue.SOCKET_CONNECTION_BIND


class ZeroMQBufferedReplyConnectQueue(zeromq_queue.ZeroMQBufferedReplyQueue):
  """A Plaso queue backed by a ZeroMQ REP socket that connects to a port.

  Defined in this test module so that testRequestAndBufferedReplyQueues can
  pair a connecting buffered REP queue with a binding REQ queue.

  This queue is used to push items; they are popped from the paired request
  (REQ) queue.
  """
  # Select the "connect" connection type for this buffered REP queue.
  SOCKET_CONNECTION_TYPE = zeromq_queue.ZeroMQQueue.SOCKET_CONNECTION_CONNECT


class ZeroMQQueuesTest(shared_test_lib.BaseTestCase):
  """Tests for ZeroMQ queues.

  Every test closes the queues it opens from a finally clause, so that a
  failing assertion does not leave sockets open for the tests that follow.
  """

  # pylint: disable=protected-access

  # Queue classes that can be created without the port of an existing peer.
  _QUEUE_CLASSES = frozenset([
      zeromq_queue.ZeroMQPushBindQueue, ZeroMQPullBindQueue,
      ZeroMQRequestBindQueue])

  def _CloseQueues(self, *queues):
    """Closes queues in the order given.

    Args:
      *queues: queues to close. A value of None stands for a queue that was
          never created and is skipped.
    """
    for queue in queues:
      if queue is not None:
        queue.Close()

  def _testItemTransferred(self, push_queue, pop_queue):
    """Tests that an item can be transferred between two queues.

    Args:
      push_queue: queue the item is pushed onto, must provide a name
          attribute and a PushItem method.
      pop_queue: queue the item is expected to be popped from, must provide
          a name attribute and a PopItem method.
    """
    item = 'This is an item going from {0:s} to {1:s}.'.format(
        push_queue.name, pop_queue.name)
    push_queue.PushItem(item)
    popped_item = pop_queue.PopItem()
    self.assertEqual(item, popped_item)

  def testBufferedReplyQueue(self):
    """Tests that a closed buffered reply queue rejects new items."""
    test_queue = zeromq_queue.ZeroMQBufferedReplyBindQueue(
        name='bufferedreply_bind', delay_open=False, linger_seconds=1)
    try:
      test_queue.PushItem('This is a test item.')
    finally:
      # Close even when the push fails so the socket is not left open.
      test_queue.Close(abort=True)

    with self.assertRaises(errors.QueueAlreadyClosed):
      test_queue.PushItem('This shouldn\'t work')

  def testPushPullQueues(self):
    """Tests that an item can be transferred between push and pull queues."""
    # PUSH side binds, PULL side connects to the port that was bound.
    push_queue = zeromq_queue.ZeroMQPushBindQueue(
        name='pushpull_pushbind', delay_open=False, linger_seconds=1)
    pull_queue = None
    try:
      pull_queue = zeromq_queue.ZeroMQPullConnectQueue(
          name='pushpull_pullconnect', delay_open=False, port=push_queue.port,
          linger_seconds=1)
      self._testItemTransferred(push_queue, pull_queue)
    finally:
      self._CloseQueues(push_queue, pull_queue)

    # PULL side binds, PUSH side connects to the port that was bound.
    pull_queue = ZeroMQPullBindQueue(
        name='pushpull_pullbind', delay_open=False, linger_seconds=1)
    push_queue = None
    try:
      push_queue = ZeroMQPushConnectQueue(
          name='pushpull_pushconnect', delay_open=False, port=pull_queue.port,
          linger_seconds=1)
      self._testItemTransferred(push_queue, pull_queue)
    finally:
      self._CloseQueues(push_queue, pull_queue)

  def testQueueStart(self):
    """Tests that delayed creation of ZeroMQ sockets occurs correctly."""
    for queue_class in self._QUEUE_CLASSES:
      # Report failures per queue class and keep testing the other classes.
      with self.subTest(queue_class=queue_class.__name__):
        queue_name = 'queuestart_{0:s}'.format(queue_class.__name__)
        test_queue = queue_class(
            name=queue_name, delay_open=True, linger_seconds=1)
        message = '{0:s} socket already exists.'.format(queue_name)
        self.assertIsNone(test_queue._zmq_socket, message)

        test_queue.Open()
        try:
          self.assertIsNotNone(test_queue._zmq_socket)
        finally:
          test_queue.Close()

  def testRequestAndBufferedReplyQueues(self):
    """Tests REQ and buffered REP queue pairs."""
    # REP side binds, REQ side connects to the port that was bound.
    reply_queue = zeromq_queue.ZeroMQBufferedReplyBindQueue(
        name='requestbufferedreply_replybind', delay_open=False,
        linger_seconds=1)
    request_queue = None
    try:
      request_queue = zeromq_queue.ZeroMQRequestConnectQueue(
          name='requestbufferedreply_requestconnect', delay_open=False,
          port=reply_queue.port, linger_seconds=1)
      self._testItemTransferred(reply_queue, request_queue)
    finally:
      self._CloseQueues(reply_queue, request_queue)

    # REQ side binds, REP side connects to the port that was bound.
    request_queue = ZeroMQRequestBindQueue(
        name='requestbufferedreply_requestbind', delay_open=False,
        linger_seconds=1)
    reply_queue = None
    try:
      reply_queue = ZeroMQBufferedReplyConnectQueue(
          name='requestbufferedreply_replyconnect', delay_open=False,
          port=request_queue.port, linger_seconds=0)
      self._testItemTransferred(reply_queue, request_queue)
    finally:
      self._CloseQueues(reply_queue, request_queue)

  def testEmptyBufferedQueues(self):
    """Tests the Empty method for buffered queues."""
    buffer_max_size = 3
    queue = zeromq_queue.ZeroMQBufferedReplyBindQueue(
        name='requestbufferedreply_replybind', delay_open=False,
        linger_seconds=1, buffer_max_size=buffer_max_size, timeout_seconds=2,
        buffer_timeout_seconds=1)
    try:
      try:
        # Bounded, so that a queue that never reports being full fails the
        # assertion below instead of hanging the test run.
        for _ in range(10 * buffer_max_size):
          queue.PushItem('item', block=False)
      except errors.QueueFull:
        # Queue is now full
        pass

      with self.assertRaises(errors.QueueFull):
        queue.PushItem('item', block=False)

      queue.Empty()
      # We should now be able to push another item without an exception.
      queue.PushItem('item')
      queue.Empty()
    finally:
      queue.Close()

  def testSocketCreation(self):
    """Tests that ZeroMQ sockets are created when a new queue is created."""
    for queue_class in self._QUEUE_CLASSES:
      # Report failures per queue class and keep testing the other classes.
      with self.subTest(queue_class=queue_class.__name__):
        queue_name = 'socket_creation_{0:s}'.format(queue_class.__name__)
        test_queue = queue_class(
            name=queue_name, delay_open=False, linger_seconds=1)
        try:
          self.assertIsNotNone(test_queue._zmq_socket)
        finally:
          test_queue.Close()


# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
  unittest.main()