File: connection.cpp

package info (click to toggle)
pytango 10.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 10,216 kB
  • sloc: python: 28,206; cpp: 16,380; sql: 255; sh: 82; makefile: 43
file content (137 lines) | stat: -rw-r--r-- 5,364 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
 * SPDX-FileCopyrightText: All Contributors to the PyTango project
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 */

#include "precompiled_header.hpp"
#include "pytgutils.h"
#include "callback.h"

namespace PyConnection
{

// Synchronous command execution wrapper exposed to Python as "__command_inout".
//
// self     - connection the command is sent through
// cmd_name - name of the command to execute
// argin    - input data handed to the command
//
// Returns whatever Tango::Connection::command_inout returns for this call.
static Tango::DeviceData
    command_inout(Tango::Connection &self, const std::string &cmd_name, const Tango::DeviceData &argin)
{
    // Constness is cast away only to match the call below; presumably the
    // underlying Tango signature takes non-const references (TODO confirm).
    auto &name_ref = const_cast<std::string &>(cmd_name);
    auto &argin_ref = const_cast<Tango::DeviceData &>(argin);

    // NOTE(review): the guard presumably releases the Python GIL for the
    // duration of the call - confirm against AutoPythonAllowThreads.
    AutoPythonAllowThreads allow_threads;
    return self.command_inout(name_ref, argin_ref);
}

// Asynchronous command wrapper exposed to Python as "__command_inout_asynch_id".
//
// self     - connection the request is sent through
// cmd_name - name of the command to execute
// argin    - input data handed to the command
// forget   - forwarded unchanged as the third argument of command_inout_asynch
//
// Returns the long value produced by Tango::Connection::command_inout_asynch
// (presumably the identifier of the asynchronous request).
static long command_inout_asynch_id(Tango::Connection &self,
                                    const std::string &cmd_name,
                                    const Tango::DeviceData &argin,
                                    bool forget)
{
    // Constness is cast away only to match the call below; presumably the
    // underlying Tango signature takes non-const references (TODO confirm).
    auto &name_ref = const_cast<std::string &>(cmd_name);
    auto &argin_ref = const_cast<Tango::DeviceData &>(argin);

    // NOTE(review): the guard presumably releases the Python GIL for the
    // duration of the call - confirm against AutoPythonAllowThreads.
    AutoPythonAllowThreads allow_threads;
    return self.command_inout_asynch(name_ref, argin_ref, forget);
}

// Fetches the reply of an asynchronous command; registered from Python as the
// one-argument form of "command_inout_reply_raw".
//
// self - connection the request was issued on
// id   - value passed straight to Tango::Connection::command_inout_reply
//
// Returns the DeviceData produced by that call.
static Tango::DeviceData command_inout_reply(Tango::Connection &self, long id)
{
    // NOTE(review): the guard presumably releases the Python GIL for the
    // duration of the call - confirm against AutoPythonAllowThreads.
    AutoPythonAllowThreads allow_threads;
    Tango::DeviceData reply = self.command_inout_reply(id);
    return reply;
}

// Fetches the reply of an asynchronous command; registered from Python as the
// two-argument form of "command_inout_reply_raw".
//
// self    - connection the request was issued on
// id      - first value passed to Tango::Connection::command_inout_reply
// timeout - second value passed to that call (units not visible here;
//           TODO confirm against the Tango API)
//
// Returns the DeviceData produced by that call.
static Tango::DeviceData command_inout_reply(Tango::Connection &self, long id, long timeout)
{
    // NOTE(review): the guard presumably releases the Python GIL for the
    // duration of the call - confirm against AutoPythonAllowThreads.
    AutoPythonAllowThreads allow_threads;
    Tango::DeviceData reply = self.command_inout_reply(id, timeout);
    return reply;
}

// Callback-based asynchronous command wrapper exposed to Python as
// "__command_inout_asynch_cb".
//
// py_self  - Python object from which a Tango::Connection* is extracted
// cmd_name - name of the command to execute
// argin    - input data handed to the command
// py_cb    - Python object from which a PyCallBackAutoDie* is extracted
//
// Any exception thrown while issuing the request is rethrown to the caller
// after the callback's autokill references have been unset again.
static void command_inout_asynch_cb(bopy::object py_self,
                                    const std::string &cmd_name,
                                    const Tango::DeviceData &argin,
                                    bopy::object py_cb)
{
    // Extraction order (connection first, then callback) is kept as-is.
    Tango::Connection *connection = bopy::extract<Tango::Connection *>(py_self);
    PyCallBackAutoDie *callback = bopy::extract<PyCallBackAutoDie *>(py_cb);

    // Hand both Python objects to the callback before the request is issued.
    // NOTE(review): presumably this keeps them alive until the callback has
    // fired - confirm against PyCallBackAutoDie.
    callback->set_autokill_references(py_cb, py_self);

    // Constness is cast away only to match the call below; presumably the
    // underlying Tango signature takes non-const references (TODO confirm).
    auto &name_ref = const_cast<std::string &>(cmd_name);
    auto &argin_ref = const_cast<Tango::DeviceData &>(argin);

    try
    {
        // The guard lives only inside the try block, so it has already been
        // destroyed by the time the handler below runs.
        AutoPythonAllowThreads allow_threads;
        connection->command_inout_asynch(name_ref, argin_ref, *callback);
    }
    catch(...)
    {
        // The request could not be issued: undo the registration done above
        // and let the original exception propagate.
        callback->unset_autokill_references();
        throw;
    }
}

// Zero-argument form of "get_asynch_replies" as registered for Python:
// forwards to Tango::Connection::get_asynch_replies() on the given connection.
static void get_asynch_replies(Tango::Connection &self)
{
    // NOTE(review): the guard presumably releases the Python GIL for the
    // duration of the call - confirm against AutoPythonAllowThreads.
    AutoPythonAllowThreads allow_threads;

    self.get_asynch_replies();
}

// One-argument form of "get_asynch_replies" as registered for Python:
// forwards call_timeout unchanged to Tango::Connection::get_asynch_replies
// (timeout units are not visible here; TODO confirm against the Tango API).
static void get_asynch_replies(Tango::Connection &self, long call_timeout)
{
    // NOTE(review): the guard presumably releases the Python GIL for the
    // duration of the call - confirm against AutoPythonAllowThreads.
    AutoPythonAllowThreads allow_threads;

    self.get_asynch_replies(call_timeout);
}

// Returns, as a Python string, whatever Tango::Connection::get_fqdn writes
// into its output argument (presumably the fully qualified domain name of the
// local host - confirm against the Tango API). Registered below as a static
// method of the Python "Connection" class.
bopy::str get_fqdn()
{
    std::string fqdn_buffer;
    Tango::Connection::get_fqdn(fqdn_buffer);

    // Built from the NUL-terminated C string, exactly as before.
    bopy::str py_fqdn(fqdn_buffer.c_str());
    return py_fqdn;
}
} // namespace PyConnection

void export_connection()
{
    bopy::class_<Tango::Connection, boost::noncopyable> Connection("Connection", bopy::no_init);

    Connection.def("dev_name", bopy::pure_virtual(&Tango::Connection::dev_name))
        .def(
            "get_db_host", &Tango::Connection::get_db_host, bopy::return_value_policy<bopy::copy_non_const_reference>())
        .def(
            "get_db_port", &Tango::Connection::get_db_port, bopy::return_value_policy<bopy::copy_non_const_reference>())
        .def("get_db_port_num", &Tango::Connection::get_db_port_num)
        .def("get_from_env_var", &Tango::Connection::get_from_env_var)
        .def("get_fqdn", &PyConnection::get_fqdn)
        .staticmethod("get_fqdn")
        .def("is_dbase_used", &Tango::Connection::is_dbase_used)
        .def("get_dev_host",
             &Tango::Connection::get_dev_host,
             bopy::return_value_policy<bopy::copy_non_const_reference>())
        .def("get_dev_port",
             &Tango::Connection::get_dev_port,
             bopy::return_value_policy<bopy::copy_non_const_reference>())
        .def("connect", &Tango::Connection::connect)
        .def("reconnect", &Tango::Connection::reconnect)
        .def("get_idl_version", &Tango::Connection::get_idl_version)
        .def("set_timeout_millis", &Tango::Connection::set_timeout_millis)
        .def("get_timeout_millis", &Tango::Connection::get_timeout_millis)
        .def("get_source", &Tango::Connection::get_source)
        .def("set_source", &Tango::Connection::set_source)
        .def("get_transparency_reconnection", &Tango::Connection::get_transparency_reconnection)
        .def("set_transparency_reconnection", &Tango::Connection::set_transparency_reconnection)
        .def("__command_inout", &PyConnection::command_inout)
        .def("__command_inout_asynch_id", &PyConnection::command_inout_asynch_id)
        .def("__command_inout_asynch_cb", &PyConnection::command_inout_asynch_cb)
        .def("command_inout_reply_raw",
             (Tango::DeviceData(*)(Tango::Connection &, long)) & PyConnection::command_inout_reply)
        .def("command_inout_reply_raw",
             (Tango::DeviceData(*)(Tango::Connection &, long, long)) & PyConnection::command_inout_reply)

        //
        // Asynchronous methods
        //

        .def("get_asynch_replies", (void (*)(Tango::Connection &)) & PyConnection::get_asynch_replies)
        .def("get_asynch_replies", (void (*)(Tango::Connection &, long)) & PyConnection::get_asynch_replies)
        .def("cancel_asynch_request", &Tango::Connection::cancel_asynch_request)
        .def("cancel_all_polling_asynch_request", &Tango::Connection::cancel_all_polling_asynch_request)

        //
        // Control access related methods
        //

        .def("get_access_control", &Tango::Connection::get_access_control)
        .def("set_access_control", &Tango::Connection::set_access_control)
        .def("get_access_right", &Tango::Connection::get_access_right);
}