File: async_driver.py

package info (click to toggle)
python-scrapli 2023.7.30-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,536 kB
  • sloc: python: 14,459; makefile: 72
file content (590 lines) | stat: -rw-r--r-- 24,334 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
"""scrapli.driver.generic.async_driver"""
import asyncio
from io import BytesIO
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

from scrapli.decorators import timeout_modifier
from scrapli.driver import AsyncDriver
from scrapli.driver.generic.base_driver import BaseGenericDriver
from scrapli.exceptions import ScrapliTimeout, ScrapliValueError
from scrapli.response import MultiResponse, Response

if TYPE_CHECKING:
    from scrapli.driver.generic.base_driver import (  # pragma:  no cover
        ReadCallback,
        ReadCallbackReturnable,
    )


class AsyncGenericDriver(AsyncDriver, BaseGenericDriver):
    def __init__(
        self,
        host: str,
        port: Optional[int] = None,
        auth_username: str = "",
        auth_password: str = "",
        auth_private_key: str = "",
        auth_private_key_passphrase: str = "",
        auth_strict_key: bool = True,
        auth_bypass: bool = False,
        auth_telnet_login_pattern: str = "",
        auth_password_pattern: str = "",
        auth_passphrase_pattern: str = "",
        timeout_socket: float = 15.0,
        timeout_transport: float = 30.0,
        timeout_ops: float = 30.0,
        comms_prompt_pattern: str = r"^\S{0,48}[#>$~@:\]]\s*$",
        comms_return_char: str = "\n",
        ssh_config_file: Union[str, bool] = False,
        ssh_known_hosts_file: Union[str, bool] = False,
        on_init: Optional[Callable[..., Any]] = None,
        on_open: Optional[Callable[..., Any]] = None,
        on_close: Optional[Callable[..., Any]] = None,
        transport: str = "system",
        transport_options: Optional[Dict[str, Any]] = None,
        channel_log: Union[str, bool, BytesIO] = False,
        channel_log_mode: str = "write",
        channel_lock: bool = False,
        logging_uid: str = "",
    ) -> None:
        """
        Create the async generic driver

        This initializer adds no logic of its own: every argument is handed, unmodified and by
        keyword, to the parent class initializer. See the parent driver classes for the precise
        meaning of each argument.

        Args:
            host: forwarded to the parent initializer
            port: forwarded to the parent initializer
            auth_username: forwarded to the parent initializer
            auth_password: forwarded to the parent initializer
            auth_private_key: forwarded to the parent initializer
            auth_private_key_passphrase: forwarded to the parent initializer
            auth_strict_key: forwarded to the parent initializer
            auth_bypass: forwarded to the parent initializer
            auth_telnet_login_pattern: forwarded to the parent initializer
            auth_password_pattern: forwarded to the parent initializer
            auth_passphrase_pattern: forwarded to the parent initializer
            timeout_socket: forwarded to the parent initializer
            timeout_transport: forwarded to the parent initializer
            timeout_ops: forwarded to the parent initializer
            comms_prompt_pattern: forwarded to the parent initializer
            comms_return_char: forwarded to the parent initializer
            ssh_config_file: forwarded to the parent initializer
            ssh_known_hosts_file: forwarded to the parent initializer
            on_init: forwarded to the parent initializer
            on_open: forwarded to the parent initializer
            on_close: forwarded to the parent initializer
            transport: forwarded to the parent initializer
            transport_options: forwarded to the parent initializer
            channel_log: forwarded to the parent initializer
            channel_log_mode: forwarded to the parent initializer
            channel_lock: forwarded to the parent initializer
            logging_uid: forwarded to the parent initializer

        Returns:
            None

        Raises:
            N/A

        """
        super().__init__(
            # connection target
            host=host,
            port=port,
            # authentication
            auth_username=auth_username,
            auth_password=auth_password,
            auth_private_key=auth_private_key,
            auth_private_key_passphrase=auth_private_key_passphrase,
            auth_strict_key=auth_strict_key,
            auth_bypass=auth_bypass,
            auth_telnet_login_pattern=auth_telnet_login_pattern,
            auth_password_pattern=auth_password_pattern,
            auth_passphrase_pattern=auth_passphrase_pattern,
            # timeouts
            timeout_socket=timeout_socket,
            timeout_transport=timeout_transport,
            timeout_ops=timeout_ops,
            # channel communication
            comms_prompt_pattern=comms_prompt_pattern,
            comms_return_char=comms_return_char,
            channel_log=channel_log,
            channel_log_mode=channel_log_mode,
            channel_lock=channel_lock,
            # ssh files
            ssh_config_file=ssh_config_file,
            ssh_known_hosts_file=ssh_known_hosts_file,
            # transport selection
            transport=transport,
            transport_options=transport_options,
            # lifecycle hooks
            on_init=on_init,
            on_open=on_open,
            on_close=on_close,
            # logging
            logging_uid=logging_uid,
        )

    async def get_prompt(self) -> str:
        """
        Ask the underlying channel object for the current prompt

        Args:
            N/A

        Returns:
            str: the prompt as returned by the channel's `get_prompt`

        Raises:
            N/A

        """
        channel = self.channel
        # the annotated local keeps the declared `str` return type explicit for type checkers
        current_prompt: str = await channel.get_prompt()
        return current_prompt

    @timeout_modifier
    async def _send_command(
        self,
        command: str,
        strip_prompt: bool = True,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        eager: bool = False,
        timeout_ops: Optional[float] = None,
    ) -> Response:
        """
        Send a single command to the channel (private implementation)

        This private variant exists so that `eager` can be supported without exposing it on the
        public `send_command` method; `eager` is only meant to be used by the plural methods such
        as `send_commands`.

        Args:
            command: string to send to device in privilege exec mode
            strip_prompt: whether to strip the prompt from the output, defaults to True
            failed_when_contains: string or list of strings indicating failure if found in response
            eager: when True, do not read until the prompt is seen after the command is sent. Do
                *not* use this unless you know what you are doing; it can make scrapli less
                reliable!
            timeout_ops: timeout ops value for this operation only; handled by the
                `timeout_modifier` decorator and reset to the initial value once the operation
                completes

        Returns:
            Response: Scrapli Response object

        Raises:
            ScrapliValueError: if _base_transport_args is None for some reason

        """
        # only the decorator consumes timeout_ops; binding it to _ keeps linters quiet
        _ = timeout_ops

        transport_args = self._base_transport_args
        if not transport_args:
            # should not happen! :)
            raise ScrapliValueError("driver _base_transport_args not set for some reason")

        pending_response = self._pre_send_command(
            host=transport_args.host,
            command=command,
            failed_when_contains=failed_when_contains,
        )

        channel_output = await self.channel.send_input(
            channel_input=command, strip_prompt=strip_prompt, eager=eager
        )
        raw_output, processed_output = channel_output

        return self._post_send_command(
            raw_response=raw_output,
            processed_response=processed_output,
            response=pending_response,
        )

    async def send_command(
        self,
        command: str,
        *,
        strip_prompt: bool = True,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        timeout_ops: Optional[float] = None,
    ) -> Response:
        """
        Send a command

        Args:
            command: string to send to device in privilege exec mode
            strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)
            failed_when_contains: string or list of strings indicating failure if found in response
            timeout_ops: timeout ops value for this operation; only sets the timeout_ops value for
                the duration of the operation, value is reset to initial value after operation is
                completed

        Returns:
            Response: Scrapli Response object

        Raises:
            N/A

        """
        response: Response = await self._send_command(
            command=command,
            strip_prompt=strip_prompt,
            failed_when_contains=failed_when_contains,
            timeout_ops=timeout_ops,
        )
        return response

    async def send_commands(
        self,
        commands: List[str],
        *,
        strip_prompt: bool = True,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        stop_on_failed: bool = False,
        eager: bool = False,
        timeout_ops: Optional[float] = None,
    ) -> MultiResponse:
        """
        Send multiple commands

        Args:
            commands: list of strings to send to device in privilege exec mode
            strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)
            failed_when_contains: string or list of strings indicating failure if found in response
            stop_on_failed: True/False stop executing commands if a command fails, returns results
                as of current execution
            eager: if eager is True we do not read until prompt is seen at each command sent to the
                channel. Do *not* use this unless you know what you are doing as it is possible that
                it can make scrapli less reliable!
            timeout_ops: timeout ops value for this operation; only sets the timeout_ops value for
                the duration of the operation, value is reset to initial value after operation is
                completed. Note that this is the timeout value PER COMMAND sent, not for the total
                of the commands being sent!

        Returns:
            MultiResponse: Scrapli MultiResponse object

        Raises:
            N/A

        """
        responses = self._pre_send_commands(commands=commands)
        for command in commands[:-1]:
            response = await self._send_command(
                command=command,
                strip_prompt=strip_prompt,
                failed_when_contains=failed_when_contains,
                timeout_ops=timeout_ops,
                eager=eager,
            )
            responses.append(response)
            if stop_on_failed and response.failed is True:
                # should we find the prompt here w/ get_prompt?? or just let subsequent operations
                # deal w/ finding that? future us problem? :)
                break
        else:
            # if we did *not* break (i.e. no failure and/or no stop_on_failed) send the last command
            # with eager = False -- this way we *always* find the prompt at the end of the commands
            response = await self._send_command(
                command=commands[-1],
                strip_prompt=strip_prompt,
                failed_when_contains=failed_when_contains,
                timeout_ops=timeout_ops,
                eager=False,
            )
            responses.append(response)

        return responses

    async def send_commands_from_file(
        self,
        file: str,
        *,
        strip_prompt: bool = True,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        stop_on_failed: bool = False,
        eager: bool = False,
        timeout_ops: Optional[float] = None,
    ) -> MultiResponse:
        """
        Send command(s) from file

        Args:
            file: string path to file
            strip_prompt: True/False strip prompt from returned output
            failed_when_contains: string or list of strings indicating failure if found in response
            stop_on_failed: True/False stop executing commands if a command fails, returns results
                as of current execution
            eager: if eager is True we do not read until prompt is seen at each command sent to the
                channel. Do *not* use this unless you know what you are doing as it is possible that
                it can make scrapli less reliable!
            timeout_ops: timeout ops value for this operation; only sets the timeout_ops value for
                the duration of the operation, value is reset to initial value after operation is
                completed. Note that this is the timeout value PER COMMAND sent, not for the total
                of the commands being sent!

        Returns:
            MultiResponse: Scrapli MultiResponse object

        Raises:
            N/A

        """
        commands = self._pre_send_from_file(file=file, caller="send_commands_from_file")

        return await self.send_commands(
            commands=commands,
            strip_prompt=strip_prompt,
            failed_when_contains=failed_when_contains,
            stop_on_failed=stop_on_failed,
            eager=eager,
            timeout_ops=timeout_ops,
        )

    @timeout_modifier
    async def send_and_read(
        self,
        channel_input: str,
        *,
        expected_outputs: Optional[List[str]] = None,
        strip_prompt: bool = True,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        timeout_ops: Optional[float] = None,
        read_duration: float = 2.5,
    ) -> Response:
        """
        Send an input, then read the resulting output.

        In contrast to the usual scrapli behavior, reading stops when the prompt is seen (the
        normal case), OR when any of the expected outputs is seen, OR when the read duration has
        been exceeded. Privilege levels are neither known to nor handled by this method, which
        *can* cause issues if it is not used carefully!

        Args:
            channel_input: input to send to the channel; intentionally named "channel_input" instead
                of "command" or "config" due to this method not caring about privilege levels
            expected_outputs: List of outputs to look for in device response; returns as soon as any
                of the outputs are seen
            strip_prompt: True/False strip prompt from returned output
            failed_when_contains: string or list of strings indicating failure if found in response
            timeout_ops: timeout ops value for this operation only; handled by the
                `timeout_modifier` decorator and reset to the initial value once the operation
                completes
            read_duration: float duration to read for

        Returns:
            Response: Scrapli Response object

        Raises:
            ScrapliValueError: if _base_transport_args is None for some reason

        """
        # only the decorator consumes timeout_ops; binding it to _ keeps linters quiet
        _ = timeout_ops

        transport_args = self._base_transport_args
        if not transport_args:
            # should not happen! :)
            raise ScrapliValueError("driver _base_transport_args not set for some reason")

        pending_response = self._pre_send_command(
            host=transport_args.host,
            command=channel_input,
            failed_when_contains=failed_when_contains,
        )

        channel_output = await self.channel.send_input_and_read(
            channel_input=channel_input,
            strip_prompt=strip_prompt,
            expected_outputs=expected_outputs,
            read_duration=read_duration,
        )
        raw_output, processed_output = channel_output

        return self._post_send_command(
            raw_response=raw_output,
            processed_response=processed_output,
            response=pending_response,
        )

    @timeout_modifier
    async def send_interactive(
        self,
        interact_events: Union[List[Tuple[str, str]], List[Tuple[str, str, bool]]],
        *,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        privilege_level: str = "",
        timeout_ops: Optional[float] = None,
        interaction_complete_patterns: Optional[List[str]] = None,
    ) -> Response:
        """
        Interact with a device whose prompt changes after each input.

        Intended for situations where the prompt differs per input and where an input may be
        hidden (a password, for example). A typical use is answering a confirmation challenge such
        as the one following "clear logging" on IOSXE devices. The "interact_events" list may hold
        as many elements as needed; each element is a tuple of two or three items: the input to
        send (string), the expected response (string), and optionally a bool stating whether the
        input is "hidden" (i.e. password input).

        An example device session where this is needed:

        ```
        3560CX#copy flash: scp:
        Source filename []? test1.txt
        Address or name of remote host []? 172.31.254.100
        Destination username [carl]?
        Writing test1.txt
        Password:

        Password:
         Sink: C0644 639 test1.txt
        !
        639 bytes copied in 12.066 secs (53 bytes/sec)
        3560CX#
        ```

        Which can be handled like this:

        ```
        interact = await conn.send_interactive(
            [
                ("copy flash: scp:", "Source filename []?", False),
                ("test1.txt", "Address or name of remote host []?", False),
                ("172.31.254.100", "Destination username [carl]?", False),
                ("carl", "Password:", False),
                ("super_secure_password", prompt, True),
            ]
        )
        ```

        Further prompts are handled by appending more tuples to the list of interact "events".

        Args:
            interact_events: list of tuples containing the "interactions" with the device
                each list element must have an input and an expected response, and may have an
                optional bool for the third and final element -- the optional bool specifies if the
                input that is sent to the device is "hidden" (ex: password), if the hidden param is
                not provided it is assumed the input is "normal" (not hidden)
            failed_when_contains: list of strings that, if present in final output, represent a
                failed command/interaction
            privilege_level: ignored in this base class; for LSP reasons for subclasses
            timeout_ops: timeout ops value for this operation only; handled by the
                `timeout_modifier` decorator and reset to the initial value once the operation
                completes. Note that this is the timeout value PER COMMAND sent, not for the total
                of the commands being sent!
            interaction_complete_patterns: list of patterns, that if seen, indicate the interactive
                "session" has ended and we should exit the interactive session.

        Returns:
            Response: scrapli Response object

        Raises:
            ScrapliValueError: if _base_transport_args is None for some reason

        """
        # only the decorator consumes timeout_ops; binding it to _ keeps linters quiet
        _ = timeout_ops
        # privilege level only matters "up" in the network driver layer
        _ = privilege_level

        transport_args = self._base_transport_args
        if not transport_args:
            # should not happen! :)
            raise ScrapliValueError("driver _base_transport_args not set for some reason")

        pending_response = self._pre_send_interactive(
            host=transport_args.host,
            interact_events=interact_events,
            failed_when_contains=failed_when_contains,
        )

        channel_output = await self.channel.send_inputs_interact(
            interact_events=interact_events,
            interaction_complete_patterns=interaction_complete_patterns,
        )
        raw_output, processed_output = channel_output

        return self._post_send_command(
            raw_response=raw_output,
            processed_response=processed_output,
            response=pending_response,
        )

    async def read_callback(  # noqa: C901
        self,
        callbacks: List["ReadCallback"],
        initial_input: Optional[str] = None,
        read_output: bytes = b"",
        read_delay: float = 0.1,
        read_timeout: float = -1.0,
    ) -> "ReadCallbackReturnable":
        r"""
        Read from a channel and react to the output with some callback.

        This method is kind of like an "advanced" send_interactive -- the idea is simple: send some
        "stuff" to the channel (optionally), and then read from the channel. Based on the output
        do something. The callbacks is a list of `ReadCallback` which is an object containing the
        actual callback to execute, some info about when to trigger that callback (also when *not*
        to trigger that callback), as well as some attributes to control the next (if desired)
        iteration of read_callback. You could in theory do basically everything with this method by
        chaining callbacks forever, but you probably don't want to do that for real!

        Example usage (NOTE: the example below is written against the synchronous driver; with
        this async driver `read_callback` must be awaited, and any coroutine returned by running a
        callback is awaited by this method):

        ```
        from scrapli.driver.core import IOSXEDriver
        from scrapli.driver.generic.base_driver import ReadCallback
        from scrapli.driver.generic.sync_driver import GenericDriver

        device = {
            "host": "rtr1",
            "auth_strict_key": False,
            "ssh_config_file": True,
        }

        def callback_one(cls: GenericDriver, read_output: str):
            cls.acquire_priv("configuration")
            cls.channel.send_return()


        def callback_two(cls: GenericDriver, read_output: str):
            print(f"previous read output : {read_output}")

            r = cls.send_command("show run | i hostname")
            print(f"result: {r.result}")


        with IOSXEDriver(**device) as conn:
            callbacks = [
                ReadCallback(
                    contains="rtr1#",
                    callback=callback_one,
                    name="call1",
                    case_insensitive=False
                ),
                ReadCallback(
                    contains_re=r"^rtr1\(config\)#",
                    callback=callback_two,
                    complete=True,
                )
            ]
            conn.read_callback(callbacks=callbacks, initial_input="show run | i hostname")
        ```

        Args:
            callbacks: a list of ReadCallback objects
            initial_input: optional string to send to "kick off" the read_callback method
            read_output: optional bytes to append any new reads to
            read_delay: sleep interval between reads; values less than or equal to zero fall back
                to 0.1
            read_timeout: value to set the `transport_timeout` to for the duration of the reading
                portion of this method. If left default (-1.0) or set to anything below 0, the
                transport timeout value will be left alone (whatever the timeout_transport value is)
                otherwise, the provided value will be temporarily set as the timeout_transport for
                duration of the reading.

        Returns:
            ReadCallbackReturnable: either None or call to read_callback again

        Raises:
            ScrapliTimeout: if the read operation times out (based on the read_timeout value)
                during the read callback check.

        """
        if initial_input is not None:
            self.channel.write(channel_input=f"{initial_input}{self.comms_return_char}")
            # forward the caller's read settings to the read phase; without this a read_output,
            # read_delay or read_timeout passed together with initial_input is silently ignored
            return await self.read_callback(
                callbacks=callbacks,
                initial_input=None,
                read_output=read_output,
                read_delay=read_delay,
                read_timeout=read_timeout,
            )

        original_transport_timeout = self.timeout_transport

        # if the read_timeout value is -1.0 or just less than 0, that indicates we should use
        # the "normal" transport timeout and not modify anything
        self.timeout_transport = read_timeout if read_timeout >= 0 else self.timeout_transport

        _read_delay = 0.1 if read_delay <= 0 else read_delay

        matched_callback: Optional["ReadCallback"] = None

        try:
            while matched_callback is None:
                try:
                    read_output += await self.channel.read()
                except ScrapliTimeout as exc:
                    raise ScrapliTimeout("timeout during read in read_callback operation") from exc

                for callback in callbacks:
                    # check() is always invoked, exactly as before, even for callbacks that will
                    # end up being skipped
                    _run_callback = callback.check(read_output=read_output)

                    if _run_callback is not True:
                        continue

                    if (
                        callback.only_once is True
                        and callback._triggered is True  # pylint: disable=W0212
                    ):
                        # only warn on an actual match, otherwise every poll iteration would log
                        # a misleading "matches" warning for an already triggered callback
                        self.logger.warning(
                            f"callback {callback.name} matches but is set to 'only_once', "
                            "skipping this callback"
                        )

                        continue

                    matched_callback = callback
                    break

                if matched_callback is None:
                    await asyncio.sleep(_read_delay)
        finally:
            # always put the transport timeout back -- on a match (before the callback runs), on
            # a read timeout, and also on any other exception or cancellation during the read loop
            self.timeout_transport = original_transport_timeout

        self.logger.info(f"callback {matched_callback.name} matched, executing")

        coro = matched_callback.run(driver=self)
        if coro is not None:
            # should always be a coroutine in this case, this appeases mypy
            await coro

        if matched_callback.complete:
            self.logger.debug("callback complete is true, done with read_callback")
            return None

        if matched_callback.reset_output:
            read_output = b""

        return await self.read_callback(
            callbacks=callbacks,
            initial_input=None,
            read_output=read_output,
            read_delay=matched_callback.next_delay,
            read_timeout=matched_callback.next_timeout,
        )