File: pyutils.cpp

package info (click to toggle)
pytango 10.1.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,304 kB
  • sloc: python: 27,795; cpp: 16,150; sql: 252; sh: 152; makefile: 43
file content (133 lines) | stat: -rw-r--r-- 5,169 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 * SPDX-FileCopyrightText: All Contributors to the PyTango project
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 */

#include "common_header.h"
#include "pyutils.h"

// Decodes the bytes held by a std::string into a Python str object.
//
// The string's own length is forwarded explicitly, so the bytes are not
// re-measured with strlen() by the char-based overload. `encoding` and
// `errors` are passed through untouched.
py::object from_cpp_str_to_pybind11_str(const std::string &in,
                                        const char *encoding /*=NULL defaults to latin-1 */,
                                        const char *errors /*="strict" */) {
    const char *raw_chars = in.c_str();
    const auto byte_count = static_cast<Py_ssize_t>(in.size());
    return from_cpp_char_to_pybind11_str(raw_chars, byte_count, encoding, errors);
}

// Decodes a C character buffer into a Python str object.
//
// in       - bytes to decode; must be NUL-terminated when `size` is negative.
// size     - number of bytes to decode; a negative value means "use strlen(in)".
// encoding - codec name handed to PyUnicode_Decode; nullptr selects latin-1.
// errors   - codec error handler, forwarded as-is to the CPython decoder.
//
// Returns a new reference wrapped in a py::object.
// Throws py::error_already_set when the decoder fails, so the Python exception
// raised by the codec reaches the caller instead of a null py::object with a
// pending error indicator.
py::object from_cpp_char_to_pybind11_str(const char *in,
                                         Py_ssize_t size /* =-1 */,
                                         const char *encoding /*=NULL defaults to latin-1 */,
                                         const char *errors /*="strict" */) {
    if(size < 0) {
        size = static_cast<Py_ssize_t>(strlen(in));
    }

    PyObject *decoded = nullptr;
    if(encoding == nullptr) {
        decoded = PyUnicode_DecodeLatin1(in, size, errors);
    } else {
        decoded = PyUnicode_Decode(in, size, encoding, errors);
    }

    // Both decoders return NULL with a Python exception set on failure.
    if(decoded == nullptr) {
        throw py::error_already_set();
    }
    return py::reinterpret_steal<py::object>(decoded);
}

// Copies the bytes exported by `in` through the Python buffer protocol into a
// newly allocated, NUL-terminated char array.
//
// in   - object supporting the buffer protocol.
// size - optional out-parameter; when non-null it receives the number of bytes
//        copied (the terminating NUL is not counted).
//
// Returns a buffer allocated with new[]; the caller owns it.
// A failing PyObject_GetBuffer is reported through raise_ with a TypeError
// (raise_ is expected not to return).
char *__copy_bytes_to_char(PyObject *in, Py_ssize_t *size) {
    Py_buffer view;

    if(PyObject_GetBuffer(in, &view, PyBUF_FULL_RO) < 0) {
        raise_(PyExc_TypeError, "Can't translate python object to C char* - PyObject_GetBuffer failed");
    }

    // Releases the view on every exit path, including when the allocation
    // below throws.
    struct BufferReleaser {
        Py_buffer *held;

        ~BufferReleaser() {
            PyBuffer_Release(held);
        }
    } releaser{&view};

    const Py_ssize_t view_len = view.len;
    char *out = new char[static_cast<size_t>(view_len) + 1];

    // PyBUF_FULL_RO lets the exporter hand out a strided (non-contiguous)
    // view, for which a flat memcpy from view.buf would copy the wrong bytes.
    // PyBuffer_ToContiguous honours shape and strides.
    if(PyBuffer_ToContiguous(out, &view, view_len, 'C') < 0) {
        delete[] out;
        throw py::error_already_set();
    }
    out[view_len] = '\0';

    if(size != nullptr) {
        *size = view_len;
    }

    return out;
}

// Convenience overload taking a py::object: unwraps the underlying PyObject*
// and delegates to the PyObject* overload.
// The returned buffer is newly allocated; the caller is responsible for
// managing that memory.
char *from_python_str_to_cpp_char(const py::object &in, Py_ssize_t *size_out, const bool utf_encoding) {
    PyObject *raw_object = in.ptr();
    return from_python_str_to_cpp_char(raw_object, size_out, utf_encoding);
}

// Converts a Python str, bytes or bytearray into a newly allocated,
// NUL-terminated C character buffer.
//
// in           - Python object to convert.
// size_out     - forwarded to __copy_bytes_to_char, which stores the number of
//                bytes copied there when the pointer is non-null.
// utf_encoding - for str input only: true encodes as UTF-8, false goes through
//                EncodeAsLatin1.
//
// The result is a newly allocated buffer. It is the responsibility
// of the caller to manage the memory returned by this function.
// Unsupported input types are reported through raise_ with a TypeError; an
// encoding failure propagates as py::error_already_set.
char *from_python_str_to_cpp_char(PyObject *in, Py_ssize_t *size_out, const bool utf_encoding) {
    if(PyUnicode_Check(in)) {
        PyObject *raw_bytes = nullptr;
        if(utf_encoding) {
            raw_bytes = PyUnicode_AsUTF8String(in);
        } else {
            raw_bytes = EncodeAsLatin1(in);
        }

        // Owning wrapper: the temporary bytes object is released even if the
        // copy below throws.
        const py::object encoded = py::reinterpret_steal<py::object>(raw_bytes);
        if(!encoded) {
            // The encoder failed (e.g. lone surrogates cannot be encoded as
            // UTF-8). Going on would pass NULL to PyObject_GetBuffer/Py_DECREF.
            if(PyErr_Occurred() == nullptr) {
                PyErr_SetString(PyExc_UnicodeError, "can't translate python object to C char*");
            }
            throw py::error_already_set();
        }
        return __copy_bytes_to_char(encoded.ptr(), size_out);
    }

    if(PyBytes_Check(in) || PyByteArray_Check(in)) {
        return __copy_bytes_to_char(in, size_out);
    }

    raise_(PyExc_TypeError, "can't translate python object to C char*");
    return nullptr;
}

void throw_bad_type(const char *type, const char *source) {
    TangoSys_OMemStream description;
    description << "Incompatible argument type, expected type is : Tango::" << type << std::ends;

    TangoSys_OMemStream origin;
    origin << source << std::ends;

    Tango::Except::throw_exception("API_IncompatibleCmdArgumentType", description.str(), origin.str());
}

// Points out_array at the bytes of py_value without copying them.
//
// py_value  - a Python str (viewed through its UTF-8 representation), bytes or
//             bytearray object. Any other type is reported via throw_bad_type.
// out_array - updated with a pointer to existing memory (e.g., Python's
//             internal memory for a byte array). It is filled with
//             replace(..., false); the caller gets a "view" of the memory and
//             must not modify it.
//
// Throws py::error_already_set when a str cannot be represented as UTF-8.
void view_pybytes_as_char_array(const py::object &py_value, Tango::DevVarCharArray &out_array) {
    PyObject *data_ptr = py_value.ptr();
    const char *raw_data = nullptr;
    Py_ssize_t size = 0;

    if(PyUnicode_Check(data_ptr)) {
        raw_data = PyUnicode_AsUTF8AndSize(data_ptr, &size);
        if(raw_data == nullptr) {
            // The conversion failed and `size` was not written; the Python
            // exception is already set, so propagate it rather than building
            // an array from a null pointer and an indeterminate length.
            throw py::error_already_set();
        }
    } else if(PyBytes_Check(data_ptr)) {
        raw_data = PyBytes_AsString(data_ptr);
        size = static_cast<Py_ssize_t>(py::len(py_value));
    } else if(PyByteArray_Check(data_ptr)) {
        raw_data = PyByteArray_AsString(data_ptr);
        size = static_cast<Py_ssize_t>(py::len(py_value));
    } else {
        throw_bad_type(Tango::CmdArgTypeName[Tango::DEV_ENCODED], TANGO_EXCEPTION_ORIGIN);
        return; // not reached if throw_bad_type throws; keeps the null pointer away from replace()
    }

    const CORBA::ULong nb = static_cast<CORBA::ULong>(size);
    CORBA::Octet *encoded_data =
        reinterpret_cast<CORBA::Octet *>(const_cast<char *>(raw_data)); // NOLINT(readability-redundant-casting)
    out_array.replace(nb, nb, encoded_data, false);
}

// Returns true only when `obj` has an attribute called `method_name` and that
// attribute is callable. Delegates the actual checks to the four-argument
// overload.
bool is_method_defined(py::object &obj, const std::string &method_name) {
    bool attribute_found = false;
    bool attribute_callable = false;
    is_method_defined(obj, method_name, attribute_found, attribute_callable);
    return attribute_found && attribute_callable;
}

// Checks whether `obj` has an attribute named `method_name` and whether that
// attribute is callable.
//
// exists    - set to the result of py::hasattr for the attribute.
// is_method - set to true when the attribute exists and PyCallable_Check
//             returns 1 for it; false otherwise.
void is_method_defined(py::object &obj, const std::string &method_name, bool &exists, bool &is_method) {
    const char *attr_name = method_name.c_str();

    exists = py::hasattr(obj, attr_name);
    if(!exists) {
        is_method = false;
        return;
    }

    const py::object candidate = obj.attr(attr_name);
    is_method = (PyCallable_Check(candidate.ptr()) == 1);
}