1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
/*
* SPDX-FileCopyrightText: All Contributors to the PyTango project
*
* SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include "common_header.h"
#include "pyutils.h"
// Decode the bytes held by a std::string into a Python str object.
//
// The whole string (in.size() bytes, so embedded NUL bytes are included) is
// forwarded to from_cpp_char_to_pybind11_str together with `encoding` and
// `errors`, which are passed through unchanged.
py::object from_cpp_str_to_pybind11_str(const std::string &in,
                                        const char *encoding /*=NULL defaults to latin-1 */,
                                        const char *errors /*="strict" */) {
    const char *raw_bytes = in.c_str();
    const auto byte_count = static_cast<Py_ssize_t>(in.size());
    return from_cpp_char_to_pybind11_str(raw_bytes, byte_count, encoding, errors);
}
// Decode a C character buffer into a Python str object.
//
// - in:       pointer to the bytes to decode. Must be NUL-terminated when
//             `size` is negative, because strlen() is then used on it.
// - size:     number of bytes to decode; a negative value means "compute the
//             length with strlen(in)".
// - encoding: codec name handed to PyUnicode_Decode; nullptr selects the
//             dedicated Latin-1 decoder.
// - errors:   error handler name forwarded to the decoder.
//
// Returns a new reference wrapped in a py::object.
// Throws py::error_already_set when the decoder reports a failure (unknown
// codec, undecodable bytes, out of memory, ...), so that the pending Python
// exception propagates instead of a null object being handed to the caller.
py::object from_cpp_char_to_pybind11_str(const char *in,
                                         Py_ssize_t size /* =-1 */,
                                         const char *encoding /*=NULL defaults to latin-1 */,
                                         const char *errors /*="strict" */) {
    if(size < 0) {
        size = static_cast<Py_ssize_t>(strlen(in));
    }

    PyObject *decoded = nullptr;
    if(encoding == nullptr) {
        decoded = PyUnicode_DecodeLatin1(in, size, errors);
    } else {
        decoded = PyUnicode_Decode(in, size, encoding, errors);
    }

    // Both decoders return NULL with a Python exception set on failure.
    if(decoded == nullptr) {
        throw py::error_already_set();
    }
    return py::reinterpret_steal<py::object>(decoded);
}
// Copy the bytes that `in` exposes through the buffer protocol into a freshly
// allocated char array (new[]) and append a terminating NUL byte.
//
// - in:   Python object from which a read-only buffer is requested.
// - size: optional output; when not null it receives the number of bytes
//         copied, not counting the terminator.
//
// If the buffer cannot be obtained, a TypeError is reported through raise_.
// The caller owns the returned array.
char *__copy_bytes_to_char(PyObject *in, Py_ssize_t *size) {
    Py_buffer buffer;
    const int status = PyObject_GetBuffer(in, &buffer, PyBUF_FULL_RO);
    if(status < 0) {
        raise_(PyExc_TypeError, "Can't translate python object to C char* - PyObject_GetBuffer failed");
    }

    const Py_ssize_t byte_count = buffer.len;

    // One extra byte so the copy can be used as a C string.
    char *copy = new char[static_cast<size_t>(byte_count + 1)];
    memcpy(copy, static_cast<char *>(buffer.buf), static_cast<size_t>(byte_count));
    copy[byte_count] = '\0';

    // The data has been duplicated, so the view is no longer needed.
    PyBuffer_Release(&buffer);

    if(size != nullptr) {
        *size = byte_count;
    }
    return copy;
}
// Convenience overload taking a py::object: unwraps the underlying PyObject*
// and delegates to the PyObject* overload with the same `size_out` and
// `utf_encoding` arguments.
//
// The returned buffer is newly allocated; the caller is responsible for
// managing (and eventually freeing) that memory.
char *from_python_str_to_cpp_char(const py::object &in, Py_ssize_t *size_out, const bool utf_encoding) {
    PyObject *raw_object = in.ptr();
    return from_python_str_to_cpp_char(raw_object, size_out, utf_encoding);
}
// Convert a Python str, bytes or bytearray into a NUL-terminated C string.
//
// - in:           the Python object to convert.
// - size_out:     forwarded to __copy_bytes_to_char; when not null it
//                 receives the number of bytes copied (terminator excluded).
// - utf_encoding: only relevant for str input. When true the text is encoded
//                 as UTF-8, otherwise EncodeAsLatin1 is used. bytes and
//                 bytearray objects are copied as they are.
//
// The result is a newly allocated buffer. It is the responsibility
// of the caller to manage the memory returned by this function
//
// Errors:
// - a str that cannot be encoded makes the function throw
//   py::error_already_set, leaving the Python exception set by the encoder;
// - any other unsupported type is reported as a TypeError through raise_.
char *from_python_str_to_cpp_char(PyObject *in, Py_ssize_t *size_out, const bool utf_encoding) {
    if(PyUnicode_Check(in)) {
        PyObject *raw_bytes = nullptr;
        if(utf_encoding) {
            raw_bytes = PyUnicode_AsUTF8String(in);
        } else {
            raw_bytes = EncodeAsLatin1(in);
        }

        // Take ownership immediately: the temporary bytes object is then
        // released on every exit path, including when the copy below throws.
        const py::object encoded = py::reinterpret_steal<py::object>(raw_bytes);
        if(!encoded) {
            // Encoding failed (e.g. lone surrogates); never pass NULL on to
            // the buffer API.
            throw py::error_already_set();
        }
        return __copy_bytes_to_char(encoded.ptr(), size_out);
    }

    if(PyBytes_Check(in) || PyByteArray_Check(in)) {
        return __copy_bytes_to_char(in, size_out);
    }

    raise_(PyExc_TypeError, "can't translate python object to C char*");
    return nullptr;
}
void throw_bad_type(const char *type, const char *source) {
TangoSys_OMemStream description;
description << "Incompatible argument type, expected type is : Tango::" << type << std::ends;
TangoSys_OMemStream origin;
origin << source << std::ends;
Tango::Except::throw_exception("API_IncompatibleCmdArgumentType", description.str(), origin.str());
}
// Make out_array refer to the bytes of a Python str, bytes or bytearray.
//
// The out_array will be updated with a pointer to existing memory (e.g., Python's internal memory for
// a byte array). The caller gets a "view" of the memory and must not modify the memory.
// Nothing is copied, so the view should not be used after py_value is gone
// (for a str the pointer is the UTF-8 buffer returned by
// PyUnicode_AsUTF8AndSize for that object).
//
// - py_value:  str (exposed as its UTF-8 encoding), bytes or bytearray.
// - out_array: sequence whose storage is replaced by the view.
//
// Errors:
// - throws py::error_already_set if the str cannot be encoded as UTF-8;
// - any other Python type is reported through throw_bad_type.
void view_pybytes_as_char_array(const py::object &py_value, Tango::DevVarCharArray &out_array) {
    // Install `data` as the storage of out_array. The trailing `false` is
    // passed as in the original code; NOTE(review): in the CORBA C++ sequence
    // mapping this is the "release" flag, i.e. the sequence does not own the
    // memory - confirm against the ORB in use.
    auto install_view = [&out_array](const char *data, CORBA::ULong nb) {
        CORBA::Octet *octets = reinterpret_cast<CORBA::Octet *>(const_cast<char *>(data));
        out_array.replace(nb, nb, octets, false);
    };

    PyObject *data_ptr = py_value.ptr();
    if(PyUnicode_Check(data_ptr)) {
        Py_ssize_t size = 0;
        const char *utf8 = PyUnicode_AsUTF8AndSize(data_ptr, &size);
        if(utf8 == nullptr) {
            // Encoding failed: `size` is meaningless and a Python exception
            // is pending, so propagate it instead of installing a null view.
            throw py::error_already_set();
        }
        install_view(utf8, static_cast<CORBA::ULong>(size));
    } else if(PyBytes_Check(data_ptr)) {
        const CORBA::ULong nb = static_cast<CORBA::ULong>(py::len(py_value));
        install_view(PyBytes_AsString(data_ptr), nb);
    } else if(PyByteArray_Check(data_ptr)) {
        const CORBA::ULong nb = static_cast<CORBA::ULong>(py::len(py_value));
        install_view(PyByteArray_AsString(data_ptr), nb);
    } else {
        throw_bad_type(Tango::CmdArgTypeName[Tango::DEV_ENCODED], TANGO_EXCEPTION_ORIGIN);
    }
}
// Tell whether `obj` has an attribute called `method_name` that is callable.
//
// Delegates to the four-argument overload and returns true only when the
// attribute both exists and is reported as a method by that overload.
bool is_method_defined(py::object &obj, const std::string &method_name) {
    bool attribute_found = false;
    bool attribute_callable = false;
    is_method_defined(obj, method_name, attribute_found, attribute_callable);
    if(!attribute_found) {
        return false;
    }
    return attribute_callable;
}
// Inspect attribute `method_name` of `obj`.
//
// - exists:    set to the result of py::hasattr for that name.
// - is_method: set to true when the attribute exists and PyCallable_Check
//              returns 1 for it; set to false otherwise.
void is_method_defined(py::object &obj, const std::string &method_name, bool &exists, bool &is_method) {
    const char *attr_name = method_name.c_str();

    exists = py::hasattr(obj, attr_name);
    if(!exists) {
        is_method = false;
        return;
    }

    const py::object candidate = obj.attr(attr_name);
    is_method = (PyCallable_Check(candidate.ptr()) == 1);
}
|