Source code for rdflib.plugins.serializers.turtle

"""
Turtle RDF graph serializer for RDFLib.
See <http://www.w3.org/TeamSubmission/turtle/> for syntax specification.
"""

from collections import defaultdict

from rdflib.term import BNode, Literal, URIRef
from rdflib.exceptions import Error
from rdflib.serializer import Serializer
from rdflib.namespace import RDF, RDFS

__all__ = ['RecursiveSerializer', 'TurtleSerializer']


class RecursiveSerializer(Serializer):
    """Base serializer holding the bookkeeping for a subject-by-subject walk.

    The instance tracks which nodes occur as subjects, how many times each
    node is referenced as an object, which subjects have already been
    written, the current nesting depth and the namespace prefixes in use.
    """

    # Subjects typed as one of these classes are ordered first
    # (see orderSubjects).
    topClasses = [RDFS.Class]
    # Predicates listed here are emitted before all others, in this order
    # (see sortProperties).
    predicateOrder = [RDF.type, RDFS.label]
    # URIRef subjects are rejected by checkSubject at or beyond this depth.
    maxDepth = 10
    # String repeated once per nesting level by indent().
    # NOTE(review): whitespace in this listing looks collapsed; confirm the
    # intended indent width against the upstream file.
    indentString = u" "

    def __init__(self, store):
        """Bind the serializer to ``store`` and initialise all state."""
        super(RecursiveSerializer, self).__init__(store)
        self.stream = None
        self.reset()

    def addNamespace(self, prefix, uri):
        """Register ``prefix`` as bound to ``uri`` for this run."""
        self.namespaces[prefix] = uri

    def checkSubject(self, subject):
        """Check to see if the subject should be serialized yet."""
        if self.isDone(subject) or subject not in self._subjects:
            return False
        # Top-level subjects are only written near the top of the document.
        if subject in self._topLevels and self.depth > 1:
            return False
        if isinstance(subject, URIRef) and self.depth >= self.maxDepth:
            return False
        return True

    def isDone(self, subject):
        """Return true if subject is serialized."""
        return subject in self._serialized

    def orderSubjects(self):
        """Return the subjects in the order they should be serialized.

        Members of ``topClasses`` come first (sorted within each class) and
        are recorded as top-level subjects.  All remaining subjects follow,
        sorted by (is a BNode, reference count, node).
        """
        seen = set()
        subjects = []
        for classURI in self.topClasses:
            for member in sorted(self.store.subjects(RDF.type, classURI)):
                subjects.append(member)
                self._topLevels[member] = True
                seen.add(member)

        recursable = sorted(
            (isinstance(subject, BNode), self._references[subject], subject)
            for subject in self._subjects
            if subject not in seen)
        subjects.extend(entry[2] for entry in recursable)
        return subjects

    def preprocess(self):
        """Run preprocessTriple over every triple in the store."""
        for triple in self.store.triples((None, None, None)):
            self.preprocessTriple(triple)

    def preprocessTriple(self, triple):
        """Record the subject and count the object reference of ``triple``.

        ``triple`` is a ``(subject, predicate, object)`` tuple.  It is
        unpacked in the body rather than in the signature because tuple
        parameters are a syntax error on Python 3.
        """
        s, p, o = triple
        self._references[o] += 1
        self._subjects[s] = True

    def reset(self):
        """Clear all per-run state and re-register the store's namespaces."""
        self.depth = 0
        self.lists = {}
        self.namespaces = {}
        self._references = defaultdict(int)
        self._serialized = {}
        self._subjects = {}
        self._topLevels = {}
        # Goes through addNamespace so subclasses can adjust prefixes.
        for prefix, ns in self.store.namespaces():
            self.addNamespace(prefix, ns)

    def buildPredicateHash(self, subject):
        """
        Build a hash key by predicate to a list of objects for the given
        subject
        """
        properties = {}
        for s, p, o in self.store.triples((subject, None, None)):
            properties.setdefault(p, []).append(o)
        return properties

    def sortProperties(self, properties):
        """Take a hash from predicate uris to lists of values.
        Sort the lists of values.  Return a sorted list of properties."""
        # Sort object lists in place.
        for objects in properties.values():
            objects.sort()

        # Preferred predicates first, then everything else in sorted order.
        propList = []
        seen = set()
        for prop in self.predicateOrder:
            if prop in properties and prop not in seen:
                propList.append(prop)
                seen.add(prop)
        # sorted() works on dict views as well as lists, unlike
        # ``keys()`` followed by ``.sort()``.
        for prop in sorted(properties):
            if prop not in seen:
                propList.append(prop)
                seen.add(prop)
        return propList

    def subjectDone(self, subject):
        """Mark a subject as done."""
        self._serialized[subject] = True

    def indent(self, modifier=0):
        """Returns indent string multiplied by the depth"""
        return (self.depth + modifier) * self.indentString

    def write(self, text):
        """Write text in given encoding."""
        self.stream.write(text.encode(self.encoding, 'replace'))
SUBJECT = 0 VERB = 1 OBJECT = 2 _GEN_QNAME_FOR_DT = False _SPACIOUS_OUTPUT = False
class TurtleSerializer(RecursiveSerializer):
    """Serialize a graph as Turtle.

    Blank nodes referenced at most once are written inline as ``[ ... ]``
    and well-formed RDF lists as ``( ... )``.  Other nodes are written as
    QNames where a prefix is available, otherwise in N3 form.
    """

    short_name = "turtle"
    # NOTE(review): whitespace in this listing looks collapsed; confirm the
    # intended indent width against the upstream file.
    indentString = ' '

    def __init__(self, store):
        """Bind the serializer to ``store`` and initialise all state."""
        # Must exist before the base __init__ runs: it calls reset(), which
        # registers namespaces through addNamespace().
        self._ns_rewrite = {}
        super(TurtleSerializer, self).__init__(store)
        self.keywords = {
            RDF.type: 'a'
        }
        self.reset()
        self.stream = None
        self._spacious = _SPACIOUS_OUTPUT

    def addNamespace(self, prefix, namespace):
        """Register ``prefix`` for ``namespace`` and return the prefix used.

        Turtle does not support prefixes that start with ``_``, so such a
        prefix is rewritten to ``p_...``.  A prefix already bound to a
        different namespace is rewritten the same way.  Namespaces are
        registered as triples are seen, so the rewrites made so far are
        remembered in ``_ns_rewrite`` to keep later lookups consistent.
        """
        needs_rewrite = (
            (prefix and prefix[0] == '_')
            or self.namespaces.get(prefix, namespace) != namespace)
        if needs_rewrite:
            if prefix not in self._ns_rewrite:
                candidate = "p" + prefix
                # Keep prepending until the name is unused.
                while candidate in self.namespaces:
                    candidate = "p" + candidate
                self._ns_rewrite[prefix] = candidate

        prefix = self._ns_rewrite.get(prefix, prefix)
        super(TurtleSerializer, self).addNamespace(prefix, namespace)
        return prefix

    def reset(self):
        """Clear all per-run state, including prefix rewrites."""
        # Clear the rewrite map BEFORE the base reset: the base reset
        # re-registers the store's namespaces through addNamespace(), and
        # the rewrites chosen there must survive so that later getQName()
        # calls reuse them instead of inventing a second prefix.
        self._ns_rewrite = {}
        super(TurtleSerializer, self).reset()
        self._shortNames = {}
        self._started = False

    def serialize(self, stream, base=None, encoding=None,
                  spacious=None, **args):
        """Write the whole graph to ``stream`` as Turtle.

        :param stream: object with a ``write`` method accepting encoded text
        :param base: stored on the instance as ``self.base``
        :param encoding: accepted but not read by this method; ``write``
            encodes with ``self.encoding``
        :param spacious: when not None, overrides the spacious-output flag
        :param args: extra keyword arguments, ignored
        """
        self.reset()
        self.stream = stream
        self.base = base

        if spacious is not None:
            self._spacious = spacious

        self.preprocess()
        subjects_list = self.orderSubjects()

        self.startDocument()

        for subject in subjects_list:
            if self.isDone(subject):
                continue
            # A newline follows every statement that was written.
            if self.statement(subject):
                self.write('\n')

        self.endDocument()
        stream.write(u"\n".encode('ascii'))

    def preprocessTriple(self, triple):
        """Collect reference counts and namespace prefixes for ``triple``."""
        super(TurtleSerializer, self).preprocessTriple(triple)
        for i, node in enumerate(triple):
            if node in self.keywords:
                continue
            # Don't use generated prefixes for subjects and objects
            self.getQName(node, gen_prefix=(i == VERB))
            if isinstance(node, Literal) and node.datatype:
                self.getQName(node.datatype, gen_prefix=_GEN_QNAME_FOR_DT)
        predicate = triple[1]
        if isinstance(predicate, BNode):
            # A blank-node predicate counts as a reference as well.
            self._references[predicate] += 1

    def getQName(self, uri, gen_prefix=True):
        """Return ``prefix:local`` for ``uri``, or None if no QName applies.

        The prefix used is registered through addNamespace as a side effect.
        """
        if not isinstance(uri, URIRef):
            return None

        try:
            parts = self.store.compute_qname(uri, generate=gen_prefix)
        except Exception:
            # is the uri a namespace in itself?
            pfx = self.store.store.prefix(uri)
            if pfx is None:
                # nothing worked
                return None
            parts = (pfx, uri, '')

        prefix, namespace, local = parts
        # QName cannot end with .
        if local.endswith("."):
            return None
        prefix = self.addNamespace(prefix, namespace)
        return u'%s:%s' % (prefix, local)

    def startDocument(self):
        """Write one ``@prefix`` line per registered namespace, sorted."""
        self._started = True
        ns_list = sorted(self.namespaces.items())
        for prefix, uri in ns_list:
            self.write(self.indent() + '@prefix %s: <%s> .\n' % (prefix, uri))
        if ns_list and self._spacious:
            self.write('\n')

    def endDocument(self):
        """Write a trailing blank line when spacious output is enabled."""
        if self._spacious:
            self.write('\n')

    def statement(self, subject):
        """Mark ``subject`` done and write its statement."""
        self.subjectDone(subject)
        return self.s_squared(subject) or self.s_default(subject)

    def s_default(self, subject):
        """Write ``subject`` followed by its predicate list and `` .``."""
        self.write('\n' + self.indent())
        self.path(subject, SUBJECT)
        self.predicateList(subject)
        self.write(' .')
        return True

    def s_squared(self, subject):
        """Write an unreferenced blank-node subject as ``[] ... .``.

        Returns False without writing when ``subject`` is not a BNode or is
        referenced anywhere.
        """
        if (self._references[subject] > 0) or not isinstance(subject, BNode):
            return False
        self.write('\n' + self.indent() + '[]')
        self.predicateList(subject)
        self.write(' .')
        return True

    def path(self, node, position, newline=False):
        """Write ``node``, inline if possible, otherwise by its label."""
        if not (self.p_squared(node, position, newline)
                or self.p_default(node, position, newline)):
            raise Error("Cannot serialize node '%s'" % (node, ))

    def p_default(self, node, position, newline=False):
        """Write the label of ``node``; always returns True."""
        if position != SUBJECT and not newline:
            self.write(' ')
        self.write(self.label(node, position))
        return True

    def label(self, node, position):
        """Return the Turtle text for ``node`` at ``position``."""
        if node == RDF.nil:
            return '()'
        # Compare by value: SUBJECT/VERB/OBJECT are plain ints.
        if position == VERB and node in self.keywords:
            return self.keywords[node]
        if isinstance(node, Literal):
            return node._literal_n3(
                use_plain=True,
                qname_callback=lambda dt: self.getQName(
                    dt, _GEN_QNAME_FOR_DT))
        node = self.relativize(node)
        return self.getQName(node, position == VERB) or node.n3()

    def p_squared(self, node, position, newline=False):
        """Write a blank node inline as ``( ... )`` or ``[ ... ]``.

        Returns False without writing when ``node`` is not a BNode, has
        already been serialized, is referenced more than once, or is in
        subject position.
        """
        if (not isinstance(node, BNode)
                or node in self._serialized
                or self._references[node] > 1
                or position == SUBJECT):
            return False

        if not newline:
            self.write(' ')

        if self.isValidList(node):
            # this is a list
            self.write('(')
            self.depth += 1
            self.doList(node)
            self.depth -= 1
            self.write(' )')
        else:
            self.subjectDone(node)
            # Net depth change across this branch is zero: +2, -1, -1.
            self.depth += 2
            self.write('[')
            self.depth -= 1
            self.predicateList(node, newline=False)
            self.write(' ]')
            self.depth -= 1

        return True

    def isValidList(self, l):
        """
        Checks if l is a valid RDF list, i.e. no nodes have other properties.

        A chain of ``rdf:rest`` links that revisits a node is not a valid
        list; it is rejected instead of being followed forever.
        """
        try:
            # ``is None`` rather than truthiness, so a list whose first
            # item is a falsy value is still recognised.
            if self.store.value(l, RDF.first) is None:
                return False
        except Exception:
            return False

        visited = set()
        while l:
            if l in visited:
                return False
            visited.add(l)
            if l != RDF.nil and len(
                    list(self.store.predicate_objects(l))) != 2:
                return False
            l = self.store.value(l, RDF.rest)
        return True

    def doList(self, l):
        """Write each item of the RDF list starting at ``l``.

        Every list node visited is marked as serialized.
        """
        while l:
            item = self.store.value(l, RDF.first)
            if item is not None:
                self.path(item, OBJECT)
                self.subjectDone(l)
            l = self.store.value(l, RDF.rest)

    def predicateList(self, subject, newline=False):
        """Write the predicates of ``subject``, separated by `` ;``."""
        properties = self.buildPredicateHash(subject)
        propList = self.sortProperties(properties)
        if not propList:
            return
        first = propList[0]
        self.verb(first, newline=newline)
        self.objectList(properties[first])
        for predicate in propList[1:]:
            self.write(' ;\n' + self.indent(1))
            self.verb(predicate, newline=True)
            self.objectList(properties[predicate])

    def verb(self, node, newline=False):
        """Write ``node`` in predicate position."""
        self.path(node, VERB, newline)

    def objectList(self, objects):
        """Write ``objects`` separated by ``,``, one per line after the first."""
        if not objects:
            return
        # NOTE: the historical expression ``(count == 1) and 0 or 1`` always
        # evaluated to 1, because 0 is falsy.  The constant keeps the
        # emitted layout exactly as it has always been.
        depthmod = 1
        self.depth += depthmod
        self.path(objects[0], OBJECT)
        for obj in objects[1:]:
            self.write(',\n' + self.indent(1))
            self.path(obj, OBJECT, newline=True)
        self.depth -= depthmod