File: policy.py

package info (click to toggle)
pymilter 1.0.6-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,100 kB
  • sloc: python: 3,371; ansic: 1,333; makefile: 34; sh: 8
file content (87 lines) | stat: -rw-r--r-- 2,396 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
try:
  try:
    from berkeleydb import db
  except:
    from bsddb3 import db
  class DB(object):
    def open(self,fname,mode):
      if mode == 'r': flags = db.DB_RDONLY
      else: raise RuntimeException('unsupported mode')
      self.f = db.DB()
      self.f.open(fname,flags=flags)
    def __contains__(self,key):
      return not not self.f.get(key)
    def __getitem__(self,key):
      v = self.f.get(key)
      if not v: raise KeyError(key)
      return v
    def close(self):
      self.f.close()
  def dbmopen(fname,mode):
    f = DB()
    f.open(fname,mode)
    return f
except ModuleNotFoundError: raise
except:
  import anydbm as dbm
  dbmopen = dbm.open

class MTAPolicy(object):
  "Get SPF policy by result from sendmail style access file."
  def __init__(self,sender,conf,access_file=None):
    if not access_file:
      access_file = conf.access_file
    self.use_nulls = conf.access_file_nulls
    try:
      self.use_colon = conf.access_file_colon
    except:
      self.use_colon = True
    self.sender = sender
    self.domain = sender.split('@')[-1].lower()
    self.acf = None
    self.access_file = access_file

  def close(self):
    if self.acf:
      self.acf.close()

  def __enter__(self): 
    self.acf = None
    if self.access_file:
      try:
        self.acf = dbmopen(self.access_file,'r')
      except:
        print('%s: Cannot open for reading'%self.access_file)
        raise
    return self
  def __exit__(self,t,v,b): self.close()

  def getPolicy(self,pfx):
    acf = self.acf
    if not acf: return None
    if self.use_nulls: sfx = b'\x00'
    else: sfx = b''
    if self.use_colon:
      sep = b':'
    else:
      sep = b'!'
    pfx = pfx.encode() + sep
    try:    # try with localpart@domain
      return acf[pfx + self.sender.encode() + sfx].rstrip(b'\x00').decode()
    except KeyError:
      try:  # try with domain
        d = self.domain.encode()
        k = pfx + d + sfx
        while not k in acf and b'.' in d:
          # check partial domains
          d = b'.'.join(d.split(b'.')[1:])
          k = pfx + b'.' + d + sfx
        return acf[k].rstrip(b'\x00').decode()
      except KeyError:
        try:    # try bare prefix
          return acf[pfx + sfx].rstrip(b'\x00').decode()
        except KeyError:
          try:
            return acf[pfx[:-1] + sfx].rstrip(b'\x00').decode()
          except KeyError:
            return None