from twisted.internet import reactor, protocol, defer, threads
from packetformat import PacketFormat
import paatti_pb2
from messagetypes import messagetypes
import threading


class RPCCall:
    """Callable that builds a request message and hands it to a sender.

    Calling an instance with keyword arguments constructs ``requestclass``
    from them and returns ``callback(msgtype, request)``.
    """

    def __init__(self, callback, msgtype, requestclass):
        self.callback = callback
        self.msgtype = msgtype
        self.requestclass = requestclass

    def __call__(self, **kwargs):
        request = self.requestclass(**kwargs)
        return self.callback(self.msgtype, request)


def _bindRPCMethods(target):
    """Attach one RPCCall per entry of ``messagetypes`` to *target*.

    Each attribute is named after the command (``cmd.name``) and forwards
    to ``target.sendCommand``.
    """
    for msgtype, cmd in messagetypes.items():
        setattr(target, cmd.name,
                RPCCall(target.sendCommand, msgtype, cmd.request))


class RPCClient(PacketFormat):
    """Protocol that issues RPC requests and pairs responses in FIFO order.

    Every command listed in ``messagetypes`` is exposed as a method that
    returns a Deferred firing with the decoded response.
    """

    def __init__(self):
        _bindRPCMethods(self)

    def connectionMade(self):
        # Set up the pending queue and send Login *before* announcing the
        # connection, so that a connection_defer callback may issue RPC
        # calls immediately without hitting a missing ``pending`` attribute
        # or overtaking the Login request.
        self.pending = []
        self.Login(im_rpc_server=False)
        if self.factory.connection_defer is not None:
            self.factory.connection_defer.callback(self)

    def messageReceived(self, msgtype, payload):
        """Decode *payload* and deliver it to the oldest pending request."""
        if msgtype == paatti_pb2.Error:
            responseclass = paatti_pb2.ErrorResponse
        else:
            responseclass = messagetypes[msgtype].response
        response = responseclass.FromString(payload)
        # Responses are matched to requests purely by arrival order.
        self.pending.pop(0)(response)

    def sendCommand(self, msgtype, request):
        """Send *request* and return a Deferred that fires with its response."""
        result = defer.Deferred()
        self.pending.append(result.callback)
        self.sendMessage(msgtype, request)
        return result


class RPCClientFactory(protocol.ClientFactory):
    """Client factory for RPCClient.

    If ``connection_defer`` is set to a Deferred, the protocol fires it with
    itself once the connection has been made.
    """

    protocol = RPCClient
    connection_defer = None


def waitForDeferred(deferred):
    """Block the calling thread until *deferred* fires; return its result.

    If the deferred fires with a failure, the underlying exception is
    re-raised in the calling thread instead of waiting forever.

    The handlers registered here return None, so callbacks added to
    *deferred* afterwards receive None.
    """
    event = threading.Event()
    outcome = []

    def on_success(result):
        outcome.append((True, result))
        event.set()

    def on_failure(failure):
        # Recording the failure (and returning None) marks it as handled;
        # it is re-raised below in the waiting thread.
        outcome.append((False, failure))
        event.set()

    deferred.addCallbacks(on_success, on_failure)

    # Wait in one-second slices rather than with a single untimed wait.
    # NOTE(review): presumably so the waiting thread stays interruptible
    # (e.g. Ctrl-C on Python 2) - confirm.
    while not event.is_set():
        event.wait(1)

    succeeded, value = outcome[0]
    if not succeeded:
        value.raiseException()
    return value


class ThreadedRPCClient(threading.Thread):
    """Run the Twisted reactor on another thread and expose blocking calls.

    Intended for simple sequential scripts: every command listed in
    ``messagetypes`` is available as a method that blocks until the response
    arrives and then returns it.
    """

    def __init__(self, host, port):
        threading.Thread.__init__(self)
        self.factory = RPCClientFactory()
        self.factory.connection_defer = defer.Deferred()
        reactor.connectTCP(host, port, self.factory)

        self.connection = None

        def connect_callback(connection):
            self.connection = connection

        self.factory.connection_defer.addCallback(connect_callback)

        _bindRPCMethods(self)

    def waitForConnection(self):
        """Block until the protocol has reported a successful connection."""
        if self.connection is None:
            waitForDeferred(self.factory.connection_defer)

    def run(self):
        # Signal handlers can only be installed from the main thread.
        reactor.run(installSignalHandlers=False)

    def stop(self):
        """Ask the reactor thread to shut the reactor down."""
        reactor.callFromThread(reactor.stop)

    def sendCommand(self, msgtype, request):
        """Send *request* via the reactor thread and block for the response."""

        def callback():
            # Wrapped in a list so blockingCallFromThread hands back the
            # Deferred itself instead of waiting on it; the waiting is done
            # by waitForDeferred below.
            return [self.connection.sendCommand(msgtype, request)]

        d = threads.blockingCallFromThread(reactor, callback)[0]
        return waitForDeferred(d)


if __name__ == '__main__':
    import sys

    r = ThreadedRPCClient(sys.argv[1], 50140)

    def result_display(self, obj):
        # Only objects whose class lives in paatti_pb2 are printed.
        if obj.__class__.__module__ == 'paatti_pb2':
            print(repr(obj))
            print(str(obj))

    r.start()
    try:
        print("Connecting to proxy...")
        r.waitForConnection()
        print("Waiting for RPC server...")
        r.Ping()
        print("All ready!")

        import IPython
        shell = IPython.Shell.IPShellEmbed()
        shell.IP.rc.quiet = True
        shell.IP.set_hook("result_display", result_display)
        shell(local_ns=locals())
    finally:
        r.stop()