File: reporter.py

package info (click to toggle)
python-certbot 0.28.0-1~deb9u2
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 2,108 kB
  • sloc: python: 17,251; makefile: 174; sh: 44
file content (100 lines) | stat: -rw-r--r-- 3,547 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Collects and displays information to the user."""
from __future__ import print_function

import collections
import logging
import sys
import textwrap

from six.moves import queue  # type: ignore  # pylint: disable=import-error
import zope.interface

from certbot import interfaces
from certbot import util


logger = logging.getLogger(__name__)


@zope.interface.implementer(interfaces.IReporter)
class Reporter(object):
    """Collects and displays information to the user.

    :ivar `queue.PriorityQueue` messages: Messages to be displayed to
        the user.

    """

    HIGH_PRIORITY = 0
    """High priority constant. See `add_message`."""
    MEDIUM_PRIORITY = 1
    """Medium priority constant. See `add_message`."""
    LOW_PRIORITY = 2
    """Low priority constant. See `add_message`."""

    _msg_type = collections.namedtuple('ReporterMsg', 'priority text on_crash')

    def __init__(self, config):
        self.messages = queue.PriorityQueue()
        self.config = config

    def add_message(self, msg, priority, on_crash=True):
        """Adds msg to the list of messages to be printed.

        :param str msg: Message to be displayed to the user.

        :param int priority: One of `HIGH_PRIORITY`, `MEDIUM_PRIORITY`,
            or `LOW_PRIORITY`.

        :param bool on_crash: Whether or not the message should be
            printed if the program exits abnormally.

        """
        assert self.HIGH_PRIORITY <= priority <= self.LOW_PRIORITY
        self.messages.put(self._msg_type(priority, msg, on_crash))
        logger.debug("Reporting to user: %s", msg)

    def print_messages(self):
        """Prints messages to the user and clears the message queue.

        If there is an unhandled exception, only messages for which
        ``on_crash`` is ``True`` are printed.

        """
        bold_on = False
        if not self.messages.empty():
            no_exception = sys.exc_info()[0] is None
            bold_on = sys.stdout.isatty()
            if not self.config.quiet:
                if bold_on:
                    print(util.ANSI_SGR_BOLD)
                print('IMPORTANT NOTES:')
            first_wrapper = textwrap.TextWrapper(
                initial_indent=' - ',
                subsequent_indent=(' ' * 3),
                break_long_words=False,
                break_on_hyphens=False)
            next_wrapper = textwrap.TextWrapper(
                initial_indent=first_wrapper.subsequent_indent,
                subsequent_indent=first_wrapper.subsequent_indent,
                break_long_words=False,
                break_on_hyphens=False)
        while not self.messages.empty():
            msg = self.messages.get()
            if self.config.quiet:
                # In --quiet mode, we only print high priority messages that
                # are flagged for crash cases
                if not (msg.priority == self.HIGH_PRIORITY and msg.on_crash):
                    continue
            if no_exception or msg.on_crash:
                if bold_on and msg.priority > self.HIGH_PRIORITY:
                    if not self.config.quiet:
                        sys.stdout.write(util.ANSI_SGR_RESET)
                        bold_on = False
                lines = msg.text.splitlines()
                print(first_wrapper.fill(lines[0]))
                if len(lines) > 1:
                    print("\n".join(
                        next_wrapper.fill(line) for line in lines[1:]))
        if bold_on and not self.config.quiet:
            sys.stdout.write(util.ANSI_SGR_RESET)