Tagged with twisted

Async life and twitter

The project of the week is something I’ve been putting off for a very long time: getting something running on Extra that’s more than just a placeholder site.  Part of the problem is that it’s a good domain name that I’ve had parked for a very long time, and it makes real $$ in parking revenue, which I would rather not endanger.

FYI: the real purpose of this post is to document the code fragment at the bottom…  For other readers, take a look at how I’ve played around with some attributes of twitter feeds on extra.com [discussion threading and tagging].

One of the many ideas I’ve had is to build a celeb-following website.  After building Notewave last week as a demonstration that an async chat-style site is possible, my thoughts got bigger, so here’s what I needed:

  • Tornado for the async webserver; I had already built the django to tornado connector previously.
  • A Twitter stream reader, of which I have a few lying around, all built with the twisted framework.  I want to get out of the NIH habit, though, so I ended up using this twitter+twisted on github.
  • We’ll skip over all of the OAuth pain for some other twitter usages.

The original implementation had one process reading the twitter stream and then doing an HTTP POST to the webserver to notify it of each post received from twitter.  This was nice, but I was now getting reports of 700K web requests, which made my logs big and made it just about impossible to figure out whether anybody was using the service (ok, yes, Google Analytics is there).  So this morning’s 5am project was to get AMQP back into the running.

I’ve used AMQP before: I wrote a full-scale web crawler with a few components that used AMQP as the message system (it was actually AMQP + Thrift).  It worked, and message passing systems are really very sweet to work with.  The challenge here was that I had two different async frameworks (Twisted and Tornado) that I needed to get AMQP integrated with.

The Twisted one was pretty easy: there’s txAMQP, which is “ok”, and I’ve got a wrapper around it from my webcrawler that actually makes it easy to use.  The Tornado one was a bit more difficult: there is an AMQP + async python implementation, but it didn’t support Tornado as the server.  So, off to dig through mailing lists and other sources…

I finally found what I wanted in AMQP+Tornado+Pika, an unofficial port…  This worked great, except that the documentation is so lacking!  Which really brings us to the point of this posting: the quick integration for this project.

# Tornado listener
from pika.tornado_adapter import TornadoConnection
import pika
import json

class Handler(object) :
    def __init__(self, amqp=None, channel=None) :
        self.amqp    = amqp
        self.channel = channel

    def startup(self) :
        channel = self.amqp.channel()
        channel.queue_declare(queue='extra_ui', durable=False, exclusive=False, auto_delete=True)
        channel.queue_bind(queue='extra_ui', exchange='celeb.tweet_ids')

        channel.basic_consume(self.recv, 'extra_ui')

    def recv(self, channel, method, header, body) :
        v = json.loads(body)

        from views.chat import post_notify
        post_notify(v['id'])

        channel.basic_ack(delivery_tag=method.delivery_tag)

def init() :
    handler = Handler()

    amqp = TornadoConnection(pika.ConnectionParameters('localhost',
                                                       heartbeat = 10,
                                                       credentials = pika.PlainCredentials('guest', 'guest')),
                             callback=handler.startup)

    handler.amqp = amqp

Doing my quick blogpost code review tells me I could have done things much better: moved the connection establishment into the Handler class, etc, etc.  But at the time I was more interested in getting things working, at what was by now 6:30am…  The big thing I found was that the queue bits needed to be in the callback that runs after the connection is established; otherwise RabbitMQ dropped things on the floor for out-of-order reasons.
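
For what it’s worth, here is a rough sketch of what that cleanup might look like, with the Handler owning the connection and every queue operation living in the on-open callback.  It is not the code running on the site: StubConnection, StubChannel, Method, and the connect/notify arguments are hypothetical stand-ins so the ordering can be checked without RabbitMQ or pika installed.  With the real adapter, connect would presumably wrap the TornadoConnection constructor from the listing above.

```python
import json
from collections import namedtuple

# Hypothetical stand-ins so the sketch runs without RabbitMQ or pika.
Method = namedtuple('Method', ['delivery_tag'])

class StubChannel(object):
    """Records each call so the ordering can be inspected."""
    def __init__(self, log):
        self.log = log

    def queue_declare(self, queue, **kw):
        self.log.append(('queue_declare', queue))

    def queue_bind(self, queue, exchange):
        self.log.append(('queue_bind', queue, exchange))

    def basic_consume(self, callback, queue):
        self.log.append(('basic_consume', queue))

    def basic_ack(self, delivery_tag):
        self.log.append(('basic_ack', delivery_tag))

class StubConnection(object):
    """Holds the on-open callback and fires it from open(), the way an
    event loop would once the broker handshake has finished."""
    def __init__(self, callback):
        self.log = []
        self.is_open = False
        self._callback = callback

    def open(self):
        self.is_open = True
        self.log.append(('open',))
        self._callback()

    def channel(self):
        if not self.is_open:
            raise RuntimeError('channel requested before the connection opened')
        return StubChannel(self.log)

class Handler(object):
    """Owns the connection; all queue setup lives in the on-open callback."""
    def __init__(self, connect, notify):
        self.notify = notify
        self.channel = None
        # connect is any callable that accepts the on-open callback.
        self.amqp = connect(self.startup)

    def startup(self):
        # Runs only after the connection is established.
        self.channel = self.amqp.channel()
        self.channel.queue_declare(queue='extra_ui', durable=False,
                                   exclusive=False, auto_delete=True)
        self.channel.queue_bind(queue='extra_ui', exchange='celeb.tweet_ids')
        self.channel.basic_consume(self.recv, 'extra_ui')

    def recv(self, channel, method, header, body):
        self.notify(json.loads(body)['id'])
        channel.basic_ack(delivery_tag=method.delivery_tag)

seen = []
handler = Handler(StubConnection, seen.append)
handler.amqp.open()   # simulate the broker handshake completing
handler.recv(handler.channel, Method(delivery_tag=7), None,
             json.dumps({'id': 42}))
print([entry[0] for entry in handler.amqp.log])
print(seen)
```

The stub refuses to hand out a channel before open() has run, which is roughly the mistake that bit me: declaring and binding before the connection was really there.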


Twisted code review…

If you have a few minutes and speak python & twisted, it would be useful to have an extra set of eyes on this section of code. The basic idea is a reconnecting thrift client, so that I can just write simple client.function(a,b,c) calls without worrying about whether a client is currently connected; requests are queued and the connection is re-established as needed.

from thrift.transport import TTwisted
from thrift.protocol import TBinaryProtocol
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet import defer, reactor
from twisted.python import failure
from collections import deque
from redback import log

class ClientBusy(Exception):
    pass

class ClientDead(Exception):
    pass

class InvalidThriftRequest(Exception):
    pass

class ManagedThriftRequest(object):
    def __init__(self, method, *args, **kw) :
        self.method = method
        self.args   = args
        self.kw     = kw

class ManagedClient(object) :
    def __init__(self, factory) :
        self.__factory = factory

    def _is_connected(self) :
        return self.__factory.is_connected()

    def __getattr__(self, name) :
        if hasattr(self.__factory.client_class, name) :
            def f(*args, **kw) :
                return self.__factory.pushRequest(ManagedThriftRequest(name, *args, **kw))
            return f
        raise InvalidThriftRequest("Cant find method: %s" % name)

class ManagedThriftClientProtocol(TTwisted.ThriftClientProtocol):
    def __init__(self, client_class, iprot_factory, oprot_factory=None):
        TTwisted.ThriftClientProtocol.__init__(self, client_class, iprot_factory, oprot_factory)
        self.client_class = client_class
        self.deferred = None
        self.alive    = False

    def connectionMade(self):
        log.debug(self, "Connection made to", "%s.%s" % (self.client_class.__module__, self.client_class.__name__), "[%s]:%s" % self.transport.addr)
        self.alive = True
        TTwisted.ThriftClientProtocol.connectionMade(self)
        self.client.protocol = self
        self.factory.clientIdle(self)

    def connectionLost(self, reason=None):
        log.debug(self, "Connection lost to", "%s.%s" % (self.client_class.__module__, self.client_class.__name__), "[%s]:%s" % self.transport.addr)
        self.alive = False
        try :
            TTwisted.ThriftClientProtocol.connectionLost(self, reason)
            self.factory.clientGone(self)
        except Exception as e :
            log.error(self.connectionLost, e)

    def _complete(self, res, request, dfd):
        self.deferred = None
        if isinstance(res, failure.Failure) and dfd :
            self.factory.pushRequest(request, dfd)
        else :
            if dfd :
                dfd.callback(res)
            self.factory.clientIdle(self)
        return res

    def submitRequest(self, request, dfd):
        # A dead protocol, or one with a call in flight, cannot take a request.
        if not self.alive or self.deferred :
            raise ClientBusy
        fun = getattr(self.client, request.method, None)
        if not fun :
            raise InvalidThriftRequest("No such method as : %s" % request.method)
        try :
            d = fun(*request.args, **request.kw)
        except Exception as e :
            # Re-raise after logging; falling through would leave d unbound below.
            log.error(self.submitRequest, "calling : ", request.method, e)
            raise
        self.deferred = d
        d.addBoth(self._complete, request, dfd)
        return d

class ManagedClientFactory(ReconnectingClientFactory):
    maxDelay        = 5
    thriftFactory   = TBinaryProtocol.TBinaryProtocolAcceleratedFactory
    protocol        = ManagedThriftClientProtocol
    submitLoopSleep = 0
    client_class    = None

    def __init__(self, client_class=None):
        self._stack        = deque()
        self._protos       = defer.DeferredQueue()
        self.deferred      = defer.Deferred()
        self.client_class  = client_class or self.client_class
        self.client        = ManagedClient(self)

    def _errback(self, reason=None):
        if self.deferred :
            self.deferred.errback(reason)
            self.deferred = None

    def _callback(self, value=None):
        if self.deferred :
            self.deferred.callback(value)
            self.deferred = None

    def clientConnectionFailed(self, connector, reason):
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
        self._errback(reason)

    def buildProtocol(self, addr) :
        self.resetDelay()
        p = self.protocol(self.client_class, self.thriftFactory())

        p.factory = self
        #self._protos.put(p)
        return p

    def clientIdle(self, proto) :
        self._callback(True)
        if proto.alive :
            self._protos.put(proto)

    def clientGone(self, proto):
        pass
        #if proto in self._protos :
        #    self._protos.remove(proto)

    def _protoErr(self, proto):
        pass
        #import traceback
        #traceback.print_stack()
        #print "IN PROTO ERR", proto

    def _protoReady(self, proto):
        if proto.deferred :
            log.msg(self._protoReady, "Proto currently active!")
            return
        if not proto.alive :
            log.msg(self._protoReady, "Proto currently dead!")
            return

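        try:
            request, deferred = self._stack.popleft()
        except IndexError :
            # An empty deque raises IndexError (not QueueUnderflow); with nothing
            # to send, hand the protocol back to the idle queue.
            self._protos.put(proto)
            return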

        d = proto.submitRequest(request, deferred)
        return d

    def pushRequest(self, request, din=None) :
        d = din or defer.Deferred()
        self._stack.append((request, d))

        dfd = self._protos.get()
        dfd.addCallback(self._protoReady)
        dfd.addErrback(self._protoErr)
        return d

    def shutdown(self) :
        """Shutdown this factory"""

        self.stopTrying()
        # DeferredQueue is not iterable; idle protocols sit in its pending list.
        for p in list(self._protos.pending) :
            if p.transport:
                p.transport.loseConnection()
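
For anyone reviewing who wants the gist without the Twisted plumbing, the core idea can be sketched in plain python as below.  This is a simplified illustration, not the code above: QueueingProxy, Request, client_idle and the Echo service are hypothetical names, a plain list stands in for the Deferred that the real code returns, and calls run synchronously instead of over a thrift transport.

```python
from collections import deque

class Request(object):
    """A captured method call, waiting for a connection to run on."""
    def __init__(self, method, args, kw):
        self.method, self.args, self.kw = method, args, kw

class QueueingProxy(object):
    """Turns proxy.name(*args) into a queued Request and runs it as soon
    as a connected client is idle."""
    def __init__(self, service_class):
        self._service_class = service_class
        self._pending = deque()   # (request, on_result) pairs not yet sent
        self._idle = deque()      # connected clients with nothing in flight

    def __getattr__(self, name):
        # Only reached for names not found the normal way.
        if name.startswith('_') or not hasattr(self._service_class, name):
            raise AttributeError(name)

        def call(*args, **kw):
            results = []          # stand-in for a Deferred
            self._pending.append((Request(name, args, kw), results.append))
            self._pump()
            return results
        return call

    def client_idle(self, client):
        """Called when a connection comes up or finishes a request."""
        self._idle.append(client)
        self._pump()

    def _pump(self):
        # Pair waiting requests with idle clients until one side runs out.
        while self._pending and self._idle:
            request, on_result = self._pending.popleft()
            client = self._idle.popleft()
            on_result(getattr(client, request.method)(*request.args, **request.kw))
            self._idle.append(client)

class Echo(object):
    """Hypothetical service used only for this illustration."""
    def shout(self, text):
        return text.upper()

proxy = QueueingProxy(Echo)
early = proxy.shout('hi')      # no connection yet, so the call is queued
print(early)
proxy.client_idle(Echo())      # a connection arrives and drains the queue
print(early)
```

The caller never checks for a connection: the call made before any client exists simply waits in the queue and completes once client_idle is invoked, which is the behaviour the factory above is trying to provide with DeferredQueue.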