File: vectorlist.py

package info (click to toggle)
weevely 4.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,336 kB
  • sloc: python: 7,732; php: 1,035; sh: 53; makefile: 2
file content (201 lines) | stat: -rw-r--r-- 6,460 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
The module `core.vectorlist` defines a `VectorList` object, normally used
to store the module vectors.

Module class executes `_register_vectors()` at init to initialize the `VectorList`
object as `self.vectors` module attribute.

The methods exposed by VectorList can be used to get the result of a
given vector execution with `get_result()`, get all the results of a bunch of
vectors with `get_results()`, or get the result of the first vector that
response in the way we want with `find_first_result()`.

"""

from core.vectors import Os
from mako.template import Template
from core.weexceptions import DevException
from core.loggers import log, dlog
from core import modules
import utils
from core import messages

class VectorList(list):

    def __init__(self, session, module_name):

        self.session = session
        self.module_name = module_name

        list.__init__(self)

    def find_first_result(self, names = [], format_args = {}, condition = None, store_result = False, store_name = ''):
        """ Execute all the vectors and return the first result matching the given condition.

        Return the name and the result of the first vector execution response that satisfy
        the given condition.

        With unspecified names, execute all the vectors. Optionally store results.

        Exceptions triggered checking condition function are catched and logged.

        Args:
            names (list of str): The list of names of vectors to execute.

            format_args (dict): The arguments dictionary used to format the vectors with.

            condition (function): The function or lambda to check certain conditions on result.
            Must returns boolean.

            store_result (bool): Store as result.

            store_name (str): Store the found vector name in the specified argument.

        Returns:
            Tuple. Contains the vector name and execution result in the
            `( vector_name, result )` form.

        """

        if not callable(condition):
            raise DevException(messages.vectors.wrong_condition_type)
        if not isinstance(store_name, str):
            raise DevException(messages.vectors.wrong_store_name_type)

        for vector in self:

            # Skip with wrong vectors
            if not self._os_match(vector.target): continue

            # Clean names filter from empty objects
            names = [ n for n in names if n ]

            # Skip if names filter is passed but current vector is missing
            if names and not any(n in vector.name for n in names): continue

            # Add current vector name
            format_args['current_vector'] = vector.name

            # Run
            result = vector.run(format_args)

            # See if condition is verified
            try:
                condition_result = condition(result)
            except Exception as e:
                import traceback; log.info(traceback.format_exc())
                log.debug(messages.vectorlist.vector_s_triggers_an_exc % vector.name)

                condition_result = False

            # Eventually store result or vector name
            if condition_result:
                if store_result:
                    self.session[self.module_name]['results'][vector.name] = result
                if store_name:
                    self.session[self.module_name]['stored_args'][store_name] = vector.name

                return vector.name, result

        return None, None

    def get_result(self, name, format_args = {}, store_result = False):
        """Execute one vector and return the result.

        Run the vector with specified name. Optionally store results.

        Args:
            name (str): The name of vector to execute.

            format_args (dict): The arguments dictionary used to format the vectors with.

            store_result (bool): Store result in session.

        Returns:
            Object. Contains the vector execution result.

        """

        vector = self.get_by_name(name)

        if vector and self._os_match(vector.target):

            # Add current vector name
            format_args['current_vector'] = vector.name

            result = vector.run(format_args)

            if store_result:
                self.session[self.module_name]['results'][name] = result

            return result


    def get_results(self, names = [], format_args = {}, results_to_store = [ ]):
        """Execute all the vectors and return the results.

        With unspecified names, execute all the vectors. Optionally store results.
        Returns a dictionary with results.

        Args:
            names (list of str): The list of names of vectors to execute.

            format_args (dict): The arguments dictionary used to format the vectors with.

            results_to_store (list of str): The list of names of the vectors which
            store the execution result.

        Returns:
            Dictionary. Contains all the vector results in the
            `{ vector_name : result }` form.
        """

        response = {}

        for vector in self:

            if not self._os_match(vector.target): continue

            if names and not any(x in vector.name for x in names): continue

            # Add current vector name
            format_args['current_vector'] = vector.name

            response[vector.name] = vector.run(format_args)

            if not any(x in vector.name for x in results_to_store): continue

            self.session[self.module_name]['results'][vector.name] = response[vector.name]

        return response

    def _os_match(self, os):
        """Check if vector os is compatible with the remote os."""

        os_string = self.session['system_info']['results'].get('os')

        # If os_string is not set, just return True and continue
        if not os_string: return True

        os_current = Os.WIN if os_string.lower().startswith('win') else Os.NIX

        return os in (os_current, Os.ANY)

    def get_by_name(self, name):
        """Get the vector object by name.

        Args:
            name (str): the name of the requested vector.

        Returns:
            Vector object.
        """
        return next((v for v in self if v.name == name), None)

    def get_names(self):
        """Get the vectors names.

        Returns:
            List of strings. Contain vectors names.
        """
        return [ v.name for v in self ]