File: px2_shell.py

package info (click to toggle)
raritan-json-rpc-sdk 3.6.1%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 41,748 kB
  • sloc: cs: 162,629; perl: 85,818; python: 24,275; javascript: 5,937; makefile: 21
file content (102 lines) | stat: -rwxr-xr-x 3,058 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python3
# SPDX-License-Identifier: BSD-3-Clause
#
# Copyright 2012 Raritan Inc. All rights reserved.

import time, datetime, sys, os, atexit, code, getpass
import rlcompleter
try:
    import readline
except:
    # no readline support, error message will be generated when used below
    pass

sys.path.append("pdu-python-api")
import raritan.rpc
from raritan.rpc import Agent, idl
from raritan.rpc import net, firmware, cfg, session, event
from raritan.rpc import pdumodel, tfw, test, sensors, usermgmt

def parse_url(url):
    scheme = 'https'
    username = 'admin'
    password = 'raritan'
    hostname = url

    parts = hostname.split('://')
    if len(parts) >= 2:
        (scheme, hostname) = parts

    parts = hostname.split('@')
    if len(parts) >= 2:
        (username, hostname) = parts

    parts = username.split(':')
    if len(parts) >= 2:
        (username, password) = parts

    return (scheme, hostname, username, password)

try:
    (scheme, hostname, username, password) = parse_url(sys.argv[1])
except:
    print("Usage: px2_shell.py [scheme://][user[:password]@]host")
    sys.exit(1)

try:
    readline.parse_and_bind('tab: complete')
    history = os.path.join(os.environ["HOME"], ".px2_shell_history")
    try:
        readline.read_history_file(history)
    except IOError:
        pass
    atexit.register(readline.write_history_file, history)
except:
    print("Sorry, no readline support!")

agent = Agent(scheme, hostname, username, password, disable_certificate_verification = True)

try:
    pdumodel.Pdu("/model/pdu/0", agent).getMetaData()
except raritan.rpc.HttpException as e:
    if "HTTP Error 451" in str(e):
        print("Changing the default password for user 'admin' is required.")
        admin = usermgmt.User("/auth/user/admin", agent)
        password = getpass.getpass()
        admin.setAccountPassword(password)
        agent.passwd = password

# selected sysrpc proxies
net_proxy = net.Net("/net", agent)
firmware_proxy = firmware.Firmware("/firmware", agent)
cfg_proxy = cfg.Cfg("/cfg", agent)
session_manager = session.SessionManager("/session", agent)
event_engine = event.Engine("/event_engine", agent)
event_service = event.Service("/eventservice", agent)

# selected modelrpc proxies
core = tfw.CoreCtrl("/model/core", agent)
unit = pdumodel.Unit("/model/unit", agent)
pdu = pdumodel.Pdu("/model/pdu/0", agent)
inlets = pdu.getInlets()
ocps = pdu.getOverCurrentProtectors()
outlets = pdu.getOutlets()

def poll_events(types = None):
    if types == None:
        types = [ raritan.rpc.idl.Event ]
    elif not isinstance(types, list):
        types = [ types ]
    channel = event_service.createChannel()
    channel.demandEventTypes(types)
    print("Polling for events, Ctrl-C to stop ...")
    try:
        while True:
            _, events = channel.pollEvents()
            ts = datetime.datetime.now().strftime("[%H:%M:%S]")
            for event in events:
                print("%s %s" % (ts, event))
    except KeyboardInterrupt:
        print()

code.interact(local=globals())