1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
from collections import deque
from contextlib import closing
import attr
import regex as re
import six
from attr.validators import instance_of
from six.moves import range
from flanker.mime.message.headers import MimeHeaders
from flanker.mime.message.headers.parsing import parse_stream
# Header names that are looked up on every message part by _collect_headers().
# detect() divides the number of collected headers by the size of this tuple
# to obtain its score.
_HEADERS = ('Action',
'Content-Description',
'Diagnostic-Code',
'Final-Recipient',
'Received',
'Remote-Mta',
'Reporting-Mta',
'Status')
# Dotted three-part numeric status such as "5.1.1": one digit, then two groups
# of one or more digits. _get_status() applies it with .match(), so it only
# has to match at the start of the (stripped) value.
# NOTE(review): re.IGNORECASE has no effect on a pattern made of digits and
# dots only.
_RE_STATUS = re.compile(r'\d\.\d+\.\d+', re.IGNORECASE)
@attr.attrs(frozen=True)
class Result(object):
    """
    Immutable outcome of bounce detection for a single message.

    Every field is validated on construction: `score` has to be a float,
    while `status`, `diagnostic_code` and `notification` have to be text
    strings (`six.text_type`).
    """

    score = attr.attrib(validator=instance_of(float))
    status = attr.attrib(validator=instance_of(six.text_type))
    diagnostic_code = attr.attrib(validator=instance_of(six.text_type))
    notification = attr.attrib(validator=instance_of(six.text_type))

    def is_bounce(self, probability=0.3):
        """
        Tell whether the message should be treated as a bounce.

        :param probability: cutoff the score is compared against.
        :return: True only when the score is strictly greater than the
            cutoff.
        """
        return self.score > probability
def detect(message):
    """
    Inspect `message` for bounce evidence and summarise it as a `Result`.

    The score is the number of collected headers divided by the number of
    names listed in `_HEADERS`. The diagnostic code falls back to an empty
    string when no 'Diagnostic-Code' header was collected.

    :param message: message object accepted by the collecting helpers.
    :return: a populated `Result` instance.
    """
    found = _collect_headers(message)
    ratio = len(found) / float(len(_HEADERS))
    status = _get_status(found)
    diagnostic = found.get('Diagnostic-Code', u'')
    notification = _get_notification(message)
    return Result(score=ratio,
                  status=status,
                  diagnostic_code=diagnostic,
                  notification=notification)
def _collect_headers(message):
    """
    Gather the headers named in `_HEADERS` from every part of `message`.

    Parts are obtained from `message.walk(with_self=True)`. For each part the
    known header names are checked in `_HEADERS` order; when a part reports a
    delivery-status content type, the headers parsed out of its body are
    appended as well.

    :return: a `MimeHeaders` built from the collected (name, value) pairs.
    """
    pairs = deque()
    for part in message.walk(with_self=True):
        pairs.extend((name, part.headers[name])
                     for name in _HEADERS
                     if name in part.headers)
        if part.content_type.is_delivery_status():
            pairs.extend(_collect_headers_from_status(part.body))
    return MimeHeaders(pairs)
def _collect_headers_from_status(body):
    """
    Parse the headers embedded in a delivery-status body.

    The body is wrapped in an in-memory stream and `parse_stream` is run on
    that same stream three times, with all results accumulated in order.
    NOTE(review): presumably each call resumes where the previous one
    stopped, so up to three consecutive header groups are read -- confirm
    against `parse_stream`.

    :param body: text of the delivery-status part.
    :return: deque with everything the three parses produced.
    """
    parsed = deque()
    with closing(six.StringIO(body)) as stream:
        for _ in range(3):
            parsed.extend(parse_stream(stream))
    return parsed
def _get_status(headers):
    """
    Pick the first 'Status' header value that looks like a dotted status code.

    A value qualifies when `_RE_STATUS` matches at the start of its stripped
    form; the value itself is returned as stored, without stripping.

    :param headers: object exposing `getall(name)`.
    :return: the first qualifying value, or an empty string if none does.
    """
    matching = (value for value in headers.getall('Status')
                if _RE_STATUS.match(value.strip()))
    return next(matching, u'')
def _get_notification(message):
    """
    Return the body of the first part described as a notification.

    Parts come from `message.walk()`. A part qualifies when its
    'Content-Description' header, lower-cased, equals 'notification'; parts
    without that header are treated as having an empty description.

    :return: body of the first qualifying part, or an empty string when no
        part qualifies.
    """
    for part in message.walk():
        description = part.headers.get('Content-Description', '')
        if description.lower() != 'notification':
            continue
        return part.body
    return u''
|