# File: __init__.py
#
# Package info: raritan-json-rpc-sdk 4.0.20+ds-2
#   - links: PTS, VCS
#   - area: main
#   - in suites: bookworm
#   - size: 57,236 kB
#   - sloc: cs: 223,121; perl: 117,786; python: 26,872; javascript: 6,544; makefile: 27
# File content: 159 lines, mode -rw-r--r--, 5,268 bytes
#
# SPDX-License-Identifier: BSD-3-Clause
#
# Copyright 2022 Raritan Inc. All rights reserved.
#
# This is an auto-generated file.

#
# Section generated by IdlC from "RawConfiguration.idl"
#

import raritan.rpc
from raritan.rpc import Interface, Structure, ValueObject, Enumeration, typecheck, DecodeException
import raritan.rpc.event

import raritan.rpc.rawcfg


# interface
class RawConfiguration(Interface):
    """Proxy class for the IDL interface ``rawcfg.RawConfiguration`` (1.0.1).

    Declares the ``Status`` enumeration, two user-event value objects and a
    single remote method wrapper, ``getStatus``, which is bound to each
    instance in ``__init__``.
    """

    idlType = "rawcfg.RawConfiguration:1.0.1"

    # enumeration
    class Status(Enumeration):
        """Status enumeration; names are listed in the order of the constants below."""

        idlType = "rawcfg.RawConfiguration_1_0_1.Status:1.0.0"
        values = ["UNKNOWN", "UPLOAD_FAILED", "UPDATE_PENDING", "UPDATE_OK", "UPDATE_FAILED"]

    # One named constant per entry of Status.values, numbered from 0.
    Status.UNKNOWN = Status(0)
    Status.UPLOAD_FAILED = Status(1)
    Status.UPDATE_PENDING = Status(2)
    Status.UPDATE_OK = Status(3)
    Status.UPDATE_FAILED = Status(4)

    class _getStatus(Interface.Method):
        """Wrapper for the remote ``getStatus`` call."""

        name = 'getStatus'

        @staticmethod
        def encode():
            """Return the request arguments; ``getStatus`` takes none."""
            return {}

        @staticmethod
        def decode(rsp, agent):
            """Decode the response into a ``(status, timeStamp)`` tuple.

            Both fields are decoded first and type-checked afterwards; the
            type checks are given ``DecodeException`` as the error type.
            """
            status_type = raritan.rpc.rawcfg.RawConfiguration.Status
            status = status_type.decode(rsp['status'])
            timeStamp = raritan.rpc.Time.decode(rsp['timeStamp'])
            typecheck.is_enum(status, status_type, DecodeException)
            typecheck.is_time(timeStamp, DecodeException)
            return (status, timeStamp)

    # value object
    class RawConfigDownloadedEvent(raritan.rpc.event.UserEvent):
        """Value object ``RawConfigDownloadedEvent``; adds no fields to ``UserEvent``."""

        idlType = "rawcfg.RawConfiguration_1_0_1.RawConfigDownloadedEvent:1.0.0"

        def __init__(self, actUserName, actIpAddr, source):
            """Forward all fields unchanged to the ``UserEvent`` initializer."""
            base = super(raritan.rpc.rawcfg.RawConfiguration.RawConfigDownloadedEvent, self)
            base.__init__(actUserName, actIpAddr, source)

        def encode(self):
            """Return the encoding produced by the base class."""
            return super(raritan.rpc.rawcfg.RawConfiguration.RawConfigDownloadedEvent, self).encode()

        @classmethod
        def decode(cls, json, agent):
            """Build an instance from the decoded JSON mapping ``json``."""
            # for event.UserEvent
            user_name = json['actUserName']
            ip_addr = json['actIpAddr']
            # for idl.Event
            event_source = Interface.decode(json['source'], agent)
            return cls(actUserName=user_name, actIpAddr=ip_addr, source=event_source)

        def listElements(self):
            """Return a new list holding the elements reported by the base class."""
            inherited = super(raritan.rpc.rawcfg.RawConfiguration.RawConfigDownloadedEvent, self).listElements()
            return [] + inherited

    # value object
    class RawConfigUpdatedEvent(raritan.rpc.event.UserEvent):
        """Value object ``RawConfigUpdatedEvent``; adds no fields to ``UserEvent``."""

        idlType = "rawcfg.RawConfiguration_1_0_1.RawConfigUpdatedEvent:1.0.0"

        def __init__(self, actUserName, actIpAddr, source):
            """Forward all fields unchanged to the ``UserEvent`` initializer."""
            base = super(raritan.rpc.rawcfg.RawConfiguration.RawConfigUpdatedEvent, self)
            base.__init__(actUserName, actIpAddr, source)

        def encode(self):
            """Return the encoding produced by the base class."""
            return super(raritan.rpc.rawcfg.RawConfiguration.RawConfigUpdatedEvent, self).encode()

        @classmethod
        def decode(cls, json, agent):
            """Build an instance from the decoded JSON mapping ``json``."""
            # for event.UserEvent
            user_name = json['actUserName']
            ip_addr = json['actIpAddr']
            # for idl.Event
            event_source = Interface.decode(json['source'], agent)
            return cls(actUserName=user_name, actIpAddr=ip_addr, source=event_source)

        def listElements(self):
            """Return a new list holding the elements reported by the base class."""
            inherited = super(raritan.rpc.rawcfg.RawConfiguration.RawConfigUpdatedEvent, self).listElements()
            return [] + inherited

    def __init__(self, target, agent):
        """Initialize the base ``Interface`` and bind the ``getStatus`` method wrapper."""
        super(RawConfiguration, self).__init__(target, agent)
        self.getStatus = RawConfiguration._getStatus(self)

# from raritan/rpc/rawcfg/__extend__.py
def upload(agent, data):
    """
    Upload a raw configuration file to a device.

    The data is sent as a single form-data part (form name ``config_file``,
    file name ``config.txt``, MIME type ``application/octet-stream``) to
    ``cgi-bin/raw_config_update.cgi`` through ``agent.form_data_file``.

    **parameters**, **return**

    :param agent: An agent instance for the device where the config should be uploaded
    :param data: The binary data of the raw config file
    :return: the value of the ``X-Response-Code`` header of the upload
             response, looked up with ``headers.get``

    **Example**
        :Example:

        from raritan import rpc
        from raritan.rpc import rawcfg

        agent = rpc.Agent("https", "my-pdu.example.com", "admin", "raritan")

        # read file in binary mode and upload its content
        with open("config.txt", "rb") as cfgFile:
            code = rawcfg.upload(agent, cfgFile.read())
        # view code
        print(code)

    """
    config_part = {
        "data": data,
        "filename": "config.txt",
        "formname": "config_file",
        "mimetype": "application/octet-stream",
    }
    reply = agent.form_data_file("cgi-bin/raw_config_update.cgi", [config_part])
    reply_headers = reply["headers"]
    return reply_headers.get("X-Response-Code")

def download(agent):
    """
    Download the raw configuration data from a device.

    Issues ``agent.get`` on ``cgi-bin/raw_config_download.cgi`` and hands the
    result back unchanged.

    **parameters**

    :param agent: An agent instance from the device where the raw configuration data should be downloaded
    :return: whatever ``agent.get`` returns for the download target (the raw configuration data)

    **Example**
        :Example:

        from raritan import rpc
        from raritan.rpc import rawcfg

        agent = rpc.Agent("https", "my-pdu.example.com", "admin", "raritan")
        # download
        raw_cfg = rawcfg.download(agent)
        print(raw_cfg)
    """
    return agent.get("cgi-bin/raw_config_download.cgi")