File: bounce.py

package info (click to toggle)
python-flanker 0.9.15-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 17,976 kB
  • sloc: python: 9,308; makefile: 4
file content (80 lines) | stat: -rw-r--r-- 2,157 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from collections import deque
from contextlib import closing

import attr
import regex as re
import six
from attr.validators import instance_of
from six.moves import range

from flanker.mime.message.headers import MimeHeaders
from flanker.mime.message.headers.parsing import parse_stream


# Header names treated as evidence of a bounce. _collect_headers() looks each
# of these up on every message part, and detect() divides the number of
# collected headers by the length of this tuple to produce the score.
_HEADERS = ('Action',
            'Content-Description',
            'Diagnostic-Code',
            'Final-Recipient',
            'Received',
            'Remote-Mta',
            'Reporting-Mta',
            'Status')

# Pattern for a "digit.digits.digits" status code. _get_status() applies it
# with .match(), so only the beginning of the (stripped) value is checked.
_RE_STATUS = re.compile(r'\d\.\d+\.\d+', re.IGNORECASE)


@attr.s(frozen=True)
class Result(object):
    """Frozen value object holding the outcome of bounce detection.

    Every field is checked by an ``instance_of`` validator: ``score`` must be
    a float and the remaining fields must be text (``six.text_type``).
    """
    # Ratio computed by detect(): collected headers / len(_HEADERS).
    score = attr.ib(validator=instance_of(float))
    # First 'Status' header value accepted by _get_status(), or u''.
    status = attr.ib(validator=instance_of(six.text_type))
    # Value of the 'Diagnostic-Code' header, or u'' when it is absent.
    diagnostic_code = attr.ib(validator=instance_of(six.text_type))
    # Body of the part whose Content-Description is 'notification', or u''.
    notification = attr.ib(validator=instance_of(six.text_type))

    def is_bounce(self, probability=0.3):
        """Return True when ``score`` is strictly greater than *probability*.

        :param probability: threshold the score is compared against
            (defaults to 0.3).
        """
        return self.score > probability


def detect(message):
    """Inspect *message* for bounce indicators and summarise them.

    The bounce-related headers are gathered from the message first; the
    score is the number of gathered headers divided by the number of names
    listed in ``_HEADERS``.

    :param message: message object handed to ``_collect_headers`` and
        ``_get_notification``.
    :return: a ``Result`` carrying the score, the status, the
        'Diagnostic-Code' header value (``u''`` when missing) and the
        notification text.
    """
    found = _collect_headers(message)
    ratio = len(found) / float(len(_HEADERS))
    return Result(
        score=ratio,
        status=_get_status(found),
        diagnostic_code=found.get('Diagnostic-Code', u''),
        notification=_get_notification(message),
    )


def _collect_headers(message):
    """Gather the headers named in ``_HEADERS`` from every part of *message*.

    Each part yielded by ``message.walk(with_self=True)`` contributes one
    ``(name, value)`` pair per listed header it contains. When a part's
    content type reports itself as a delivery status, the headers parsed out
    of that part's body are added as well.

    :return: a ``MimeHeaders`` instance built from all collected pairs.
    """
    pairs = deque()
    for part in message.walk(with_self=True):
        part_headers = part.headers
        pairs.extend([(name, part_headers[name])
                      for name in _HEADERS
                      if name in part_headers])
        if part.content_type.is_delivery_status():
            pairs.extend(_collect_headers_from_status(part.body))

    return MimeHeaders(pairs)


def _collect_headers_from_status(body):
    """Parse headers out of the text *body* of a delivery-status part.

    ``parse_stream`` is invoked three times in a row on the same stream and
    everything it yields is accumulated.
    NOTE(review): presumably each call consumes one blank-line-separated
    group of fields in the status body -- confirm against ``parse_stream``.

    :return: a ``deque`` with the items produced by the three parse passes.
    """
    gathered = deque()
    stream = six.StringIO(body)
    try:
        for _ in range(3):
            gathered.extend(parse_stream(stream))
    finally:
        # Always release the in-memory stream, even if parsing fails.
        stream.close()

    return gathered


def _get_status(headers):
    """Return the first 'Status' header value that looks like a status code.

    A value qualifies when ``_RE_STATUS`` matches at the start of the value
    with surrounding whitespace stripped; the value itself is returned
    unmodified. ``u''`` is returned when no value qualifies.
    """
    candidates = (value
                  for value in headers.getall('Status')
                  if _RE_STATUS.match(value.strip()))
    return next(candidates, u'')


def _get_notification(message):
    """Return the body of the first part described as a notification.

    Parts come from ``message.walk()``; a part qualifies when its
    'Content-Description' header, lower-cased, equals ``'notification'``.
    ``u''`` is returned when no part qualifies.
    """
    for candidate in message.walk():
        description = candidate.headers.get('Content-Description', '')
        if description.lower() != 'notification':
            continue
        return candidate.body

    return u''