File: ssh_autodetect.py

package info (click to toggle)
netmiko 1.4.3-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 556 kB
  • sloc: python: 3,020; makefile: 4
file content (265 lines) | stat: -rw-r--r-- 9,692 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""
The ssh_autodetect module is used to auto-detect the netmiko device_type to use to further initiate
a new SSH connection with a remote host. This auto-detection is based on a unique class called
**SSHDetect**.

Notes
-----

The **SSHDetect** class is instantiated using the same parameters as a standard Netmiko
connection (see the *netmiko.ssh_dispatcher.ConnectHandler* function). The only acceptable value
for the 'device_type' argument is 'autodetect'.

The auto-detection is solely based on the *SSH_MAPPER_BASE* dictionary. The keys are the names of
the 'device_type' values supported for auto-detection and each value is another dictionary
describing how to handle the auto-detection.

* "cmd" : The command to send to the remote device. **The command output must not require paging.**
* "search_patterns" : A list of regex to compare with the output of the command
* "priority" : An integer (0-99) which specifies the confidence of the match above
* "dispatch" : The function to call to try the autodetection (per default SSHDetect._autodetect_std)

Examples
--------

# Auto-detection section
>>> from netmiko.ssh_autodetect import SSHDetect
>>> from netmiko.ssh_dispatcher import ConnectHandler
>>> remote_device = {'device_type': 'autodetect',
                     'host': 'remote.host',
                     'username': 'test',
                     'password': 'foo'}
>>> guesser = SSHDetect(**remote_device)
>>> best_match = guesser.autodetect()
>>> print(best_match) # Name of the best device_type to use further
>>> print(guesser.potential_matches) # Dictionary of the whole matching result

# Netmiko connection creation section
>>> remote_device['device_type'] = best_match
>>> connection = ConnectHandler(**remote_device)
"""
from __future__ import unicode_literals

import re
import time
from netmiko.ssh_dispatcher import ConnectHandler
from netmiko.base_connection import BaseConnection


# Detection recipes, keyed by netmiko device_type. For every entry, SSHDetect.autodetect() pops
# the "dispatch" value (the name of the SSHDetect method to invoke) and forwards all remaining
# keys to that method as keyword arguments.
SSH_MAPPER_BASE = {
    "alcatel_aos": {
        "dispatch": "_autodetect_std",
        "cmd": "show system",
        "search_patterns": [
            "Alcatel-Lucent",
        ],
        "priority": 99,
    },
    "alcatel_sros": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | match ALCATEL",
        "search_patterns": [
            "TiMOS",
        ],
        "priority": 99,
    },
    "arista_eos": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | inc rist",
        "search_patterns": [
            "Arista",
        ],
        "priority": 99,
    },
    "cisco_ios": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | inc Cisco",
        "search_patterns": [
            "Cisco IOS Software",
            "Cisco Internetwork Operating System Software",
        ],
        "priority": 99,
    },
    "cisco_asa": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | inc Cisco",
        "search_patterns": [
            "Cisco Adaptive Security Appliance",
            "Cisco ASA",
        ],
        "priority": 99,
    },
    "cisco_nxos": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | inc Cisco",
        "search_patterns": [
            "Cisco Nexus Operating System",
            "NX-OS",
        ],
        "priority": 99,
    },
    "cisco_xr": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | inc Cisco",
        "search_patterns": [
            "Cisco IOS XR",
        ],
        "priority": 99,
    },
    "huawei": {
        "dispatch": "_autodetect_std",
        "cmd": "display version | inc Huawei",
        "search_patterns": [
            "Huawei Technologies",
            "Huawei Versatile Routing Platform Software",
        ],
        "priority": 99,
    },
    "juniper_junos": {
        "dispatch": "_autodetect_std",
        "cmd": "show version | match JUNOS",
        "search_patterns": [
            "JUNOS Software Release",
            "JUNOS .+ Software",
        ],
        "priority": 99,
    },
}


class SSHDetect(object):
    """
    The SSHDetect class tries to automatically guess the device type running on the SSH remote end.
    The keyword argument 'device_type' must be set to 'autodetect'; any other value is rejected
    with a ValueError.

    Parameters
    ----------
    *args : list
        The same *args that you might provide to the netmiko.ssh_dispatcher.ConnectHandler.
    **kwargs : dict
        The same **kwargs that you might provide to the netmiko.ssh_dispatcher.ConnectHandler.

    Attributes
    ----------
    connection : object
        The connection object returned by ConnectHandler for the 'autodetect' device_type.
    initial_buffer : object
        Whatever BaseConnection._test_channel_read() returned right after connecting.
    potential_matches : dict
        Dict of (device_type, accuracy) that is populated through an interaction with the
        remote end.

    Methods
    -------
    autodetect()
        Try to determine the device type.
    """

    # Regular expressions (matched case-insensitively) revealing that the remote device rejected
    # the probe command; such a response can never count as a positive match.
    _INVALID_RESPONSES = (
        r'% Invalid input detected',
        r'syntax error, expecting',
        r'Error: Unrecognized command',
        r'%Error',
    )

    def __init__(self, *args, **kwargs):
        """
        Constructor of the SSHDetect class

        Raises
        ------
        KeyError
            If 'device_type' is not supplied as a keyword argument.
        ValueError
            If 'device_type' is anything other than 'autodetect'.
        """
        if kwargs["device_type"] != "autodetect":
            raise ValueError("The connection device_type must be 'autodetect'")
        self.connection = ConnectHandler(*args, **kwargs)
        # Call the _test_channel_read() in base to clear initial data
        output = BaseConnection._test_channel_read(self.connection)
        self.initial_buffer = output
        self.potential_matches = {}
        # Maps a command string to the response already received for it.
        self._results_cache = {}

    def autodetect(self):
        """
        Try to guess the best 'device_type' based on patterns defined in SSH_MAPPER_BASE

        Every entry of SSH_MAPPER_BASE is tried in turn until one reports an accuracy of 99 or
        more. The connection is always closed before this method returns or raises.

        Returns
        -------
        best_match : str or None
            The device type that is currently the best to use to interact with the device
        """
        try:
            for device_type, autodetect_dict in SSH_MAPPER_BASE.items():
                # Work on a copy so the shared module-level mapping is never mutated.
                tmp_dict = autodetect_dict.copy()
                # 'dispatch' is optional and falls back to the standard detection method.
                call_method = tmp_dict.pop("dispatch", "_autodetect_std")
                autodetect_method = getattr(self, call_method)
                accuracy = autodetect_method(**tmp_dict)
                if accuracy:
                    self.potential_matches[device_type] = accuracy
                    if accuracy >= 99:  # Stop the loop as we are sure of our match
                        break
            return self._best_match()
        finally:
            # Release the SSH session even when a dispatch method raises unexpectedly.
            self.connection.disconnect()

    def _best_match(self):
        """
        Return the device_type with the highest accuracy recorded in potential_matches.

        Returns
        -------
        best_match : str or None
            The highest scoring device type (the first one recorded wins a tie), or None when
            nothing has matched.
        """
        if not self.potential_matches:
            return None
        return max(self.potential_matches.items(), key=lambda item: item[1])[0]

    def _send_command(self, cmd=""):
        """
        Handle reading/writing channel directly. It is also sanitizing the output received.

        Parameters
        ----------
        cmd : str, optional
            The command to send to the remote device (default : "", just send a new line)

        Returns
        -------
        output : str
            The output from the command sent
        """
        self.connection.write_channel(cmd + "\n")
        # Pause for one second before starting to read the channel.
        time.sleep(1)
        output = self.connection._read_channel_timing()
        output = self.connection.strip_ansi_escape_codes(output)
        output = self.connection.strip_backspaces(output)
        return output

    def _send_command_wrapper(self, cmd):
        """
        Send command to the remote device with a caching feature to avoid sending the same command
        twice based on the SSH_MAPPER_BASE dict cmd key.

        Parameters
        ----------
        cmd : str
            The command to send to the remote device after checking cache.

        Returns
        -------
        response : str
            The response from the remote device.
        """
        cached_results = self._results_cache.get(cmd)
        if cached_results:
            return cached_results
        # Nothing usable cached. Note that an empty cached response also lands here, so the
        # command is sent again in that case.
        response = self._send_command(cmd)
        self._results_cache[cmd] = response
        return response

    def _autodetect_std(self, cmd="", search_patterns=None, re_flags=re.I, priority=99):
        """
        Standard method to try to auto-detect the device type. This method will be called for each
        device_type present in SSH_MAPPER_BASE dict ('dispatch' key). It will attempt to send a
        command and match some regular expression from the output for each entry in
        SSH_MAPPER_BASE ('cmd' and 'search_patterns' keys).

        Parameters
        ----------
        cmd : str
            The command to send to the remote device after checking cache.
        search_patterns : list
            A list of regular expression to look for in the command's output (default: None).
        re_flags: re.flags, optional
            Any flags from the python re module to modify the regular expression (default: re.I).
        priority: int, optional
            The confidence the match is right between 0 and 99 (default: 99).

        Returns
        -------
        accuracy : int
            ``priority`` when one of the search patterns matches the response; 0 when there is
            no command or pattern, the device reports an invalid command, nothing matches, or
            any exception occurs while probing.
        """
        if not cmd or not search_patterns:
            return 0
        try:
            response = self._send_command_wrapper(cmd)
            # Look for error conditions in output
            for pattern in self._INVALID_RESPONSES:
                if re.search(pattern, response, flags=re.I):
                    return 0
            for pattern in search_patterns:
                if re.search(pattern, response, flags=re_flags):
                    return priority
        except Exception:
            # Deliberate best-effort: a failing probe only means "no match" for this device
            # type, it must not abort the whole detection.
            return 0
        return 0