File: winrm.py

package info (click to toggle)
mercurial 7.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 44,824 kB
  • sloc: python: 206,444; ansic: 56,415; tcl: 3,715; sh: 1,797; lisp: 1,483; cpp: 864; makefile: 752; javascript: 649; xml: 36
file content (87 lines) | stat: -rw-r--r-- 2,081 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# winrm.py - Interact with Windows Remote Management (WinRM)
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# no-check-code because Python 3 native.

import logging
import pprint
import time

from pypsrp.client import Client
from pypsrp.powershell import (
    PowerShell,
    PSInvocationState,
    RunspacePool,
)
import requests.exceptions


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


def wait_for_winrm(host, username, password, timeout=180, ssl=False):
    """Wait for the Windows Remoting (WinRM) service to become available.

    Repeatedly constructs a ``pypsrp.client.Client`` for ``host`` and runs a
    trivial PowerShell command through it. An attempt that raises
    ``requests.exceptions.ConnectionError`` is retried after a one second
    pause until ``timeout`` seconds have elapsed.

    Args:
        host: Passed as the first argument to ``Client``.
        username: Forwarded to ``Client`` as ``username``.
        password: Forwarded to ``Client`` as ``password``.
        timeout: How long, in seconds, to keep retrying before giving up.
        ssl: Forwarded to ``Client`` as ``ssl``.

    Returns:
        The ``pypsrp.client.Client`` instance whose probe command succeeded.

    Raises:
        requests.exceptions.ConnectionError: Re-raised from the last attempt
            once the deadline has passed. Any other exception type
            propagates immediately without a retry.
    """

    # Use the monotonic clock so that wall-clock adjustments (NTP sync, manual
    # changes) cannot shorten or extend the retry window.
    deadline = time.monotonic() + timeout

    while True:
        try:
            client = Client(
                host,
                username=username,
                password=password,
                ssl=ssl,
                connection_timeout=5,
            )
            # Constructing the client is not treated as proof of availability;
            # only a successfully executed command is.
            client.execute_ps("Write-Host 'Hello, World!'")
            return client
        except requests.exceptions.ConnectionError:
            if time.monotonic() >= deadline:
                raise

            time.sleep(1)


def format_object(o):
    """Return a printable string for ``o``.

    ``str`` instances (including subclasses) are handed back unchanged.
    Anything else is converted with ``str()``; if that conversion raises
    ``AttributeError`` or ``TypeError``, the pretty-printed form of
    ``o.extended_properties`` is returned instead.
    """
    if not isinstance(o, str):
        try:
            return str(o)
        except (AttributeError, TypeError):
            # Fallback for objects that cannot be stringified directly.
            # NOTE(review): presumably these are pypsrp objects exposing
            # ``extended_properties`` -- confirm against callers.
            return pprint.pformat(o.extended_properties)

    return o


def run_powershell(client, script):
    """Run ``script`` as PowerShell via ``client`` and print its output.

    A ``RunspacePool`` is opened on ``client.wsman`` and the script is invoked
    asynchronously. While the invocation reports ``RUNNING`` it is polled, and
    each object found in the output is printed through ``format_object`` and
    then removed. Output still present after ``end_invoke()`` is printed too.

    Raises:
        Exception: If the invocation ends in the ``FAILED`` state. The message
            contains the formatted entries of the error stream joined by
            spaces.
    """
    with RunspacePool(client.wsman) as runspace:
        shell = PowerShell(runspace)
        shell.add_script(script)
        shell.begin_invoke()

        while shell.state == PSInvocationState.RUNNING:
            shell.poll_invoke()

            for item in shell.output:
                print(format_object(item))

            # Empty the list in place so already-printed objects are not
            # emitted again on the next poll.
            shell.output[:] = []

        shell.end_invoke()

        # Print whatever is left in the output after the invocation ended.
        for item in shell.output:
            print(format_object(item))

        if shell.state == PSInvocationState.FAILED:
            error_text = ' '.join(map(format_object, shell.streams.error))
            raise Exception('PowerShell execution failed: %s' % error_text)