1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
|
#!/usr/bin/env python3
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Simple IMAP4 client which displays the subjects of all messages in a
particular mailbox.
"""
import sys
from os import linesep as delimiter
from twisted.internet import defer, endpoints, protocol, reactor, ssl, stdio
from twisted.mail import imap4
from twisted.protocols import basic
from twisted.python import log, util
class TrivialPrompter(basic.LineReceiver):
delimiter = delimiter.encode("utf-8")
promptDeferred = None
def prompt(self, msg):
assert self.promptDeferred is None
self.display(msg)
self.promptDeferred = defer.Deferred()
return self.promptDeferred
def display(self, msg):
self.transport.write(msg.encode("utf-8"))
def lineReceived(self, line):
if self.promptDeferred is None:
return
d, self.promptDeferred = self.promptDeferred, None
d.callback(line.decode("utf-8"))
class SimpleIMAP4Client(imap4.IMAP4Client):
"""
A client with callbacks for greeting messages from an IMAP server.
"""
greetDeferred = None
def serverGreeting(self, caps):
self.serverCapabilities = caps
if self.greetDeferred is not None:
d, self.greetDeferred = self.greetDeferred, None
d.callback(self)
class SimpleIMAP4ClientFactory(protocol.ClientFactory):
usedUp = False
protocol = SimpleIMAP4Client
def __init__(self, username, onConn):
self.username = username
self.onConn = onConn
def buildProtocol(self, addr):
"""
Initiate the protocol instance. Since we are building a simple IMAP
client, we don't bother checking what capabilities the server has. We
just add all the authenticators twisted.mail has. Note: Gmail no
longer uses any of the methods below, it's been using XOAUTH since
2010.
"""
assert not self.usedUp
self.usedUp = True
p = self.protocol()
p.factory = self
p.greetDeferred = self.onConn
p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
p.registerAuthenticator(imap4.CramMD5ClientAuthenticator(self.username))
return p
def clientConnectionFailed(self, connector, reason):
d, self.onConn = self.onConn, None
d.errback(reason)
def cbServerGreeting(proto, username, password):
"""
Initial callback - invoked after the server sends us its greet message.
"""
# Hook up stdio
tp = TrivialPrompter()
stdio.StandardIO(tp)
# And make it easily accessible
proto.prompt = tp.prompt
proto.display = tp.display
# Try to authenticate securely
return (
proto.authenticate(password)
.addCallback(cbAuthentication, proto)
.addErrback(ebAuthentication, proto, username, password)
)
def ebConnection(reason):
"""
Fallback error-handler. If anything goes wrong, log it and quit.
"""
log.startLogging(sys.stdout)
log.err(reason)
return reason
def cbAuthentication(result, proto):
"""
Callback after authentication has succeeded.
Lists a bunch of mailboxes.
"""
return proto.list("", "*").addCallback(cbMailboxList, proto)
def ebAuthentication(failure, proto, username, password):
"""
Errback invoked when authentication fails.
If it failed because no SASL mechanisms match, offer the user the choice
of logging in insecurely.
If you are trying to connect to your Gmail account, you will be here!
"""
failure.trap(imap4.NoSupportedAuthentication)
return proto.prompt(
"No secure authentication available. Login insecurely? (y/N) "
).addCallback(cbInsecureLogin, proto, username, password)
def cbInsecureLogin(result, proto, username, password):
"""
Callback for "insecure-login" prompt.
"""
if result.lower() == "y":
# If they said yes, do it.
return proto.login(username, password).addCallback(cbAuthentication, proto)
return defer.fail(Exception("Login failed for security reasons."))
def cbMailboxList(result, proto):
"""
Callback invoked when a list of mailboxes has been retrieved.
"""
result = [e[2] for e in result]
s = "\n".join(["%d. %s" % (n + 1, m) for (n, m) in zip(range(len(result)), result)])
if not s:
return defer.fail(Exception("No mailboxes exist on server!"))
return proto.prompt(s + "\nWhich mailbox? [1] ").addCallback(
cbPickMailbox, proto, result
)
def cbPickMailbox(result, proto, mboxes):
"""
When the user selects a mailbox, "examine" it.
"""
mbox = mboxes[int(result or "1") - 1]
return proto.examine(mbox).addCallback(cbExamineMbox, proto)
def cbExamineMbox(result, proto):
"""
Callback invoked when examine command completes.
Retrieve the subject header of every message in the mailbox.
"""
return proto.fetchSpecific(
"1:*",
headerType="HEADER.FIELDS",
headerArgs=["SUBJECT"],
).addCallback(cbFetch, proto)
def cbFetch(result, proto):
"""
Finally, display headers.
"""
if result:
keys = sorted(result)
for k in keys:
proto.display(f"{k} {result[k][0][2]}")
else:
print("Hey, an empty mailbox!")
return proto.logout()
def cbClose(result):
"""
Close the connection when we finish everything.
"""
reactor.stop()
def main():
hostname = input("IMAP4 Server Hostname: ")
port = input("IMAP4 Server Port (the default is 143, 993 uses SSL): ") or "143"
port = int(port)
# Usernames are bytes.
username = input("IMAP4 Username: ").encode("ascii")
# Passwords are bytes.
password = util.getPassword("IMAP4 Password: ").encode("ascii")
onConn = (
defer.Deferred()
.addCallback(cbServerGreeting, username, password)
.addErrback(ebConnection)
.addBoth(cbClose)
)
factory = SimpleIMAP4ClientFactory(username, onConn)
endpoint = endpoints.HostnameEndpoint(reactor, hostname, port)
if port == 993:
contextFactory = ssl.optionsForClientTLS(
hostname=hostname,
)
endpoint = endpoints.wrapClientTLS(contextFactory, endpoint)
endpoint.connect(factory)
reactor.run()
if __name__ == "__main__":
main()
|