File: test.py

package info (click to toggle)
pymilter 1.0.6-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,100 kB
  • sloc: python: 3,371; ansic: 1,333; makefile: 34; sh: 8
file content (247 lines) | stat: -rw-r--r-- 7,772 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
## @package Milter.test
# A test framework for milters

from __future__ import print_function
import mime
try:
  from io import BytesIO
except:
  from StringIO import StringIO as BytesIO
import Milter

Milter.NOREPLY = Milter.CONTINUE

## Test mixin for unit testing %milter applications.
# This mixin overrides many Milter.MilterBase methods
# with stub versions that simply record what was done.
# @deprecated Use Milter.test.TestCtx
# @since 0.9.8
class TestBase(object):

  def __init__(self,logfile='test/milter.log'):
    self._protocol = 0
    self.logfp = open(logfile,"a")
    ## The MAIL FROM for the current email being fed to the %milter
    self._sender = None
    ## List of recipients deleted
    self._delrcpt = []
    ## List of recipients added
    self._addrcpt = []
    ## Macros defined
    self._macros = { }
    ## The message body.
    self._body = None
    ## True if the %milter replaced the message body.
    self._bodyreplaced = False
    ## True if the %milter changed any headers.
    self._headerschanged = False
    ## True if the %milter changed the envelope from.
    self._envfromchanged = False
    ## Reply codes and messages set by the %milter
    self._reply = None
    ## The rfc822 message object for the current email being fed to the %milter.
    self._msg = None
    ## The protocol stage for macros returned
    self._stage = None
    ## The macros returned by protocol stage
    self._symlist = [ None, None, None, None, None, None, None ]

  def _close(self):
    if self.logfp:
      self.logfp.close()
      self.logfp = None

  def log(self,*msg):
    for i in msg: print(i,file=self.logfp,end=None)
    print(file=self.logfp)

  ## Set a macro value.
  # These are retrieved by the %milter with getsymval.
  # @param name the macro name, as passed to getsymval
  # @param val the macro value
  def setsymval(self,name,val):
    self._macros[name] = val
    
  def getsymval(self,name):
    stage = self._stage
    if stage is not None and stage >= 0:
      syms = self._symlist[stage]
      if syms is not None and name not in syms:
        return None
    return self._macros.get(name,None)

  def replacebody(self,chunk):
    if self._body:
      self._body.write(chunk)
      self._bodyreplaced = True
    else:
      raise IOError("replacebody not called from eom()")

  def chgfrom(self,sender,params=None):
    if not self._body:
      raise IOError("chgfrom not called from eom()")
    self.log('chgfrom: sender=%s' % (sender))
    self._envfromchanged = True
    self._sender = sender

  # TODO: write implement quarantine()
  def quarantine(self,reason):
    raise NotImplemented

  # TODO: measure time between milter calls
  def progress(self):
    pass

  # FIXME: rfc822 indexing does not really reflect the way chg/add header
  # work for a %milter
  def chgheader(self,field,idx,value):
    if not self._body:
      raise IOError("chgheader not called from eom()")
    self.log('chgheader: %s[%d]=%s' % (field,idx,value))
    if value == '':
      del self._msg[field]
    else:
      self._msg[field] = value
    self._headerschanged = True

  def addheader(self,field,value,idx=-1):
    if not self._body:
      raise IOError("addheader not called from eom()")
    self.log('addheader: %s=%s' % (field,value))
    self._msg[field] = value
    self._headerschanged = True

  def delrcpt(self,rcpt):
    if not self._body:
      raise IOError("delrcpt not called from eom()")
    self._delrcpt.append(rcpt)

  def addrcpt(self,rcpt):
    if not self._body:
      raise IOError("addrcpt not called from eom()")
    self._addrcpt.append(rcpt)

  ## Save the reply codes and messages in self._reply.
  def setreply(self,rcode,xcode,*msg):
    self._reply = (rcode,xcode) + msg

  def setsymlist(self,stage,macros):
    if not self._actions & Milter.SETSYMLIST:
      raise DisabledAction("SETSYMLIST")
    if self._stage != -1:
      raise RuntimeError("setsymlist may only be called from negotiate")
    # not used yet, but just for grins we save the data
    a = []
    for m in macros:
      try:
        m = m.encode('utf8')
      except: pass
      try:
        m = m.split(b' ')
      except: pass
      a += m
    if len(a) > 5:
      raise ValueError('setsymlist limited to 5 macros by MTA')
    if self._symlist[stage] is not None:
      raise ValueError('setsymlist already called for stage:'+stage)
    print('setsymlist',stage,a)
    self._symlist[stage] = set(a)

  ## Feed a file like object to the %milter.  Calls envfrom, envrcpt for
  # each recipient, header for each header field, body for each body 
  # block, and finally eom.  A return code from the %milter other than
  # CONTINUE returns immediately with that return code.
  #
  # This is a convenience method, a test could invoke the callbacks
  # in sequence on its own - and for some complex tests, this may
  # be necessary.
  # @param fp the file with rfc2822 message stream
  # @param sender the MAIL FROM
  # @param rcpt RCPT TO - additional recipients may follow
  def feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com",*rcpts):
    self._body = None
    self._bodyreplaced = False
    self._headerschanged = False
    self._reply = None
    self._sender = '<%s>'%sender
    msg = mime.message_from_file(fp)
    # envfrom
    self._stage = Milter.M_ENVFROM
    rc = self.envfrom(self._sender)
    self._stage = None
    if rc != Milter.CONTINUE: return rc
    # envrcpt
    for rcpt in (rcpt,) + rcpts:
      self._stage = Milter.M_ENVRCPT
      rc = self.envrcpt('<%s>'%rcpt)
      self._stage = None
      if rc != Milter.CONTINUE: return rc
    # data
    self._stage = Milter.M_DATA
    rc = self.data()
    self._stage = None
    if rc != Milter.CONTINUE: return rc
    # header
    for h,val in msg.items():
      rc = self.header_bytes(h,val.encode())
      if rc != Milter.CONTINUE: return rc
    # eoh
    self._stage = Milter.M_EOH
    rc = self.eoh()
    self._stage = None
    if rc != Milter.CONTINUE: return rc
    # body
    header,body = msg.as_bytes().split(b'\n\n',1)
    bfp = BytesIO(body)
    while 1:
      buf = bfp.read(8192)
      if len(buf) == 0: break
      rc = self.body(buf)
      if rc != Milter.CONTINUE: return rc
    self._msg = msg
    self._body = BytesIO()
    self._stage = Milter.M_EOM
    rc = self.eom()
    self._stage = None
    if self._bodyreplaced:
      body = self._body.getvalue()
    self._body = BytesIO()
    self._body.write(header)
    self._body.write(b'\n\n')
    self._body.write(body)
    self.close()
    self._close()
    return rc

  ## Feed an email contained in a file to the %milter.
  # This is a convenience method that invokes @link #feedFile feedFile @endlink.
  # @param sender MAIL FROM
  # @param rcpts RCPT TO, multiple recipients may be supplied
  def feedMsg(self,fname,sender="spam@adv.com",*rcpts):
    with open('test/'+fname,'rb') as fp:
      return self.feedFile(fp,sender,*rcpts)

  ## Call the connect and helo callbacks.
  # The helo callback is not called if connect does not return CONTINUE.
  # @param host the hostname passed to the connect callback
  # @param helo the hostname passed to the helo callback
  # @param ip the IP address passed to the connect callback
  def connect(self,host='localhost',helo='spamrelay',ip='1.2.3.4'):
    self._body = None
    self._bodyreplaced = False
    self._setctx(None)
    opts = [ Milter.CURR_ACTS,~0,0,0 ]
    self._stage = -1
    rc = self.negotiate(opts)
    self._stage = Milter.M_CONNECT
    rc =  super(TestBase,self).connect(host,1,(ip,1234)) 
    if rc != Milter.CONTINUE:
      self._stage = None
      self.close()
      return rc
    self._stage = Milter.M_HELO
    rc = self.hello(helo)
    self._stage = None
    if rc != Milter.CONTINUE:
      self.close()
    return rc