If you have a few minutes and speak python & twisted, it would be useful to have an extra set of eyes on this section of code. The basic idea of this is to be a reconnecting thrift client, such that I can just write simple client.function(a,b,c) calls without having to worry about if there is or isn’t a client and it will queue reconnect as needed.
from thrift.transport import TTwisted
from thrift.protocol import TBinaryProtocol
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet import defer, reactor
from twisted.python import failure
from collections import deque
from redback import log
class ClientBusy(Exception):
pass
class ClientDead(Exception):
pass
class InvalidThriftRequest(Exception):
pass
class ManagedThriftRequest(object):
def __init__(self, method, *args, **kw) :
self.method = method
self.args = args
self.kw = kw
class ManagedClient(object) :
def __init__(self, factory) :
self.__factory = factory
def _is_connected(self) :
return self.__factory.is_connected()
def __getattr__(self, name) :
if hasattr(self.__factory.client_class, name) :
def f(*args, **kw) :
return self.__factory.pushRequest(ManagedThriftRequest(name, *args, **kw))
return f
raise InvalidThriftRequest("Cant find method: %s" % name)
class ManagedThriftClientProtocol(TTwisted.ThriftClientProtocol):
def __init__(self, client_class, iprot_factory, oprot_factory=None):
TTwisted.ThriftClientProtocol.__init__(self, client_class, iprot_factory, oprot_factory)
self.client_class = client_class
self.deferred = None
self.alive = False
def connectionMade(self):
log.debug(self, "Connection made to", "%s.%s" % (self.client_class.__module__, self.client_class.__name__), "[%s]:%s" % self.transport.addr)
self.alive = True
TTwisted.ThriftClientProtocol.connectionMade(self)
self.client.protocol = self
self.factory.clientIdle(self)
def connectionLost(self, reason=None):
log.debug(self, "Connection lost to", "%s.%s" % (self.client_class.__module__, self.client_class.__name__), "[%s]:%s" % self.transport.addr)
self.alive = False
try :
TTwisted.ThriftClientProtocol.connectionLost(self, reason)
self.factory.clientGone(self)
except Exception, e :
log.error(self.connectionLost, e)
def _complete(self, res, request, dfd):
self.deferred = None
if isinstance(res, failure.Failure) and dfd :
self.factory.pushRequest(request, dfd)
else :
if dfd :
dfd.callback(res)
self.factory.clientIdle(self)
return res
def submitRequest(self, request, dfd):
if not self.alive :
raise ClientBusy
if not self.deferred :
fun = getattr(self.client, request.method, None)
if not fun:
raise InvalidThriftRequest("No such method as : %s" % request.method)
else :
try :
d = fun(*request.args, **request.kw)
except Exception, e :
log.error(self.submitRequest, "calling : ", request.method, e)
self.deferred = d
d.addBoth(self._complete, request, dfd)
return d
else:
raise ClientBusy
class ManagedClientFactory(ReconnectingClientFactory):
maxDelay = 5
thriftFactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory
protocol = ManagedThriftClientProtocol
submitLoopSleep = 0
client_class = None
def __init__(self, client_class=None):
self._stack = deque()
self._protos = defer.DeferredQueue()
self.deferred = defer.Deferred()
self.client_class = client_class or self.client_class
self.client = ManagedClient(self)
def _errback(self, reason=None):
if self.deferred :
self.deferred.errback(reason)
self.deferred = None
def _callback(self, value=None):
if self.deferred :
self.deferred.callback(value)
self.deferred = None
def clientConnectionFailed(self, connector, reason):
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
self._errback(reason)
def buildProtocol(self, addr) :
self.resetDelay()
p = self.protocol(self.client_class, self.thriftFactory())
p.factory = self
#self._protos.put(p)
return p
def clientIdle(self, proto) :
self._callback(True)
if proto.alive :
self._protos.put(proto)
def clientGone(self, proto):
pass
#if proto in self._protos :
# self._protos.remove(proto)
def _protoErr(self, proto):
pass
#import traceback
#traceback.print_stack()
#print "IN PROTO ERR", proto
def _protoReady(self, proto):
if proto.deferred :
log.msg(self._protoReady, "Proto currently active!")
return
if not proto.alive :
log.msg(self._protoReady, "Proto currently dead!")
return
try:
request, deferred = self._stack.popleft()
except defer.QueueUnderflow :
pass
d = proto.submitRequest(request, deferred)
return d
def pushRequest(self, request, din=None) :
d = din or defer.Deferred()
self._stack.append((request, d))
dfd = self._protos.get()
dfd.addCallback(self._protoReady)
dfd.addErrback(self._protoErr)
return d
def shutdown(self) :
"""Shutdown this factory"""
self.stopTrying()
for p in self._protos:
if p.transport:
p.transport.loseConnection()</pre>