File: add_analyzers

package info (click to toggle)
ros-diagnostics 1.10.1%2Bds1-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,316 kB
  • sloc: cpp: 2,995; python: 2,456; xml: 225; sh: 20; makefile: 3
file content (76 lines) | stat: -rwxr-xr-x 3,160 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python3

NAME='add_analyzers'

import sys
import argparse
from bondpy import bondpy
from diagnostic_msgs.srv import AddDiagnostics
import rosparam
import rospy

class AddAnalyzers:

    def __init__(self, args):
        rospy.on_shutdown(self.remove_group)
        self.namespace = rospy.resolve_name(rospy.get_name())
        self.bond = None
        self.add_analyzers(args)

    def remove_group(self):
        if self.bond:
            self.bond.shutdown()

    def bond_broken(self):
        self._signal_shutdown(self.namespace + ' bond to diagnostic aggregator is broken')

    def add_analyzers(self, myargv):
        usage = """
        allows you to dynamically add a group to the diagnostic aggregator,
        which is automatically removed when this node dies. Analyzer configurations
        can either be provided directly, or assumed that they are already can be
        found on the ros parameter server (where roslaunch may have loaded them).
        """
        parser = argparse.ArgumentParser(description=usage)
        parser.add_argument('analyzer_yaml', nargs='?', default=None)
        parser.add_argument('-t', '--timeout', type=float, dest='timeout', default=None, help='time in seconds to wait for the diagnostic_agg service to come up before timing out. Default waits indefinitely')
        args = parser.parse_args(myargv[1:])

        if args.analyzer_yaml is None:
            # nothing to do - it will assume parameters are already loaded on
            # the parameter server (usually via roslaunch)
            pass
        else:
            paramlist = rosparam.load_file(args.analyzer_yaml, default_namespace=self.namespace)
            for params, ns in paramlist:
                rosparam.upload_params(ns, params)

        self.bond = bondpy.Bond("/diagnostics_agg/bond" + self.namespace,
                                self.namespace,
                                on_broken=self.bond_broken)
        self.bond.set_connect_timeout(rospy.Duration(120))

        try:
            rospy.wait_for_service('/diagnostics_agg/add_diagnostics', timeout=args.timeout)
            self.bond.start()
            add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics)
            resp = add_diagnostics(load_namespace=self.namespace)
            if resp.success:
                rospy.loginfo(NAME + ' successfully added analyzers to diagnostic aggregator')
            else:
                self._signal_shutdown(NAME + ' did not add any analyzers to diagnostic aggregator: ' + resp.message)
        except rospy.service.ServiceException:
            self._signal_shutdown(NAME + ' service returned failure - missing aggregator or failed init of analyzer group?')
        except rospy.ROSException:
            self._signal_shutdown(NAME + ' add timed out while waiting for diagnostics_agg service, or ROS shutdown')

        rospy.spin()

    def _signal_shutdown(self, reason):
        if not rospy.is_shutdown():
            rospy.logerr(reason)
            rospy.signal_shutdown(reason)

if __name__ == '__main__':
    rospy.init_node(NAME)
    AddAnalyzers(rospy.myargv())