# -*- coding: utf-8 -*- """ License: BSD (c) 2005-2008 ::: Alec Thomas (alec@swapoff.org) (c) 2009 ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no) """ import datetime import time import xmlrpclib import genshi from trac.core import * from trac.perm import PermissionError from trac.resource import ResourceNotFound from trac.util.datefmt import utc from trac.util.text import to_unicode from trac.web.api import RequestDone from tracrpc.api import XMLRPCSystem, IRPCProtocol, Binary, \ RPCError, MethodNotFound, ProtocolException, ServiceException from tracrpc.util import empty, prepare_docs __all__ = ['XmlRpcProtocol'] def to_xmlrpc_datetime(dt): """ Convert a datetime.datetime object to a xmlrpclib DateTime object """ return xmlrpclib.DateTime(dt.utctimetuple()) def from_xmlrpc_datetime(data): """Return datetime (in utc) from XMLRPC datetime string (is always utc)""" t = list(time.strptime(data.value, "%Y%m%dT%H:%M:%S")[0:6]) return apply(datetime.datetime, t, {'tzinfo': utc}) class XmlRpcProtocol(Component): r""" There should be XML-RPC client implementations available for all popular programming languages. Example call using `curl`: {{{ user: ~ > cat body.xml wiki.getPage WikiStart user: ~ > curl -H "Content-Type: application/xml" --data @body.xml ${req.abs_href.rpc()} = Welcome to.... }}} The following snippet illustrates how to perform authenticated calls in Python. {{{ >>> from xmlrpclib import ServerProxy >>> p = ServerProxy('${req.abs_href.login('rpc').replace('://', '://%s:your_password@' % authname)}') >>> p.system.getAPIVersion() [${', '.join(rpc.version.split('.'))}] }}} """ implements(IRPCProtocol) # IRPCProtocol methods def rpc_info(self): return ('XML-RPC', prepare_docs(self.__doc__)) def rpc_match(self): # Legacy path xmlrpc provided for backwards compatibility: # Using this order to get better docs yield ('rpc', 'application/xml') yield ('xmlrpc', 'application/xml') yield ('rpc', 'text/xml') yield ('xmlrpc', 'text/xml') def parse_rpc_request(self, req, content_type): """ Parse XML-RPC requests.""" try: args, method = xmlrpclib.loads( req.read(int(req.get_header('Content-Length')))) except Exception, e: self.log.debug("RPC(xml) parse error: %s", to_unicode(e)) raise ProtocolException(xmlrpclib.Fault(-32700, to_unicode(e))) else : self.log.debug("RPC(xml) call by '%s', method '%s' with args: %s" \ % (req.authname, method, repr(args))) args = self._normalize_xml_input(args) return {'method' : method, 'params' : args} def send_rpc_result(self, req, result): """Send the result of the XML-RPC call back to the client.""" rpcreq = req.rpc method = rpcreq.get('method') self.log.debug("RPC(xml) '%s' result: %s" % ( method, repr(result))) result = tuple(self._normalize_xml_output([result])) self._send_response(req, xmlrpclib.dumps(result, methodresponse=True), rpcreq['mimetype']) def send_rpc_error(self, req, e): """Send an XML-RPC fault message back to the caller""" rpcreq = req.rpc fault = None if isinstance(e, ProtocolException): fault = e._exc elif isinstance(e, ServiceException): e = e._exc elif isinstance(e, MethodNotFound): fault = xmlrpclib.Fault(-32601, to_unicode(e)) elif isinstance(e, PermissionError): fault = xmlrpclib.Fault(403, to_unicode(e)) elif isinstance(e, ResourceNotFound): fault = xmlrpclib.Fault(404, to_unicode(e)) if fault is not None : self._send_response(req, xmlrpclib.dumps(fault), rpcreq['mimetype']) else : self.log.error(e) import traceback from tracrpc.util import StringIO out = StringIO() traceback.print_exc(file = out) self.log.error(out.getvalue()) err_code = hasattr(e, 'code') and e.code or 1 method = rpcreq.get('method') self._send_response(req, xmlrpclib.dumps( xmlrpclib.Fault(err_code, "'%s' while executing '%s()'" % (str(e), method)))) # Internal methods def _send_response(self, req, response, content_type='application/xml'): response = to_unicode(response).encode("utf-8") req.send_response(200) req.send_header('Content-Type', content_type) req.send_header('Content-Length', len(response)) req.end_headers() req.write(response) raise RequestDone def _normalize_xml_input(self, args): """ Normalizes arguments (at any level - traversing dicts and lists): 1. xmlrpc.DateTime is converted to Python datetime 2. tracrpc.api.Binary => xmlrpclib.Binary 2. String line-endings same as from web (`\n` => `\r\n`) """ new_args = [] for arg in args: # self.env.log.debug("arg %s, type %s" % (arg, type(arg))) if isinstance(arg, xmlrpclib.DateTime): new_args.append(from_xmlrpc_datetime(arg)) elif isinstance(arg, xmlrpclib.Binary): arg.__class__ = Binary new_args.append(arg) elif isinstance(arg, basestring): new_args.append(arg.replace("\n", "\r\n")) elif isinstance(arg, dict): for key, val in arg.items(): arg[key], = self._normalize_xml_input([val]) new_args.append(arg) elif isinstance(arg, (list, tuple)): new_args.append(self._normalize_xml_input(arg)) else: new_args.append(arg) return new_args def _normalize_xml_output(self, result): """ Normalizes and converts output (traversing it): 1. None => '' 2. datetime => xmlrpclib.DateTime 3. Binary => xmlrpclib.Binary 4. genshi.builder.Fragment|genshi.core.Markup => unicode """ new_result = [] for res in result: if isinstance(res, datetime.datetime): new_result.append(to_xmlrpc_datetime(res)) elif isinstance(res, Binary): res.__class__ = xmlrpclib.Binary new_result.append(res) elif res is None or res is empty: new_result.append('') elif isinstance(res, (genshi.builder.Fragment, \ genshi.core.Markup)): new_result.append(to_unicode(res)) elif isinstance(res, dict): for key, val in res.items(): res[key], = self._normalize_xml_output([val]) new_result.append(res) elif isinstance(res, list) or isinstance(res, tuple): new_result.append(self._normalize_xml_output(res)) else: new_result.append(res) return new_result