File: testsample.py

package info (click to toggle)
pymilter 0.9.5-3
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 560 kB
  • sloc: python: 1,924; ansic: 1,238; makefile: 30; sh: 17
file content (150 lines) | stat: -rw-r--r-- 4,422 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import unittest
import Milter
import sample
import mime
import rfc822
import StringIO

class TestMilter(sample.sampleMilter):
  """Test double for sample.sampleMilter that runs without a live MTA.

  The methods a milter would normally use to talk back to the MTA
  (replacebody, chgheader, addheader) are overridden to record their
  effect on an in-memory copy of the message, and feedMsg()/connect()
  drive the milter callbacks by hand from a message file on disk.

  Attributes set while a message is processed:
    _body          -- None outside eom(); a StringIO collecting the
                      replacement body during eom(); after feedMsg() has
                      run to completion, a StringIO holding the whole
                      resulting message (headers, blank line, body).
    _msg           -- the rfc822.Message being processed (set before eom()).
    bodyreplaced   -- True once replacebody() has been called.
    headerschanged -- True once chgheader() or addheader() has been called.
  """

  # Not referenced anywhere in this file.
  # NOTE(review): presumably read by the Milter base class - confirm.
  _protocol = 0

  def __init__(self):
    """Open the test log in append mode.

    NOTE(review): the base class __init__ is deliberately not called here
    (unchanged from the original); confirm the callbacks initialise all
    the state they rely on.
    """
    self.logfp = open("test/milter.log","a")

  def log(self,*msg):
    "Write the given items to the test log on one line, space separated."
    for i in msg:
      print >>self.logfp, i,
    print >>self.logfp

  def replacebody(self,chunk):
    """Append chunk to the replacement body.

    Raises IOError when no body buffer is active, i.e. when called
    outside of the eom() callback.
    """
    if self._body is None:
      raise IOError("replacebody not called from eom()")
    self._body.write(chunk)
    self.bodyreplaced = True

  # FIXME: rfc822 indexing does not really reflect the way chg/add header
  # work for a milter
  def chgheader(self,field,idx,value):
    """Replace header field with value, or delete it when value is ''.

    idx is logged but otherwise ignored (see the FIXME above).
    """
    self.log('chgheader: %s[%d]=%s' % (field,idx,value))
    if value == '':
      del self._msg[field]
    else:
      self._msg[field] = value
    self.headerschanged = True

  def addheader(self,field,value):
    "Set header field to value on the message being processed."
    self.log('addheader: %s=%s' % (field,value))
    self._msg[field] = value
    self.headerschanged = True

  def _feedHeader(self,line):
    """Pass one complete (already unfolded) raw header line to header().

    The line is split at the first colon only, so headers with an empty
    value or without a space after the colon are handled as well, and the
    value is stripped of surrounding whitespace for every header.
    Returns the result of the header() callback.
    """
    name,value = line.split(':',1)
    return self.header(name,value.strip())

  def feedMsg(self,fname):
    """Run the message stored in test/<fname> through the milter callbacks.

    The callbacks envfrom, envrcpt, header (once per header), eoh, body
    (in chunks of up to 8192 bytes) and eom are invoked in that order.
    As soon as a callback before eom returns something other than
    Milter.CONTINUE, that value is returned and processing stops.
    Otherwise the result of eom() is returned and self._body holds the
    resulting message: the (possibly modified) headers, a blank line and
    either the replacement body or the original one.
    """
    self._body = None
    self.bodyreplaced = False
    self.headerschanged = False
    fp = open('test/'+fname,'r')
    try:
      msg = rfc822.Message(fp)
      rc = self.envfrom('<spam@advertisements.com>')
      if rc != Milter.CONTINUE: return rc
      rc = self.envrcpt('<victim@lamb.com>')
      if rc != Milter.CONTINUE: return rc
      # msg.headers holds raw lines; a line starting with whitespace
      # continues the previous header, so a header is only complete once
      # the next one starts (or the list ends).
      pending = None
      for h in msg.headers:
        if h[:1].isspace():
          pending = pending + h
          continue
        if pending:
          rc = self._feedHeader(pending)
          if rc != Milter.CONTINUE: return rc
        pending = h
      if pending:
        rc = self._feedHeader(pending)
        if rc != Milter.CONTINUE: return rc
      rc = self.eoh()
      if rc != Milter.CONTINUE: return rc
      while True:
        buf = fp.read(8192)
        if not buf: break
        rc = self.body(buf)
        if rc != Milter.CONTINUE: return rc
      # From here on replacebody/chgheader/addheader are allowed to run.
      self._msg = msg
      self._body = StringIO.StringIO()
      rc = self.eom()
      if self.bodyreplaced:
        body = self._body.getvalue()
      else:
        msg.rewindbody()
        body = msg.fp.read()
      # Reassemble the complete message for inspection by the tests.
      self._body = StringIO.StringIO()
      self._body.writelines(msg.headers)
      self._body.write('\n')
      self._body.write(body)
      return rc
    finally:
      # Everything needed from the file has been read by now, also on the
      # early-return paths above.
      fp.close()

  def connect(self,host='localhost'):
    """Simulate a client connection followed by a HELO from 'spamrelay'.

    Calls the base class connect() and then hello().  The milter is
    closed when connect() returns neither CONTINUE nor ACCEPT, or when
    hello() does not return CONTINUE.  Returns the last callback result.
    """
    self._body = None
    self.bodyreplaced = False
    rc = sample.sampleMilter.connect(self,host,1,0)
    if rc != Milter.CONTINUE and rc != Milter.ACCEPT:
      self.close()
      return rc
    rc = self.hello('spamrelay')
    if rc != Milter.CONTINUE:
      self.close()
    return rc

class BMSMilterTestCase(unittest.TestCase):
  """Feed sample messages from test/ through TestMilter and check the result.

  Each test also writes the resulting message to test/<name>.tstout so it
  can be inspected by hand.
  """

  def _saveOutput(self,milter,fname):
    "Write the message left in milter._body to test/<fname>.tstout."
    out = open('test/'+fname+".tstout","w")
    try:
      out.write(milter._body.getvalue())
    finally:
      # Close explicitly instead of relying on garbage collection.
      out.close()

  def testDefang(self,fname='virus1'):
    "The message must be accepted with its body replaced."
    milter = TestMilter()
    rc = milter.connect()
    self.assertEqual(rc,Milter.CONTINUE)
    rc = milter.feedMsg(fname)
    self.assertEqual(rc,Milter.ACCEPT)
    self.assertTrue(milter.bodyreplaced,"Message body not replaced")
    self._saveOutput(milter,fname)
    fp = milter._body
    # Comparison with the reference output is currently disabled:
    #self.assertEqual(fp.getvalue(),open("test/virus1.out","r").read())
    fp.seek(0)
    msg = mime.message_from_file(fp)
    # Log the payload of the second MIME part of the resulting message.
    s = msg.get_payload(1).get_payload()
    milter.log(s)
    milter.close()

  def testParse(self,fname='spam7'):
    "The message must be accepted without its body being replaced."
    milter = TestMilter()
    milter.connect('somehost')
    rc = milter.feedMsg(fname)
    self.assertEqual(rc,Milter.ACCEPT)
    self.assertFalse(milter.bodyreplaced,"Milter needlessly replaced body.")
    self._saveOutput(milter,fname)
    milter.close()

  def testDefang2(self):
    "Process three messages in a row on the same connection."
    milter = TestMilter()
    milter.connect('somehost')
    rc = milter.feedMsg('samp1')
    self.assertEqual(rc,Milter.ACCEPT)
    self.assertFalse(milter.bodyreplaced,"Milter needlessly replaced body.")
    rc = milter.feedMsg("virus3")
    self.assertEqual(rc,Milter.ACCEPT)
    self.assertTrue(milter.bodyreplaced,"Message body not replaced")
    self._saveOutput(milter,"virus3")
    # Comparison with the reference output is currently disabled:
    #self.assertEqual(milter._body.getvalue(),open("test/virus3.out","r").read())
    rc = milter.feedMsg("virus6")
    self.assertEqual(rc,Milter.ACCEPT)
    self.assertTrue(milter.bodyreplaced,"Message body not replaced")
    self.assertTrue(milter.headerschanged,"Message headers not adjusted")
    self._saveOutput(milter,"virus6")
    milter.close()

def suite():
  "Return a test suite with every test* method of BMSMilterTestCase."
  tests = unittest.makeSuite(BMSMilterTestCase,prefix='test')
  return tests

# Run all tests in this module when it is executed as a script.
if __name__ == "__main__":
  unittest.main()