File: Base.py

package info (click to toggle)
pysrs 1.0.3-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 288 kB
  • sloc: python: 1,288; sh: 72; makefile: 23
file content (303 lines) | stat: -rw-r--r-- 10,984 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# $Log$
# Revision 1.4  2011/03/03 23:46:49  customdesigned
# Release 1.0
#
# Revision 1.3  2008/02/13 18:20:18  customdesigned
# Handle quoted localpart.
#
# Revision 1.2  2006/02/16 05:16:59  customdesigned
# Support SRS signing mode.
#
# Revision 1.1.1.2  2005/06/03 04:13:55  customdesigned
# Support sendmail socketmap
#
# Revision 1.3  2004/06/09 00:29:25  stuart
# Use hmac instead of straight sha
#
# Revision 1.2  2004/03/22 18:20:19  stuart
# Missing import
#
# Revision 1.1.1.1  2004/03/19 05:23:13  stuart
# Import to CVS
#
#
# AUTHOR
# Shevek
# CPAN ID: SHEVEK
# cpan@anarres.org
# http://www.anarres.org/projects/
#
# Translated to Python by stuart@bmsi.com
# http://bmsi.com/python/milter.html
#
# Portions Copyright (c) 2004 Shevek. All rights reserved.
# Portions Copyright (c) 2004 Business Management Systems. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Python itself.

from __future__ import print_function

import time
import hmac
try: from hashlib import sha1 as sha
except: import sha
import base64
import re
import SRS
import sys

# Alphabets available for encoding timestamps.
BASE26 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
BASE32 = ''.join((BASE26, '234567'))
BASE64 = ''.join((BASE26, BASE26.lower(), '0123456789+/'))

# There are two design options: encode a send date or encode an expiry
# date. Encoding a send date leaves us free to change the expiry policy
# later. Encoding an expiry date lets us hand out different expiry dates
# for different sources/targets without having to store them.

# Do NOT switch to BASE64: the timestamp_check routine explicitly
# smashes case in the timestamp, so the alphabet must be case-insensitive.

BASE = BASE32
base = len(BASE)
# A power of two has exactly one bit set, so clearing the lowest set bit
# must leave zero; anything else means the alphabet size is unusable.
if (base & (base - 1)) != 0:
  raise ValueError("Invalid base array of size %d" % base)
PRECISION = 24 * 60 * 60  # Seconds in one day: the timestamp resolution
TICKSLOTS = base ** 2  # Distinct values of a two character timestamp

def parse_addr(sender):
    """quotes,user,host = parse_addr(sender)

    Split an address at its last '@'. If the local part is wrapped in
    double quotes they are stripped and returned separately as quotes
    ('"'), otherwise quotes is the empty string.

    Raises ValueError if sender contains no '@'."""
    try:
        at = sender.rindex('@')
    except ValueError:
        raise ValueError("Sender '%s' must contain exactly one @" % sender)
    localpart, domain = sender[:at], sender[at+1:]
    if localpart.startswith('"') and localpart.endswith('"'):
        # Quoted local part: hand back the bare text plus the quote char
        # so the caller can re-wrap whatever it builds.
        return '"', localpart[1:-1], domain
    return '', localpart, domain

class Base(object):
  """Common machinery for SRS address rewriters.

  Provides address splitting and re-assembly (sign, forward, reverse),
  two character timestamps, and HMAC based hashes. The actual layout of
  the rewritten local part is supplied by subclasses through compile()
  and parse()."""

  # Types treated as "one secret" rather than "a sequence of secrets".
  # type(u'') is unicode on Python 2 and str on Python 3.
  _STRING_TYPES = (str, bytes, type(u''))

  def __init__(self,secret=None,maxage=SRS.SRSMAXAGE,
        hashlength=SRS.SRSHASHLENGTH,
        hashmin=None,separator='=',alwaysrewrite=False,ignoretimestamp=False,
        allowunsafesrs=False):
    """srs = Base(secret, ...)

    secret          a single secret, or a sequence of secrets of which
                    the first is used to create hashes and all are tried
                    when verifying.
    maxage          how many PRECISION sized ticks (days) a timestamp
                    stays valid.
    hashlength      number of hash characters emitted by hash_create().
    hashmin         shortest hash accepted by hash_verify(); defaults to
                    hashlength.
    separator       initial separator: one of '=', '-' or '+'.
    alwaysrewrite   rewrite in forward() even when the alias host equals
                    the sender host.
    ignoretimestamp make timestamp_check() accept everything.
    allowunsafesrs  stored for subclasses; not used by this class.

    Raises ValueError for an unsupported separator."""
    if separator not in ('-','+','='):
      raise ValueError('separator must be = - or +, not %s' % separator)
    # A lone secret (text or bytes) is wrapped so that get_secret()
    # always yields a sequence; indexing a bare string would otherwise
    # pick out single characters.
    if isinstance(secret,self._STRING_TYPES):
      self.secret = (secret,)
    else:
      self.secret = secret
    self.maxage = maxage
    self.hashlength = hashlength
    self.hashmin = hashmin or hashlength
    # NOTE: this instance attribute shadows the separator() method
    # defined at the end of the class.
    self.separator = separator
    self.alwaysrewrite = alwaysrewrite
    self.ignoretimestamp = ignoretimestamp
    self.allowunsafesrs = allowunsafesrs
    # Matchers for a local part starting with an SRS tag followed by any
    # of the permitted initial separators.
    self.srs0re = re.compile(r'^%s[-+=]' % SRS.SRS0TAG,re.IGNORECASE)
    self.srs1re = re.compile(r'^%s[-+=]' % SRS.SRS1TAG,re.IGNORECASE)
    #self.ses0re = re.compile(r'^%s[-+=]' % SRS.SES0TAG,re.IGNORECASE)

  @staticmethod
  def _to_bytes(data):
    "Return data as bytes; text is encoded as UTF-8, bytes pass through."
    if isinstance(data,bytes):
      return data
    return data.encode('utf-8')

  def _mac(self,secret,data):
    """Return the full base64 text of the HMAC of data keyed by secret.

    Every item of data is lowercased before being fed to the MAC. The
    digest is the module level sha (SHA-1 from hashlib when available).
    The result is always a native str."""
    mac = hmac.new(self._to_bytes(secret),b'',sha)
    for item in data:
      # hmac only accepts bytes on Python 3.
      mac.update(self._to_bytes(item.lower()))
    encoded = base64.b64encode(mac.digest())
    if not isinstance(encoded,str):
      encoded = encoded.decode('ascii')
    return encoded

  def warn(self,*msg):
    "Write the space-joined msg items to stderr, prefixed with WARNING."
    print('WARNING: ',' '.join(str(m) for m in msg), file=sys.stderr)

  def sign(self,sender):
    """srsaddress = srs.sign(sender)

    Map a sender address into the same sender and a cryptographic cookie.
    Returns an SRS address to use for preventing bounce abuse. The host
    part is left unchanged.

    There are alternative subclasses, some of which will return SRS
    compliant addresses, some will simply return non-SRS but valid RFC821
    addresses."""
    quotes,senduser,sendhost = parse_addr(sender)
    # Subclasses may override the compile() method.
    srsdata = self.compile(sendhost,senduser,srshost=sendhost)
    return '%s%s%s@%s' % (quotes,srsdata,quotes,sendhost)

  def forward(self,sender,alias,sign=False):
    """srsaddress = srs.forward(sender, alias)

    Map a sender address into a new sender and a cryptographic cookie.
    Returns an SRS address to use as the new sender. alias may be a full
    address or just a domain. If the alias host equals the sender host
    (ignoring case) and alwaysrewrite is not set, the sender is returned
    without rewriting. When sign is true the alias host is passed to
    compile() as srshost.

    There are alternative subclasses, some of which will return SRS
    compliant addresses, some will simply return non-SRS but valid RFC821
    addresses."""
    quotes,senduser,sendhost = parse_addr(sender)

    # We don't require alias to be a full address, just a domain will do
    aliashost = alias.split('@')[-1]

    if aliashost.lower() == sendhost.lower() and not self.alwaysrewrite:
      return '%s%s%s@%s' % (quotes,senduser,quotes,sendhost)

    # Subclasses may override the compile() method.
    if sign:
      srsdata = self.compile(sendhost,senduser,srshost=aliashost)
    else:
      srsdata = self.compile(sendhost,senduser)
    return '%s%s%s@%s' % (quotes,srsdata,quotes,aliashost)

  def reverse(self,address):
    """sender = srs.reverse(srsaddress)

    Reverse the mapping to get back the original address, using the
    subclass parse() method, which is responsible for validating the
    cryptographic and timestamp information. Returns the original sender
    address. Whatever exception parse() raises is propagated."""
    quotes,user,host = parse_addr(address)

    sendhost,senduser = self.parse(user,srshost=host)
    return '%s%s%s@%s' % (quotes,senduser,quotes,sendhost)

  def compile(self,sendhost,senduser,srshost=None):
    """srsdata = srs.compile(host,user)

    This method, designed to be overridden by subclasses, takes as
    parameters the original host and user and must compile a new username
    for the SRS transformed address. It is expected that this new username
    will be joined on SRS.SRSSEP, and will contain a hash generated from
    self.hash_create(...), and possibly a timestamp generated by
    self.timestamp_create(). srshost is passed by sign() and by
    forward(sign=True) and names the host the address is rewritten for.

    Raises NotImplementedError in this base class."""
    raise NotImplementedError()

  def parse(self,srsuser,srshost=None):
    """host,user = srs.parse(srsuser)

    This method, designed to be overridden by subclasses, takes an
    SRS-transformed username as an argument, and must reverse the
    transformation produced by compile(). It is required to verify any
    hash and timestamp in the parsed data, using self.hash_verify(hash,
    ...) and self.timestamp_check(timestamp). srshost is passed by
    reverse() and is the host part of the address being reversed.

    Raises NotImplementedError in this base class."""
    raise NotImplementedError()

  def timestamp_create(self,ts=None):
    """timestamp = srs.timestamp_create(time)

    Return a two character timestamp representing 'today', or time if
    given. time is a Unix timestamp (seconds since the epoch).

    This Python function has been designed to be agnostic as to base,
    and in practice, base32 is used since it can be reversed even if a
    remote MTA smashes case (in violation of RFC2821 section 2.4). The
    agnosticism means that the Python uses division instead of rightshift,
    but in Python that doesn't matter. C implementors should implement this
    operation as a right shift by 5."""
    # Test against None so that an explicit time of 0 is honoured.
    if ts is None:
      ts = time.time()
    # Since we only mask in the bottom few bits anyway, we
    # don't need to take this modulo anything (e.g. TICKSLOTS).
    ticks = int(ts // PRECISION)
    mask = base - 1
    low = BASE[ticks & mask]
    ticks //= base  # Equivalent to a right shift since base is 2**k.
    return BASE[ticks & mask] + low

  def timestamp_check(self,timestamp):
    """srs.timestamp_check(timestamp)

    Return True if a timestamp is valid, False otherwise. There are
    TICKSLOTS (base * base, i.e. 1024 with BASE32) possible timestamps,
    used in a cycle. At any time, about maxage timestamps in this cycle
    are valid, the last one being today. A timestamp from the future is
    not valid, neither is a timestamp from too far into the past. Of
    course if you go far enough into the future, the cycle wraps around,
    and there are valid timestamps again, but the likelihood of a random
    timestamp being valid is only about maxage/TICKSLOTS.

    An empty timestamp, or one containing characters outside BASE, is
    never valid. Always returns True when ignoretimestamp is set."""
    if self.ignoretimestamp: return True
    if not timestamp: return False
    ts = 0
    for d in timestamp.upper():	# LOOK OUT - USE BASE32
      digit = BASE.find(d)
      # find() yields -1 for a foreign character; folding that into the
      # value would silently produce a bogus timestamp.
      if digit < 0: return False
      # Reduce as we go: only the position within the cycle matters and
      # this keeps the value bounded for over-long input.
      ts = (ts * base + digit) % TICKSLOTS
    now = int(time.time() // PRECISION)
    # Age in ticks within the cycle. A timestamp "ahead" of now wraps to
    # a large age. Computed with modulo rather than by looping so the
    # cost does not depend on the (untrusted) timestamp.
    age = (now - ts) % TICKSLOTS
    return age <= self.maxage

  def time_check(self,ts):
    """srs.time_check(time)

    Similar to srs.timestamp_check(timestamp), but takes a Unix time, and
    checks that an alias created at that Unix time is still valid. This is
    designed for use by subclasses with storage backends."""
    return time.time() <= (ts + (self.maxage * PRECISION))

  def hash_create(self,*data):
    """srs.hash_create(data,...)

    Returns a cryptographic hash of all data in data, as the first
    hashlength characters of the base64 encoded HMAC keyed with the
    first secret. Any piece of data encoded into an address which must
    remain inviolate should be hashed, so that when the address is
    reversed, we can check that this data has not been tampered with.
    You must provide at least one piece of data to this method (otherwise
    this system is both cryptographically weak and there may be collision
    problems with sender addresses).

    Data is lowercased before hashing. Raises AssertionError when no
    secret is configured."""
    secret = self.get_secret()
    assert secret, "Cannot create a cryptographic MAC without a secret"
    return self._mac(secret[0],data)[:self.hashlength]

  def hash_verify(self,hash,*data):
    """srs.hash_verify(hash,data,...)

    Verify that data has not been tampered with, given the cryptographic
    hash previously output by srs.hash_create(). Returns True or False.
    All known secrets are tried in order to see if the hash was created
    with an old secret. A hash shorter than hashmin is rejected outright.

    Raises AssertionError when no secret is configured."""
    if len(hash) < self.hashmin: return False
    secret = self.get_secret()
    assert secret, "Cannot create a cryptographic MAC without a secret"
    candidates = []
    for s in secret:
      valid = self._mac(s,data)[:len(hash)]
      # We test all case sensitive matches before case insensitive
      # matches. While the risk of a case insensitive collision is
      # quite low, we might as well be careful.
      if valid == hash: return True
      candidates.append(valid)
    smashed = hash.lower()
    for valid in candidates:
      if smashed == valid.lower():
        self.warn("""SRS: Case insensitive hash match detected.
Someone smashed case in the local-part.""")
        return True
    return False

  def set_secret(self,*args):
    """srs.set_secret(new,old,...)

    Replace the secrets of the rewriter with the given ones. The first
    is used for new hashes. When an address is returned, all secrets are
    tried to see if the hash can be validated. Don't use "foo", "secret",
    "password", "10downing", "god" or "wednesday" as your secret."""
    self.secret = args

  def get_secret(self):
    "Return the list of secrets. These are secret. Don't publish them."
    return self.secret

  def separator(self):
    """srs.separator()

    Return the initial separator, which follows the SRS tag. This is only
    used as the initial separator, for the convenience of administrators
    who wish to make srs0 and srs1 users on their mail servers and require
    to use + or - as the user delimiter. All other separators in the SRS
    address must be '='.

    NOTE(review): __init__ stores the separator in an instance attribute
    of the same name, which shadows this method on every instance; read
    srs.separator as an attribute instead of calling it."""
    return self.separator