1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
#! /usr/bin/python
import GnuPGInterface
import string, re, sys
gnupg = GnuPGInterface.GnuPG()
gnupg.options.extra_args += ['--homedir', 'remailer-keys', '--batch']
# The keyring should hold private keys for all the remailers in the chain,
# and each one should have a passphrase equal to the name of the key.
def passphrase(bigname):
return re.search(r'(\w+@\w+\.\w+)', bigname).group(1)
def unwind(recipient, message):
"""Unwind a message. Takes two parameters: the To: address of the
message and the message body itself.
This function will return a list of steps. Each element is a dict
describing what happened on that particular step. The decryption and
remailing are separate steps. The dict keys are as follows:
recipient: the target of that step, as requested by the previous step
encrypted: 1 if the 'Encrypted: PGP' flag was set, else non-existent
anon-to: name in the Anon-To: header, if present
subject: subject specified in a ## header, if present
message: plaintext of the message (for the last step)
Typical steps:
recipient=rem1, encrypted=1 ->recurse BRANCH2
recipient=rem1, anon-to=rem2 BRANCH3, recurse
recipient=rem2, encrypted=1 ->recurse BRANCH2
recipient=rem2, anon-to=user, subject=test BRANCH3, recurse
recipient=user, message=plaintext : BRANCH1, terminate
It throws an exception if anything about the message is incorrect:
message doesn't start with ::\nEncrypted: PGP
message isn't encrypted or is encrypted to the wrong key (not TO)
decrypted message doesn't start with ::\nAnon-To:
"""
# the encrypted message is handled by unwind1. It will look like:
# ::\nEncrypted: PGP\n\n-----BEGIN PGP MESSAGE----\n etc
# once decrypted, the message should look like:
# ::\nAnon-To: dest@foo.com\n\n##\nSubject: test subject\n\nmessage body
if 0:
print "-----"
print message
print "-----"
step = {}
# see if we should end the recursion now
if not re.search(r'^rem\d@test.test', recipient):
# BRANCH 1
# plaintext message
step['recipient'] = recipient
step['message'] = message
return [step]
message = string.split(message, "\n")
if message[0] != '::':
raise 'bad message', 'first line was not ::'
if message[2] != '':
raise 'bad message', 'third line was not blank'
# see if it's encrypted
if message[1] == 'Encrypted: PGP':
# yes, decrypt it and recurse
# BRANCH2
# step is: recipient=rem1, encrypted=1
crypttext = message[3:]
assert(message[3] == '-----BEGIN PGP MESSAGE-----')
# decrypt here
gnupg.passphrase = passphrase(recipient) # passphrase == keyname
p = gnupg.run(['--decrypt'],
create_fhs=['stdin', 'stdout'],
)
p.handles['stdin'].write(string.join(crypttext,"\n"))
p.handles['stdin'].close()
plaintext = p.handles['stdout'].read()
p.handles['stdout'].close()
p.wait()
step['recipient'] = recipient
step['encrypted'] = 1
return [step] + unwind(recipient, plaintext)
# BRANCH3
# message is now:
# ::\nAnon-To: remN@test.set (or dest@foo.com)\n\nmessage body
# or
# ::\nAnon-To: dest@foo.com\n\n##\nSubject: subject\n\nmessage body
r = re.search(r'^Anon-To: (.*)$', message[1])
if not r:
print "Bad Message, no Anon-To"
print message
raise 'bad message', "no anon-to"
step['recipient'] = recipient
step['anon-to'] = r.group(1)
message = message[3:]
# now: "message body", or "##\nSubject: sub\n\nmessage body"
if message[0] == '##':
# there are ## headers included
r = re.search(r'^Subject: (.*)$', message[1])
if not r:
raise 'bad message', "## but no Subject: header"
step['subject'] = r.group(1)
if message[2] != '':
raise 'bad message', "no blank line after ## headers"
message = message[3:]
# now: "message body"
message = string.join(message, "\n")
return [step] + unwind(step['anon-to'], message)
def insistEquals(one, two):
if one != two:
raise "results don't match expected", "'%s' != '%s'" % (one, two)
def test_chain(firsthop, crypttext, plaintext, recipient, chain, subject=None):
"""Verify that the crypttext message does indeed match the plaintext
message, sent to a given recipient and encrypted to the given remailer
chain."""
# chain is a list of remailers with long names: rem1@test.test, etc
# build up the list of what we expect to see at each step
expected_chain = []
for i in range(len(chain)-1):
expected_chain.append({'recipient': chain[i],
'encrypted': 1,
})
expected_chain.append({'recipient': chain[i],
'anon-to': chain[i+1],
})
last = chain[len(chain)-1]
expected_chain.append({'recipient': last,
'encrypted': 1,
})
pentultimate = {'recipient': last,
'anon-to': recipient,
}
if subject:
pentultimate['subject'] = subject
expected_chain.append(pentultimate)
expected_chain.append({'recipient': recipient,
'message': plaintext,
})
# unwind the messge
insistEquals(firsthop, chain[0])
results = unwind(firsthop, crypttext)
# compare against expectations
insistEquals(len(expected_chain), len(results))
for i in range(len(results)):
#print i, results[i], expected_chain[i]
insistEquals(results[i], expected_chain[i])
print "TEST CASE PASSED"
def test1():
m1 = open("m1").read()
chain = unwind("rem3@test.test", m1)
for link in chain: print link
def test2():
m1 = open("m1").read()
chain = ["rem3", "rem2", "rem1", "rem1"]
test_chain(firsthop="rem3@test.test", crypttext=m1,
plaintext="test message\n",
recipient="warner@lothar.com",
chain=chain,
subject="test subject")
def main():
# argv is: ['recipient', 'chain1,chain2,chain3', 'subject']
# plaintext is always "test message\n"
# crypttext arrives on stdin
recipient = sys.argv[1]
chain = string.split(sys.argv[2], ',')
subject = sys.argv[3]
crypttext = sys.stdin.read()
test_chain(firsthop=chain[0],
crypttext=crypttext,
plaintext="test message\n",
recipient=recipient,
chain=chain,
subject=subject)
if __name__ == '__main__':
main()
|