SMTP Client for Tornado

I was looking around for a mail handler for Tornado and found it pretty amazing that one didn’t exist.  Since I’ve only written one commercial SMTP server and have been playing with just about everything in Tornado Web at this point, I figured it wouldn’t be too much work to whip one out.  So, instead of using annoying little threads, here’s a fully async SMTP client for Tornado.

Nothing fancy, just gets the job done.

Also available as a GitHub gist – https://gist.github.com/1358253

import logging
import socket

from tornado import ioloop
from tornado import iostream

  5class Envelope(object):
  6    def __init__(self, sender, rcpt, body, callback):
  7        self.sender = sender
  8        self.rcpt   = rcpt[:]
  9        self.body   = body
 10        self.callback = callback
 11
 12class SMTPClient(object):
 13    CLOSED = -2
 14    CONNECTED = -1
 15    IDLE = 0
 16    EHLO = 1
 17    MAIL = 2
 18    RCPT = 3
 19    DATA = 4
 20    DATA_DONE = 5
 21    QUIT = 6
 22
 23    def __init__(self, host='localhost', port=25):
 24        self.host = host
 25        self.port = port
 26        self.msgs = []
 27        self.stream = None
 28        self.state = self.CLOSED
 29
 30    def send_message(self, msg, callback=None):
 31        """Message is a django style EmailMessage object"""
 32
 33        if not msg:
 34            return
 35
 36        self.msgs.append(Envelope(msg.from_email, msg.recipients(), msg.message().as_string(), callback))
 37
 38        self.begin()
 39
 40    def send(self, sender=None, rcpt=[], body="", callback=None):
 41        """Very simple sender, just take the necessary parameters to create an envelope"""
 42        self.msgs.append(Envelope(sender, rcpt, body, callback))
 43
 44        self.begin()
 45
 46    def begin(self):
 47        """Start the sending of a message, if we need a connection open it"""
 48        if not self.stream:
 49            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
 50            self.stream = iostream.IOStream(s)
 51            self.stream.connect((self.host, self.port), self.connected)
 52        else:
 53            self.work_or_quit(self.process)
 54
 55    def work_or_quit(self, callback=None):
 56        """
 57           callback is provided, for the startup case where we're not in the main processing loop
 58        """
 59        if self.state == self.IDLE:
 60            if self.msgs:
 61                self.state = self.MAIL
 62                self.stream.write('MAIL FROM: < %s>\r\n' % self.msgs[0].sender)
 63            else:
 64                self.state = self.QUIT
 65                self.stream.write('QUIT\r\n')
 66            if callback:
 67                self.stream.read_until('\r\n', callback)
 68
 69    def connected(self):
 70        """Socket connect callback"""
 71        self.state = self.CONNECTED
 72        self.stream.read_until('\r\n', self.process)
 73
 74    def process(self, data):
 75        # print self.state, data,
 76        code = int(data[0:3])
 77        if data[3] not in (' ', '\r', '\n'):
 78            self.stream.read_until('\r\n', self.process)
 79            return
 80
 81        if self.state == self.CONNECTED:
 82            if not 200 <= code < 300:
 83                return self.error("Unexpected status %d from CONNECT: %s" % (code, data.strip()))
 84            self.state = self.EHLO
 85            self.stream.write('EHLO localhost\r\n')
 86        elif self.state == self.EHLO:
 87            if not 200 <= code < 300:
 88                return self.error("Unexpected status %d from EHLO: %s" % (code, data.strip()))
 89            self.state = self.IDLE
 90            self.work_or_quit()
 91        elif self.state == self.MAIL:
 92            if not 200 <= code < 300:
 93                return self.error("Unexpected status %d from MAIL: %s" % (code, data.strip()))
 94            if self.msgs[0].rcpt:
 95                self.stream.write('RCPT TO: <%s>\r\n' % self.msgs[0].rcpt.pop(0))
 96            self.state = self.RCPT
 97        elif self.state == self.RCPT:
 98            if not 200 <= code < 300:
 99                return self.error("Unexpected status %d from RCPT: %s" % (code, data.strip()))
100            if self.msgs[0].rcpt:
101                self.stream.write('RCPT TO: <%s>;\r\n' % self.msgs[0].rcpt.pop(0))
102            else:
103                self.stream.write('DATA\r\n')
104                self.state = self.DATA
105        elif self.state == self.DATA:
106            if code not in (354,) :
107                return self.error("Unexpected status %d from DATA: %s" % (code, data.strip()))
108            self.stream.write(self.msgs[0].body)
109            if self.msgs[0].body[-2:] != '\r\n':
110                self.stream.write('\r\n')
111            self.stream.write('.\r\n')
112            self.state = self.DATA_DONE
113        elif self.state == self.DATA_DONE:
114            if not 200 <= code < 300:
115                return self.error("Unexpected status %d from DATA END: %s" % (code, data.strip()))
116            if self.msgs[0].callback:
117                self.msgs[0].callback(True)
118
119            self.msgs.pop(0)
120
121            self.state = self.IDLE
122            self.work_or_quit()
123        elif self.state == self.QUIT:
124            if not 200 <= code < 300:
125                return self.error("Unexpected status %d from QUIT: %s" % (code, data.strip()))
126            self.close()
127
128        if self.stream:
129            self.stream.read_until('\r\n', self.process)
130
131    def error(self, msg):
132        self.close()
133
134    def close(self):
135        for msg in self.msgs:
136            if msg.callback:
137                msg.callback(False)
138        self.stream.close()
139        self.stream = None
140        self.state = self.CLOSED
141
if __name__ == '__main__':
    # Demo: send one test message through the SMTP server on localhost:25.
    def _done(success):
        """Report the outcome and stop the loop so the script can exit."""
        print('message sent' if success else 'message failed')
        ioloop.IOLoop.instance().stop()

    client = SMTPClient('localhost', 25)
    body = """Subject: Testing

Just a test
    """
    # Without a completion callback nothing ever stops the IOLoop and the
    # script keeps running after the message has been handed over.
    client.send('foo@example.com', ['recipient@example.com'], body, _done)
    ioloop.IOLoop.instance().start()