File: unwind.py

package info (click to toggle)
mailcrypt 3.5.8-2
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 928 kB
  • ctags: 422
  • sloc: lisp: 4,439; python: 437; makefile: 208; sh: 170
file content (199 lines) | stat: -rwxr-xr-x 6,874 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#! /usr/bin/python

import GnuPGInterface
import string, re, sys

# Module-wide GnuPG wrapper used by unwind(). Every invocation is given the
# extra arguments '--homedir remailer-keys' and '--batch'.
gnupg = GnuPGInterface.GnuPG()
gnupg.options.extra_args.extend(['--homedir', 'remailer-keys', '--batch'])

# The keyring should hold private keys for all the remailers in the chain,
# and each one should have a passphrase equal to the name of the key.

def passphrase(bigname):
    """Return the passphrase for the key named by bigname.

    The passphrase is the first address-like token (word@word.word) found
    in bigname, so 'Remailer One <rem1@test.test>' gives 'rem1@test.test'.

    Raises ValueError if bigname contains no such token.
    """
    found = re.search(r'(\w+@\w+\.\w+)', bigname)
    if found is None:
        # Without this check the failure shows up as an AttributeError on
        # None, which says nothing about which key name was at fault.
        raise ValueError("no address found in key name %r" % (bigname,))
    return found.group(1)

def unwind(recipient, message):
    """Unwind a remailer message into the list of steps it goes through.

    Parameters:
     recipient: the To: address the message is delivered to
     message: the message body, as one string

    Returns a list of dicts, one per step. The decryption and remailing
    are separate steps. The dict keys are as follows:

     recipient: the target of that step, as requested by the previous step
     encrypted: 1 if the 'Encrypted: PGP' flag was set, else non-existent
     anon-to: name in the Anon-To: header, if present
     subject: subject specified in a ## header, if present
     message: plaintext of the message (for the last step)

    Typical steps:
     recipient=rem1, encrypted=1                 BRANCH2, recurse
     recipient=rem1, anon-to=rem2                BRANCH3, recurse
     recipient=rem2, encrypted=1                 BRANCH2, recurse
     recipient=rem2, anon-to=user, subject=test  BRANCH3, recurse
     recipient=user, message=plaintext           BRANCH1, terminate

    Raises ValueError ('bad message: ...') when a message addressed to a
    remailer is malformed:
     first line is not '::', or the third line is missing or not blank
     'Encrypted: PGP' is not followed by a PGP message
     second line is neither 'Encrypted: PGP' nor an Anon-To: header
     a ## section has no Subject: header or no blank line after it
    Errors from passphrase() or from the gnupg wrapper are not caught here.
    """
    # an encrypted message looks like:
    # ::\nEncrypted: PGP\n\n-----BEGIN PGP MESSAGE-----\n etc

    # once decrypted, the message should look like:
    # ::\nAnon-To: dest@foo.com\n\n##\nSubject: test subject\n\nmessage body

    step = {'recipient': recipient}

    # see if we should end the recursion now
    if not re.search(r'^rem\d@test\.test', recipient):
        # BRANCH1
        # not addressed to a test remailer: this is the final plaintext
        step['message'] = message
        return [step]

    lines = message.split("\n")
    if lines[0] != '::':
        raise ValueError('bad message: first line was not ::')
    # a message with fewer than three lines has no blank third line either
    if len(lines) < 3 or lines[2] != '':
        raise ValueError('bad message: third line was not blank')

    # see if it's encrypted
    if lines[1] == 'Encrypted: PGP':
        # BRANCH2
        # step is: recipient=rem1, encrypted=1
        crypttext = lines[3:]
        if not crypttext or crypttext[0] != '-----BEGIN PGP MESSAGE-----':
            raise ValueError('bad message: Encrypted: PGP without a '
                             'PGP message')
        gnupg.passphrase = passphrase(recipient)  # passphrase == keyname
        p = gnupg.run(['--decrypt'],
                      create_fhs=['stdin', 'stdout'],
                      )
        p.handles['stdin'].write("\n".join(crypttext))
        p.handles['stdin'].close()
        plaintext = p.handles['stdout'].read()
        p.handles['stdout'].close()
        p.wait()
        step['encrypted'] = 1
        # the decrypted text is still addressed to the same remailer
        return [step] + unwind(recipient, plaintext)

    # BRANCH3

    # message is now:
    #  ::\nAnon-To: remN@test.test (or dest@foo.com)\n\nmessage body
    # or
    #  ::\nAnon-To: dest@foo.com\n\n##\nSubject: subject\n\nmessage body

    r = re.search(r'^Anon-To: (.*)$', lines[1])
    if not r:
        print("Bad Message, no Anon-To")
        print(lines)
        raise ValueError('bad message: no anon-to')
    step['anon-to'] = r.group(1)

    body = lines[3:]
    # now: "message body", or "##\nSubject: sub\n\nmessage body"

    # 'body and' guards against a message that ends right after the headers
    if body and body[0] == '##':
        # there are ## headers included
        r = None
        if len(body) > 1:
            r = re.search(r'^Subject: (.*)$', body[1])
        if not r:
            raise ValueError('bad message: ## but no Subject: header')
        step['subject'] = r.group(1)
        if len(body) < 3 or body[2] != '':
            raise ValueError('bad message: no blank line after ## headers')
        body = body[3:]
    # now: "message body"
    return [step] + unwind(step['anon-to'], "\n".join(body))

def insistEquals(one, two):
    """Raise AssertionError unless one == two.

    The error message shows both values. Returns None when they are equal.
    """
    if one != two:
        # A string exception here is itself an error on Python 2.6+, which
        # would hide the actual mismatch; raise a real exception class.
        raise AssertionError("results don't match expected: '%s' != '%s'"
                             % (one, two))


def test_chain(firsthop, crypttext, plaintext, recipient, chain, subject=None):
    """Check that crypttext unwinds to plaintext along the given chain.

    Builds the list of steps expected for a message sent to recipient
    through the remailers in chain, unwinds crypttext starting at firsthop,
    and compares the two step by step with insistEquals. firsthop must
    equal chain[0]. When subject is given (and non-empty) it is expected on
    the last remailing step. Prints "TEST CASE PASSED" on success.
    """

    # chain holds the remailers' long names: rem1@test.test, etc

    # every remailer except the last decrypts, then remails to the next hop
    expected = []
    for hop, next_hop in zip(chain, chain[1:]):
        expected.append({'recipient': hop, 'encrypted': 1})
        expected.append({'recipient': hop, 'anon-to': next_hop})

    # the last remailer decrypts, then delivers to the real recipient
    final_hop = chain[-1]
    expected.append({'recipient': final_hop, 'encrypted': 1})
    delivery = {'recipient': final_hop, 'anon-to': recipient}
    if subject:
        delivery['subject'] = subject
    expected.append(delivery)

    # and the recipient ends up with the plaintext
    expected.append({'recipient': recipient, 'message': plaintext})

    # unwind the message
    insistEquals(firsthop, chain[0])
    steps = unwind(firsthop, crypttext)

    # compare against expectations
    insistEquals(len(expected), len(steps))
    for actual, wanted in zip(steps, expected):
        insistEquals(actual, wanted)
    print("TEST CASE PASSED")
    


def test1():
    """Unwind the message stored in file 'm1' and print each step.

    The message is treated as addressed to rem3@test.test.
    """
    f = open("m1")
    try:
        # close the file explicitly rather than leaving it to the collector
        m1 = f.read()
    finally:
        f.close()
    for link in unwind("rem3@test.test", m1):
        print(link)

def test2():
    """Run test_chain on the message stored in file 'm1'.

    Expects a message for warner@lothar.com with subject "test subject"
    and body "test message\\n", entering the chain at rem3@test.test.
    """
    f = open("m1")
    try:
        m1 = f.read()
    finally:
        f.close()
    # test_chain compares these names with firsthop and with the addresses
    # that unwind() reports, so they must be the full remailer addresses;
    # the short names ("rem3") could never match.
    chain = ["rem3@test.test", "rem2@test.test",
             "rem1@test.test", "rem1@test.test"]
    test_chain(firsthop="rem3@test.test", crypttext=m1,
               plaintext="test message\n",
               recipient="warner@lothar.com",
               chain=chain,
               subject="test subject")

def main():
    """Check a message read from stdin against command-line expectations.

    argv is: ['recipient', 'chain1,chain2,chain3', 'subject']
    The expected plaintext is always "test message\\n"; the first entry of
    the comma-separated chain is used as the first hop.
    """
    recipient = sys.argv[1]
    remailers = sys.argv[2].split(',')
    subject = sys.argv[3]
    # crypttext arrives on stdin
    crypttext = sys.stdin.read()
    test_chain(firsthop=remailers[0],
               crypttext=crypttext,
               plaintext="test message\n",
               recipient=recipient,
               chain=remailers,
               subject=subject)
    
        
# Run the command-line check only when executed as a script, not on import.
if __name__ == '__main__':
    main()