Twisted XMLRPC proxy -
i'd make xmlrpc proxy balancer using twisted.
[xmlrpc server 1_1 8.8.8.8:8000] <--> [----------------------] <--- client [proxy example.com:8000] <--- client [xmlrpc server 1_2 9.9.9.9:8000] <--> [----------------------] <--- client
so there 2 xmlrpc instances represents same methods. need xmlrpc-proxy between instances , clients. 1 more thing - proxy should accept json calls (kind of http://example.com:8000/rpc2, http://example.com:8000/json).
right trying implement xmlrpc proxy calls. can't receive answer client although sendline() calling.
import argparse twisted.internet import protocol, reactor, defer, threads twisted.web import xmlrpc twisted.internet.task import loopingcall twisted.internet.defer import deferredqueue, deferred, inlinecallbacks twisted.protocols.basic import linereceiver import configfile bcsxmlrpc import xmlrpc_request_parser, xmlrpc_marshal customlogging import logging logging.getlogger().setlevel(logging.debug) class proxyclient(xmlrpc.proxy): def __init__(self, proxy_uri, user, timeout=30.0): self.proxy_uri = proxy_uri xmlrpc.proxy.__init__(self, url=proxy_uri, user=user, connecttimeout=timeout) @inlinecallbacks def call_api(self, name, *args): logging.debug(u"calling api: %s" % name) result = yield self.callremote(name, *args) proxy_pool.add(self.proxy_uri) defer.returnvalue(result) class request(object): def __init__(self, method, params, deferred): self.method = method self.params = params self.deferred = deferred class proxyserver(linereceiver): def datareceived(self, data): logging.pr(data) params, method = xmlrpc_request_parser(data) # got method name , arguments d = deferred() d.addcallbacks(self._send_reply, self._log_error) logging.debug(u"%s%s added queue" % (method, params)) queue.put(request(method, params, d)) def _send_reply(self, result): logging.ps(result) self.sendline(str(result)) def _log_error(self, error): logging.error(error) def connectionmade(self): logging.info(u"new client connected") def connectionlost(self, reason): logging.info(u"client connection lost: %s" % reason.geterrormessage()) class proxyserverfactory(protocol.factory): protocol = proxyserver def buildprotocol(self, addr): return proxyserver() @inlinecallbacks def _queue_execute_job(): if queue.pending , proxy_pool: proxy = proxy_pool.pop() request = yield queue.get() result = yield proxyclient(proxy, "").call_api(request.method, *list(request.params)) request.deferred.callback(result) if __name__ == "__main__": parser = argparse.argumentparser(description="run configuration") parser.add_argument('--config', help=u"configuration file name/path") config = configfile.proxyconfig(parser.parse_args().config) global proxy_pool proxy_pool = set() proxy_server in config.servers: proxy_pool.add(proxy_server) global queue queue = deferredqueue() lc2 = loopingcall(_queue_execute_job) lc2.start(1) logging.info(u"starting proxy @ port %s" % config.port) reactor.listentcp(config.port, proxyserverfactory()) reactor.run()
Comments
Post a Comment