File: test_throttle.py

package info (click to toggle)
ros-nodelet-core 1.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 596 kB
  • sloc: cpp: 2,473; python: 367; xml: 171; makefile: 7
file content (40 lines) | stat: -rwxr-xr-x 1,013 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3

PKG = 'nodelet_topic_tools'
import roslib; roslib.load_manifest(PKG)

import unittest
import threading

import rospy

from std_msgs.msg import String

class TestNodeletThrottle(unittest.TestCase):
    def __init__(self, *args):
        super(TestNodeletThrottle, self).__init__(*args)

        self._lock = threading.RLock()

        self._msgs_rec = 0

        self._pub = rospy.Publisher('string_in', String)
        self._sub = rospy.Subscriber('string_out', String, self._cb)

    def _cb(self, msg):
        with self._lock:
            self._msgs_rec += 1

    def test_nodelet_throttle(self):
        for i in range(0,10):
            self._pub.publish(String('hello, world'))
            rospy.sleep(1.0)

        self.assert_(self._msgs_rec > 0, "No messages received from nodelet throttle on topic \"string_out\"")

if __name__ == '__main__':
    rospy.init_node('test_nodelet_throttle')

    import rostest
    rostest.rosrun(PKG, 'test_nodelet_throttle', TestNodeletThrottle)