File: ServiceTest3.py

package info (click to toggle)
robotraconteur 1.2.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 101,380 kB
  • sloc: cpp: 1,149,268; cs: 87,653; java: 58,127; python: 26,897; ansic: 356; sh: 152; makefile: 90; xml: 51
file content (50 lines) | stat: -rw-r--r-- 1,696 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from __future__ import absolute_import

import RobotRaconteur as RR


class asynctestroot_impl(object):
    """Service object whose members all reply through a posted callback.

    Each ``async_*`` method returns immediately. The reply is produced by a
    callable queued with ``PostToThreadPool`` on the node captured in
    ``RRServiceObjectInit``. As used in this class, value-returning members
    call ``handler(value, error)`` and setters / void functions call
    ``handler(error)``; passing ``None`` as the error presumably signals
    success to the caller.
    """

    def RRServiceObjectInit(self, ctx, service_path):
        """Remember the node obtained from ``ctx``; ``service_path`` is unused."""
        node = ctx.GetNode()
        self._node = node

    def async_get_d1(self, handler):
        """Queue a reply that hands the constant 8.5515 to ``handler``."""
        def reply():
            return handler(8.5515, None)

        self._node.PostToThreadPool(reply)

    def async_set_d1(self, value, handler):
        """Queue a success reply only when ``value`` is exactly 3.0819.

        Any other value queues a reply carrying an
        ``RR.InvalidArgumentException`` instead.
        """
        if value != 3.0819:
            def reply():
                # The exception is built when the queued callable runs,
                # not when this method is called.
                return handler(RR.InvalidArgumentException())
        else:
            def reply():
                return handler(None)

        self._node.PostToThreadPool(reply)

    def async_get_err(self, handler):
        """Queue a reply that always reports an ``InvalidArgumentException``."""
        def reply():
            error = RR.InvalidArgumentException("Test message 1")
            return handler(None, error)

        self._node.PostToThreadPool(reply)

    def async_set_err(self, val, handler):
        """Queue a reply that always reports an ``InvalidOperationException``.

        ``val`` is ignored.
        """
        def reply():
            return handler(RR.InvalidOperationException(""))

        self._node.PostToThreadPool(reply)

    def async_f1(self, handler):
        """Queue a reply that reports success with no value."""
        def reply():
            return handler(None)

        self._node.PostToThreadPool(reply)

    def async_f2(self, a, handler):
        """Queue a success reply only when ``a`` equals 247.

        Any other argument queues a reply carrying an
        ``RR.InvalidArgumentException`` instead.
        """
        if a != 247:
            def reply():
                return handler(RR.InvalidArgumentException())
        else:
            def reply():
                return handler(None)

        self._node.PostToThreadPool(reply)

    def async_f3(self, a, b, handler):
        """Queue a reply that hands ``int(a + b)`` to ``handler``."""
        # The sum is computed up front, before anything is queued.
        total = int(a + b)

        def reply():
            return handler(total, None)

        self._node.PostToThreadPool(reply)

    def async_err_func(self, handler):
        """Queue a reply that always reports an ``InvalidOperationException``."""
        def reply():
            return handler(RR.InvalidOperationException())

        self._node.PostToThreadPool(reply)

    def async_err_func2(self, handler):
        """Queue a reply that reports the service-defined ``asynctestexp`` error.

        The exception type is looked up on the node immediately; the
        exception instance itself is created when the queued callable runs.
        """
        error_type = self._node.GetExceptionType(
            "com.robotraconteur.testing.TestService5.asynctestexp")

        def reply():
            return handler(None, error_type(""))

        self._node.PostToThreadPool(reply)