# -*- coding: utf-8 -*- """ Trac Plugin for Monotone Copyright 2006-2008 Thomas Moschny (thomas.moschny@gmx.de) {{{ This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA }}} """ from trac.versioncontrol.api import Repository, Node, Changeset, \ IRepositoryConnector, NoSuchNode, NoSuchChangeset from trac.wiki import IWikiSyntaxProvider from trac.util import shorten_line, escape from trac.util.datefmt import format_datetime as format_datetime_trac from trac.core import Component, implements, TracError from trac.config import Option, ListOption from tracmtn.automate import MTN, AutomateException from tracmtn.util import get_oldpath, get_parent, Memoize from tracmtn.cache import get_cache from cStringIO import StringIO from time import strptime import re try: from threading import Lock except ImportError: from dummy_threading import Lock #IGNORE:E0611 try: from trac.versioncontrol.web_ui import IPropertyRenderer except ImportError: IPropertyRenderer = None try: from trac.util.datefmt import utc except ImportError: utc = None DATE_RULE = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}') REVID_RULE = re.compile(r'^[0-9a-f]{40}$') INTERFACE_VERSIONS = { '2.0': '0.26', '2.1': '0.27', '2.2': '0.28', '3.0': '0.29', '3.1': '0.30', '4.0': '0.31-0.33', '4.1': '0.34', '4.3': '0.35', '5.0': '0.36', '6.0': '0.37-0.38', '7.0': '0.39-0.40', '8.0': '0.41', '9.0': '0.42-0.43', '10.0': '0.44', '11.0': '0.45', '12.0': '0.46-0.47', } class MultipleChangesets(TracError): def __init__(self, rev): TracError.__init__( self, "Multiple revisions found for '%s'" % rev, title="Ambiguous changeset Number") class MonotoneConnector(Component): """ Provides this plugin's functionality. """ implements(IRepositoryConnector, IWikiSyntaxProvider) # Configuration options mtn_binary = Option('mtn', 'mtn_binary', default='/usr/bin/mtn', doc='''Full path to the monotone binary.''') cachespec = Option('mtn', 'cachespec', default='localmem', doc='''Select a caching mechanism.''') xtracerts = ListOption('mtn', 'xtracerts', doc='''List of user certs to be displayed.''') # IRepositoryConnector methods def __init__(self): Component.__init__(self) self.mtn_procs = {} self.version = None self.lock = Lock() def get_supported_types(self): """ Return the types of version control systems that are supported. Yields `(repotype, priority)` pairs, where `repotype` is used to match against the configured `[trac] repository_type` value in TracIni. If multiple provider match a given type, the `priority` is used to choose between them (highest number is highest priority). """ yield ("mtn", 0) def get_revprops(self): """ Gets the user-defined configuration options for displaying non-standard revision certs. """ revprops = {} for cert in self.xtracerts: section = 'mtn-cert-%s' % cert revprops[cert] = [ self.config.get(section, 'name', cert), self.config.get(section, 'text', '%s'), self.config.getbool(section, 'wikiflag', True), self.config.get(section, 'htmlclass', None) or None # FIXME: Config bug? Returns '' if unset instead of None. ] return revprops def get_repository(self, type, path, authname): """ Return a Repository instance for the given repository type and dir. """ self.lock.acquire() try: # note: we don't use type or authname, therefore we can always # return the same Repository object for the same database path try: mtn = self.mtn_procs[path] except KeyError: mtn = MTN(path, self.log, self.mtn_binary) self.mtn_procs[path] = mtn repos = MonotoneRepository( mtn, path, self.log, self.cachespec, self.get_revprops()) # this is the main entry point for users of this plugin, so let's set # version information here if not self.version: interface = repos.get_interface_version() binary = INTERFACE_VERSIONS.get(interface, None) self.version = "interface: %s" % interface if binary: self.version += ", binary: %s (guessed)" % binary try: self.env.systeminfo.append(('Monotone', self.version)) except AttributeError: pass # no systeminfo in 0.10 return repos finally: self.lock.release() # IWikiSyntaxProvider methods def get_wiki_syntax(self): """ Link revid (optionally prefixed by 'r') to the revision changeset. """ yield (r'\br?(?P[0-9a-f]{40}\b)', lambda formatter, match, fullmatch: self._format_link(formatter, 'cset', fullmatch.group('revid'), match)) def get_link_resolvers(self): """ Add the cset namespace. """ yield('cset', self._format_link) yield('chgset', self._format_link) yield('branch', self._format_link) # branch head yield('revtag', self._format_link) # Internal methods def _format_link(self, formatter, ns, rev, label): """ Format a changeset link. """ repos = self.env.get_repository() if ns == 'branch': rev = "h:" + rev elif ns == 'revtag': rev = "t:" + rev try: changeset = repos.get_changeset(rev) return '%s' \ % (escape(shorten_line(changeset.message)), formatter.href.changeset(rev), label) except TracError, e: return '%s' \ % (str(e), formatter.href.changeset(rev), label) if IPropertyRenderer: from genshi.builder import tag class CsetPropertyRenderer(Component): implements(IPropertyRenderer) def match_property(self, name, mode): if mode == 'revprop' and name in ('Parents', 'Children', 'Branches', 'Tags'): return 4 return 0 def render_property(self, name, mode, context, props): fragments = [] repos = self.env.get_repository() for val in props[name]: if name in ('Branches', 'Tags'): # don't create links here fragments.append(tag(val)) else: changeset = repos.get_changeset(val) fragments.append(tag.a(val, class_="changeset", title=shorten_line(changeset.message), href=context.href.changeset(val))) return tag([tag(f, ', ') for f in fragments[:-1]], fragments[-1]) # Datetime handling changed somewhere between 0.10 and 0.11. We try to # support both variants for a while. if utc: from datetime import datetime def parse_datetime(raw): """ Convert a monotone date string into a datetime object. """ return datetime(tzinfo=utc, *strptime(raw, "%Y-%m-%dT%H:%M:%S")[:6]) def format_datetime(t): """ Convert a datetime object into an monotone date string. """ return format_datetime_trac(t, "%Y-%m-%dT%H:%M:%S", tzinfo=utc) else: # legacy from calendar import timegm def parse_datetime(raw): """ Convert a monotone date string into a unix timestamp. """ return timegm(strptime(raw + " UTC","%Y-%m-%dT%H:%M:%S %Z")) def format_datetime(t): """ Convert a unix timestamp into an monotone date string. """ return format_datetime_trac(t, "%Y-%m-%dT%H:%M:%S", gmt=True) def dates(certvals): """ Parse the raw dates and return a sorted list. """ result = [] for rawdate in certvals: # strip the date before parsing rawdate = DATE_RULE.search(rawdate).group() result.append(parse_datetime(rawdate)) result.sort() return result class CachedMTN(object): def __init__(self, mtn, cachespec): self.mtn = mtn self.cachespec = cachespec self.parents = Memoize(self.mtn.parents, self._get_cache) self.manifest = Memoize(self.mtn.manifest, self._get_cache) self.certs = Memoize(self.mtn.certs, self._get_cache) self.file_length = Memoize(self.mtn.file_length, self._get_cache) self.changesets = Memoize(self.mtn.changesets, self._get_cache) def _get_cache(self, realm): return get_cache(realm, self.cachespec) def close(self): self.parents.close() self.manifest.close() self.certs.close() self.file_length.close() self.changesets.close() def __getattr__(self, attr): return getattr(self.mtn, attr) class MonotoneRepository(Repository): """ Represents a Monotone repository. """ def __init__(self, mtn, path, log, cachespec, revpropspec = None): Repository.__init__(self, 'mtn:%s' % path, None, log) self.mtn = CachedMTN(mtn, cachespec) self.revpropspec = revpropspec or {} def close(self): """Close the connection to the repository.""" self.mtn.close() def get_changeset(self, rev): """ Retrieve a Changeset object that describes the changes made in revision 'rev'. """ rev = self.normalize_rev(rev) return self._get_changeset(rev) def _get_changeset(self, rev): """ Like get_changeset, but skips the revision normalization. """ try: return MonotoneChangeset(self.mtn, rev, self.revpropspec) except AutomateException: raise NoSuchChangeset(rev) def get_changesets(self, start, stop): """ Generate Changesets belonging to the given time period (start, stop). """ for rev in self.mtn.select('l:%s/e:%s' % (format_datetime(start), format_datetime(stop))): yield self._get_changeset(rev) def get_node(self, path, rev=None): """ Retrieve a Node (directory or file) from the repository at the given path. If the rev parameter is specified, the version of the node at that revision is returned, otherwise the latest version of the node is returned. """ # Note: in an mtn repository, there might be many file # hierarchies, so it makes no sense to ask for the latest # version of a path. # FIXME: normalize_rev can be skipped when called by ourselves rev = self.normalize_rev(rev) path = self.normalize_path(path) try: return MonotoneNode(self.mtn, rev, path) except AutomateException: raise NoSuchChangeset(rev) def sync(self, feedback=None): """Perform a sync of the repository cache, if relevant. If given, `feedback` must be a callback taking a `rev` parameter. The backend will call this function for each `rev` it decided to synchronize, once the synchronization changes are committed to the cache. """ pass def sync_changeset(self, rev): """Resync the repository cache for the given `rev`, if relevant.""" pass def get_oldest_rev(self): """ Return the oldest revision stored in the repository. Here: Return the oldest root. """ roots = dict([(self.get_dates(rev)[0], rev) for rev in self.mtn.roots()]) dates = roots.keys() dates.sort() return roots[dates[0]] def get_youngest_rev(self): """ Return the youngest revision in the repository. Here: Return the youngest leave. """ leaves = dict([(self.get_dates(rev)[-1], rev) for rev in self.mtn.leaves()]) dates = leaves.keys() dates.sort() return leaves[dates[-1]] # FIXME: for next_rev|previous_rev should not return parents or children, # but next or previous revisions according to commit time def previous_rev(self, rev): """ Return the revision immediately preceding the specified revision. """ # note: returning only one parent parents = self.mtn.parents(rev) parents.sort() return parents and parents[0] or None def next_rev(self, rev, path=''): """ Return the revision immediately following the specified revision. """ # note: ignoring path for now # note: returning only one child children = self.mtn.children(rev) children.sort() return children and children[0] or None def rev_older_than(self, rev1, rev2): """ Return True if rev1 is older than rev2, i.e. if rev1 comes before rev2 in the revision sequence. """ return self.get_dates(rev1)[0] < self.get_dates(rev2)[-1] def get_path_history(self, path, rev=None, limit=None): """ Retrieve all the revisions containing this path (no newer than 'rev'). The result format should be the same as the one of Node.get_history() """ #for hist in MonotoneNode(self.mtn, rev, path): # yield hist raise NotImplementedError def normalize_path(self, path): """ Return a canonical representation of path in the repos. We strip trailing slashes except for the root. """ return '/' + (path and path.strip('/') or '') def normalize_rev(self, rev): """Return a canonical representation of a revision. It's up to the backend to decide which string values of `rev` (usually provided by the user) should be accepted, and how they should be normalized. Some backends may for instance want to match against known tags or branch names. In addition, if `rev` is `None` or '', the youngest revision should be returned. """ if rev is None or isinstance(rev, basestring) and \ rev.lower() in ('', 'none', 'latest', 'youngest'): return self.youngest_rev if not REVID_RULE.match(rev): # doesn't look like a hash, pass to mtn's select revs = self.mtn.select(rev) if len(revs) < 1: raise NoSuchChangeset(rev) elif len(revs) > 1: raise MultipleChangesets(rev) rev = revs[0] return rev.encode('ascii') def short_rev(self, rev): """ Return a compact representation of a revision in the repos. """ return rev[0:4] + ".." def get_changes(self, old_path, old_rev, new_path, new_rev, ignore_ancestry=1): """ Generator that yields change tuples (old_node, new_node, kind, change) for each node change between the two arbitrary (path,rev) pairs. The old_node is assumed to be None when the change is an ADD, the new_node is assumed to be None when the change is a DELETE. """ raise NotImplementedError def get_tags(self, rev): """ Generate a list of known tags, as (name, rev) pairs. `rev` might be needed in order to retrieve the tags, but in general it's best to produce all known tags. """ return self.mtn.tags() def get_branches(self, rev): """ Generate a list of known branches, as (name, rev) pairs. `rev` might be needed in order to retrieve the branches, but in general it's best to produce all known branches. """ return self.mtn.non_merged_branches() def get_quickjump_entries(self, from_rev): """ Generate a list of interesting places in the repositoy. `rev` might be used to restrict the list of available location, but in general it's best to produce all known locations. The generated results must be of the form (category, name, path, rev). """ result = [] for name, rev in self.get_branches(from_rev): result.append(('branches', name, '/', rev)) for name, rev in self.get_tags(from_rev): result.append(('tags', name, '/', rev)) return result def get_dates(self, rev): """ Parse the date certs and return a sorted list of trac compatible dates for rev. """ return dates(self.mtn.certs(rev).get('date', [])) def get_interface_version(self): """ Returns the automation interface version string. """ return self.mtn.get_interface_version() class MonotoneNode(Node): """ Represents a directory or file in the repository at a given revision. """ def __init__(self, mtn, rev, path, manifest = None): self.mtn = mtn self.manifest = manifest or self.mtn.manifest(rev) if not path in self.manifest: raise NoSuchNode(path, rev) self.content_id = self.manifest[path][1] self.created_path = path self.created_rev = rev kind = self.manifest[path][0] # 'file' or 'dir' if kind == Node.FILE: # FIXME: we can't handle multiple marks rev = self.mtn.content_changed(rev, path)[0] # trac bug, or at least problematic behavior: in the # browser window, Node.path is used for the link behind # the path, but Node.rev is not, so better don't set # Node.path #marked = self.mtn.roster(rev)[1][curr.ident] #path = marked.name Node.__init__(self, path, rev, kind) def get_content(self): """ Return a stream for reading the content of the node. This method will return None for directories. The returned object should provide a read([len]) function. """ if self.isdir: return None return StringIO(self.mtn.get_file(self.content_id)) def get_entries(self): """ Generator that yields the immediate child entries of a directory, in no particular order. If the node is a file, this method returns None. """ if self.isfile: return def ischild(path): """ Returns true, if we are parent of path. """ return get_parent(path) == self.path for path in filter(ischild, self.manifest.keys()): # IGNORE:W0141 yield MonotoneNode(self.mtn, self.rev, path, self.manifest) def get_history(self, limit=None): """ Generator that yields (path, rev, chg) tuples, one for each revision in which the node was changed. This generator will follow copies and moves of a node (if the underlying version control system supports that), which will be indicated by the first element of the tuple (i.e. the path) changing. Starts with an entry for the current revision. """ # FIXME: this is only a stub yield (self.path, self.rev, None) def get_properties(self): """ Returns a dictionary containing the properties (meta-data) of the node. The set of properties depends on the version control system. """ return self.manifest[self.path][2] def get_content_length(self): if self.isdir: return None return self.mtn.file_length(self.content_id) def get_content_type(self): if self.isdir: return None return '' def get_last_modified(self): # fixme: might be to pessimistic return dates(self.mtn.certs(self.rev).get('date', []))[-1] class MonotoneChangeset(Changeset): """ Represents the set of changes in one revision. """ # changesets are retrieved via MonotoneRepository.get_changeset() def __init__(self, mtn, rev, revpropspec = None): self.certs = mtn.certs(rev) self.messages = self.certs.get('changelog', ['-']) self.authors = self.certs.get('author', ['-']) self.dates = dates(self.certs.get('date', [])) self.branches = self.certs.get('branch', []) self.tags = self.certs.get('tag', []) # multiple dates not supported, so pick the first date date = self.dates[0] # Trac doesn't support multiple authors author = ', '.join(self.authors) # concatenate the commit messages message = '\n----\n'.join(self.messages) Changeset.__init__(self, rev, message, author, date) self.mtn = mtn self.revpropspec = revpropspec or {} def get_changes(self): """ Generator that produces a tuple for every change in the changeset The tuple will contain `(path, kind, change, base_path, base_rev)`, where `change` can be one of Changeset.ADD, Changeset.COPY, Changeset.DELETE, Changeset.EDIT or Changeset.MOVE, and `kind` is one of Node.FILE or Node.DIRECTORY. The `path` is the targeted path for the `change` (which is the ''deleted'' path for a DELETE change). The `base_path` and `base_rev` are the source path and rev for the action (`None` and `-1` in the case of an ADD change). """ # We do not closely implement that api, for example, we don't # know the kind of a deleted or renamed node. for changeset in self.mtn.changesets(self.rev): oldrev = changeset.oldrev # deletions for oldpath in changeset.deleted: yield oldpath, None, Changeset.DELETE, oldpath, oldrev # pure renames for (path, oldpath) in changeset.renamed.iteritems(): yield path, None, Changeset.MOVE, oldpath, oldrev # additions for (path, kind) in changeset.added.iteritems(): yield path, kind, Changeset.ADD, None, -1 # patches for path in changeset.patched: oldpath = get_oldpath(path, changeset.renamed) yield path, Node.FILE, Changeset.EDIT, oldpath, oldrev if IPropertyRenderer: def get_properties(self): properties = {} parents = self.mtn.parents(self.rev) if parents: properties['Parents'] = parents children = self.mtn.children(self.rev) if children: properties['Children'] = children if self.branches: properties['Branches'] = self.branches if self.tags: properties['Tags'] = self.tags # FIXME: add user-defined revision properties return properties else: # legacy def get_properties(self): """ Generator that provides additional metadata for this changeset. Each additional property is a 4 element tuple: * `name` is the name of the property, * `text` its value * `wikiflag` indicates whether the `text` should be interpreted as wiki text or not * `htmlclass` enables to attach special formatting to the displayed property, e.g. `'author'`, `'time'`, `'message'` or `'changeset'`. """ # multiple authors and messages are concatenated, # additional dates are not printed for parent in self.mtn.parents(self.rev): yield('Parent', '[cset:%s]' % parent, True, 'changeset') for child in self.mtn.children(self.rev): yield('Child', '[cset:%s]' % child, True, 'changeset') for branch in self.branches: #yield('Branch', '[branch:%s]' % branch, True, 'changeset') yield('Branch', branch, False, 'changeset') for tag in self.tags: yield('Tag', '[revtag:%s]' % tag, True, 'changeset') # user-defined revision properties to be displayed for cert, spec in self.revpropspec.iteritems(): for certvalue in self.certs.get(cert, []): yield(spec[0], spec[1] % certvalue, spec[2], spec[3])