File: __init__.py

"""Logic to perform spam/ham classification"""
import sys
from typing import Any, List

from google.protobuf.json_format import MessageToDict
from vyper import v

from api.v1.spamcheck_pb2 import SpamVerdict
from app import event, logger, queue, data_store
from server.interceptors import SpamCheckContext

log = logger.logger

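# The "ml_classifiers" setting is expected to point at a directory containing
# the classifier implementations; appending it to sys.path makes them
# importable (an assumption based on how the value is used just below).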
classifiers = v.get_string("ml_classifiers")
if classifiers:
    sys.path.append(classifiers)

# pylint: disable=too-few-public-methods
class Spammable:
    """Base class for spammable types."""

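    # Judging from the lookups below, filter.allow_list and filter.deny_list
    # are expected to be mappings keyed by project ID, and
    # filter.allowed_domains an iterable of email domains (shapes inferred
    # from this module, not from a documented schema).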
    allow_list = v.get("filter.allow_list")
    deny_list = v.get("filter.deny_list")
    allowed_domains = set(v.get("filter.allowed_domains"))

    # The maximum allowed verdict is currently CONDITIONAL_ALLOW, to limit
    # false positives.
    max_verdict = SpamVerdict.CONDITIONAL_ALLOW

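    # Confidence thresholds, checked in descending order by calculate_verdict:
    # e.g. an illustrative score of 0.72 falls into the 0.5 bucket and maps to
    # DISALLOW (before the cap applied by _maximum_verdict).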
    _inference_scores = {
        0.9: SpamVerdict.BLOCK,
        0.5: SpamVerdict.DISALLOW,
        0.4: SpamVerdict.CONDITIONAL_ALLOW,
        0.0: SpamVerdict.ALLOW,
    }

    _verdict_rankings = {
        SpamVerdict.ALLOW: 1,
        SpamVerdict.CONDITIONAL_ALLOW: 2,
        SpamVerdict.DISALLOW: 3,
        SpamVerdict.BLOCK: 4,
    }

    _verdict_mapping = {
        "ALLOW": SpamVerdict.ALLOW,
        "CONDITIONAL_ALLOW": SpamVerdict.CONDITIONAL_ALLOW,
        "DISALLOW": SpamVerdict.DISALLOW,
        "BLOCK": SpamVerdict.BLOCK,
    }

    def __init__(
        self, spammable: Any, context: SpamCheckContext, classifier: Any
    ) -> None:
        self.context = context
        self.spammable = spammable
        self.classifier = classifier

    @classmethod
    def set_max_verdict(cls):
        """Set the maximum verdict for the spammable class."""
        key = f"max_{cls.__name__}_verdict".lower()
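        # e.g. a hypothetical subclass named Issue would read the
        # "max_issue_verdict" configuration key (the class name is illustrative).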
        verdict_value = v.get_string(key).upper()

        try:
            cls.max_verdict = cls._verdict_mapping[verdict_value]
        except KeyError:
            valid_args = ', '.join(cls._verdict_mapping.keys())
            log.fatal(f"Max verdict must be in [{valid_args}]. Got: {verdict_value}")

    @property
    def spammable(self) -> Any:
        """spam.Spammable: The spammable to analyze for spam"""
        return self._spammable

    @spammable.setter
    def spammable(self, spammable: Any):
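        # Precompute the email and project allow checks whenever the spammable
        # is assigned, so verdict() can use the stored results.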
        self._spammable = spammable
        self._email_allowed = self.email_allowed(spammable.user.emails)
        if spammable.project:
            self._project_allowed = self.project_allowed(spammable.project.project_id)
        else:
            self._project_allowed = True

    def verdict(self) -> SpamVerdict:
        """Analyze the spammable and determine if spam.

        Returns:
            SpamVerdict
        """

        # If the project is not allowed, that may indicate the model does not
        # generalize well to the spammables in that project, so we skip
        # evaluating the spammable.
        if not self._project_allowed:
            return self._verdict(SpamVerdict.NOOP, 0.0, "project not allowed", False)
        if not self.classifier:
            return self._verdict(SpamVerdict.NOOP, 0.0, "classifier not loaded", False)

        spammable_dict = self.to_dict()
        confidence = self.classifier.score(spammable_dict)
        data_store.save(self.type(), spammable_dict, confidence)

        if self._email_allowed:
            return self._verdict(SpamVerdict.ALLOW, confidence, "email allowed", True)

        verdict = self.calculate_verdict(confidence)

        return self._verdict(verdict, confidence, "ml inference score", True)

    def calculate_verdict(self, confidence: float) -> SpamVerdict:
        """Convert an ML confidence value to a spam verdict.

        Args:
            confidence (float): The ML confidence value

        Returns:
            SpamVerdict
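
        Example (illustrative scores; results are capped at max_verdict,
        which defaults to CONDITIONAL_ALLOW):
            0.95 -> BLOCK, capped to CONDITIONAL_ALLOW
            0.45 -> CONDITIONAL_ALLOW
            0.10 -> ALLOW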
        """
        for threshold, vdict in self._inference_scores.items():
            if confidence >= threshold:
                return self._maximum_verdict(vdict)

        return SpamVerdict.NOOP

    def _verdict(
        self, verdict: int, confidence: float, reason: str, evaluated: bool
    ) -> SpamVerdict:
        fields = {
            "correlation_id": str(self.context.correlation_id),
            "metric": "spamcheck_verdicts",
            "spammable_type": self.type(),
            "email_allowlisted": self._email_allowed,
            "project_allowed": self._project_allowed,
            "project_path": self._spammable.project.project_path,
            "project_id": self._spammable.project.project_id,
            "user_name": self._spammable.user.username,
            "user_in_project": self._spammable.user_in_project,
            "verdict": SpamVerdict.Verdict.Name(verdict),
            "reason": reason,
            "confidence": confidence,
            "evaluated": evaluated,
        }
        log.info("Verdict calculated", extra=fields)
        if verdict not in (SpamVerdict.ALLOW, SpamVerdict.NOOP):
            evnt = event.Event(event.VERDICT, fields)
            queue.publish(evnt)
        return SpamVerdict(
            verdict=verdict, score=confidence, reason=reason, evaluated=evaluated
        )

    def project_allowed(self, project_id: int) -> bool:
        """Determine if a project should be tested for spam.

        Args:
            project_id (int): The GitLab project ID

        Returns:
            bool
        """
        if len(self.allow_list) != 0:
            return self.allow_list.get(project_id) is not None

        if len(self.deny_list) != 0:
            return self.deny_list.get(project_id) is None

        return True

    def email_allowed(self, emails: List) -> bool:
        """Determine if a user email should be exempt from spam checking.

        Args:
            emails (list): A list of Emails represented by protobuf objects

        Returns:
            bool
        """
        for email in emails:
            if not "@" in email.email:
                continue
            domain = email.email.split("@")[-1]
            if email.verified and domain in self.allowed_domains:
                return True
        return False

    def type(self) -> str:
        """Get the string representation of the spammable type."""
        return type(self).__name__.lower()

    def to_dict(self) -> dict:
        """Return the dictionary representation of the spammable."""
        spammable_dict = MessageToDict(self._spammable)
        spammable_dict["correlation_id"] = str(self.context.correlation_id)
        return spammable_dict

    def _maximum_verdict(self, verdict: SpamVerdict) -> SpamVerdict:
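        """Cap the verdict at the configured max_verdict.

        For example, BLOCK (rank 4) is downgraded to CONDITIONAL_ALLOW
        (rank 2) under the default cap.
        """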
        max_verdict = self._verdict_rankings[self.max_verdict]
        current_verdict = self._verdict_rankings[verdict]

        if max_verdict < current_verdict:
            return self.max_verdict

        return verdict
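
# Illustrative usage sketch (the subclass name "Issue" and the variables below
# are assumptions for illustration, not part of this module):
#
#     class Issue(Spammable):
#         """Spam checks for issue protobufs."""
#
#     Issue.set_max_verdict()
#     verdict = Issue(issue_pb, context, classifier).verdict()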