Async life and twitter

This week’s project is something I’ve been putting off for a very long time: getting something running on Extra that’s more than just a placeholder site.  Part of the problem is that it’s a good domain name that I’ve had parked for a very long time, and it makes real $$ in parking revenue, which I would rather not endanger.

FYI: the real purpose of this post is to document the code fragment at the bottom…  For other readers, take a look at how I’ve played around with some attributes of Twitter feeds on extra.com (discussion threading and tagging).

One of the many ideas I’ve had is to build a celeb-following website.  After building Notewave last week as a demonstration that an async chat-style site is possible, my thoughts got bigger.  So here’s what I needed:

  • Tornado for the async webserver; I had already built the Django-to-Tornado connector previously.
  • A Twitter stream reader, of which I have a few lying around.  They’re all built on the Twisted framework, and although I’ve got a bunch of my own, I want to get out of the NIH habit, so I ended up using this twitter+twisted on github.
  • We’ll skip over all of the OAuth pain for some other twitter usages.

The original implementation had one process reading the Twitter stream and then doing an HTTP POST to the webserver to notify it of each post received from Twitter.  This was nice, but I was now getting reports of 700K web requests, which made my logs big and made it just about impossible to figure out whether anybody was using the service (ok, yes, Google Analytics is there).  So this morning’s 5am project was to get AMQP back into the running.

I’ve used AMQP before: I wrote a full-scale web crawler with a few components that used AMQP as the messaging system (it was actually AMQP + Thrift).  It worked, and message-passing systems are really very sweet to work with.  The challenge here was that I had two different async frameworks (Twisted and Tornado) that I needed to integrate AMQP with.

The Twisted one was pretty easy: there’s txAMQP, which is “ok”, and I’ve got a wrapper around it from my web crawler that actually makes it easy to use.  The Tornado one was a bit more difficult: there is an async Python AMQP implementation, but it didn’t support Tornado as the server.  So, off to dig through mailing lists and other sources…

I finally found what I wanted: AMQP+Tornado+Pika, as an unofficial port…  It worked great, except the documentation is so lacking!  Which really brings us to the point of this posting: the quick integration for this project.

# Tornado listener
from pika.tornado_adapter import TornadoConnection
import pika
import json

class Handler(object) :
    def __init__(self, amqp=None, channel=None) :
        self.amqp = amqp
        self.channel = channel

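    def startup(self) :
        # Called back once the connection is up; queue setup has to wait until then.
        channel = self.amqp.channel()
        # Declare this listener's queue and bind it to the celeb.tweet_ids exchange.
        channel.queue_declare(queue='extra_ui', durable=False, exclusive=False, auto_delete=True)
        channel.queue_bind(queue='extra_ui', exchange='celeb.tweet_ids')

        # Hand every message arriving on the queue to recv().
        channel.basic_consume(self.recv, 'extra_ui')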

    def recv(self, channel, method, header, body) :
        v = json.loads(body)

        from views.chat import post_notify
        post_notify(v['id'])

        channel.basic_ack(delivery_tag=method.delivery_tag)

def init() :
    handler = Handler()

    amqp = TornadoConnection(
        pika.ConnectionParameters(
            'localhost',
            heartbeat = 10,
            credentials = pika.PlainCredentials('guest', 'guest')),
        callback=handler.startup)

    handler.amqp = amqp
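
For reference, the listener above only relies on each message body being JSON with an id field.  A minimal sketch of that round trip might look like the following; the helper names encode_notification and decode_notification are hypothetical, and the publisher-side format is an assumption inferred from what recv reads, not code lifted from the stream reader.

```python
import json


def encode_notification(tweet_id):
    """Build the message body a publisher could send (hypothetical helper)."""
    # Assumed format: a JSON object with a single 'id' key, encoded as UTF-8.
    return json.dumps({'id': tweet_id}).encode('utf-8')


def decode_notification(body):
    """Pull the tweet id back out, mirroring the json.loads(body) call in recv."""
    return json.loads(body)['id']


# Round trip: whatever id goes in on the publisher side comes out on the listener side.
body = encode_notification(12345)
print(decode_notification(body))
```

Sending only the id keeps the messages tiny and leaves it to the web side to look up whatever it needs.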

Doing my quick blogpost code review says that I could have done things much better: moved the connection establishment into the Handler class, etc., etc.  But at the time (by now 6:30am) I was more interested in getting things working…  The big thing I found was that the queue bits needed to be in the callback that runs after the connection is established; otherwise RabbitMQ dropped things on the floor for out-of-order reasons.
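
A rough sketch of that cleanup, with the connection created inside Handler and the queue setup kept in the on-open callback, could look like this.  To keep it runnable without a broker it uses FakeConnection, a hypothetical stand-in that is NOT pika's API; finish_handshake and the calls list exist only for the illustration.

```python
class FakeConnection(object):
    """Hypothetical stand-in for an async AMQP connection (NOT pika's API)."""

    def __init__(self, on_open):
        self.is_open = False
        self.calls = []          # records queue operations in the order issued
        self._on_open = on_open

    def finish_handshake(self):
        # A real adapter would do this from the IO loop once the broker answers.
        self.is_open = True
        self._on_open(self)

    def _require_open(self):
        if not self.is_open:
            raise RuntimeError('queue operation issued before connection opened')

    def queue_declare(self, queue):
        self._require_open()
        self.calls.append(('declare', queue))

    def queue_bind(self, queue, exchange):
        self._require_open()
        self.calls.append(('bind', queue, exchange))


class Handler(object):
    """Owns its connection, so nobody has to patch handler.amqp afterwards."""

    def __init__(self, connection_factory=FakeConnection):
        # The connection is created here and told to call startup() once open.
        self.amqp = connection_factory(self.startup)

    def startup(self, connection):
        # Queue setup lives in the callback, so it cannot run too early.
        connection.queue_declare('extra_ui')
        connection.queue_bind('extra_ui', 'celeb.tweet_ids')


handler = Handler()
handler.amqp.finish_handshake()   # simulate the broker completing the handshake
print(handler.amqp.calls)
```

With the real adapter, the factory would be whatever builds the TornadoConnection; the point is only that nothing touches a queue until the callback fires.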