Using Amara's pushtree for heavyweight XML processing in GRDDL and SPARQL querying

I’ve been using Amara to address my high throughput needs for Extract Transform Load (ETL), querying, and processing of large amounts of RDF. In one particular part of the larger process, I needed to be able to stream very large XML documents in a particular dialect into RDF/XML. I sent an email to the akara google group describing the challenges and my thoughts behind wanting to use a streaming XML paradigm rather than XSLT.

I basically want to leverage Amara’s pushtree and its use of coroutines as a minimal-overhead pipeline for dispatching events triggered by elements in the source XML, where the source XML is a GRDDL source document and the pushtree coroutine is the transformation property. That task is still a work in progress, in the interest of expedience I went forward and used XSLT but need to try out some of what Uche suggested in the end.

The other part where I have made much more progress is in streaming results to SPARQL queries (against a SPARQL service) into a CSV file via command-line and with minimal overhead (also using Amara, pushtree, and coroutines). A recent set of changes to layercake-python modified the sparqler command-line to add an —endpoint option which takes a SPARQL service URL. Other changes were made to the remote SPARQL service store to support this.

Also added, was a new sparqlcsv script:

$ sparqlcsv --help
Usage: sparqlcsv [options] [SPARQLXMLFilePath]
Options:
 -h, --help            show this help message and exit
 -q QUOTECHAR, --quoteChar=QUOTECHAR
                       The quote character to use
 -c, --count           Just count the results, do not serialize to CSV
 -d DELIMITER, --delimiter=DELIMITER
                       The delimiter to use
 -o FILEPATH, --output=FILEPATH
                       The path where to write the resulting CSV file

This script takes a SPARQL XML file either from the file indicated as the first argument or from STDIN if none is specified and writes out a CSV file to STDOUT or to a file. The general architectural idea is to build a bash pipeline from the SPARQL service to a CSV file (and eventually into a relational database for more sophisticated analysis) or to STDOUT for subsequent processing along the pipeline.

So, now I can run a query against Virtuoso and stream the CSV results into a file (with minimal processing overhead):

$ sparqler --owl=..OWL file.. --ns=..prefix..=..URL.. \
           --endpoint=..SPARQL service URL.. \
"SELECT ... { ... }" | sparqlcsv | .. subsequent processong ..

Where the namespaces in the OWL/RDF file (provided by the —owl option) and those given explicitly via the —ns option are added as namespace prefix definitions at the top of the SPARQL query that is dispatched to the remote SPARQL service located via the URL provided to the —endpoint option. Alternatively, the -o option can be used to specify a filename where the CSV content is written to.

The sparqlcsv script uses a pushtree coroutine to stream XML content into a CSV file in this way:

def produce_csv(doc,csvWriter,justCount):
   cnt=Counter()
   @coroutine
   def receive_nodes(cnt):
       while True:
           node = yield
           if justCount:
               cnt.counter+=1
           else:
               rt=[]
               badChars = False
               for binding in node.binding:
                   try:
                       rt.append(U(binding).encode('ascii'))
                   except UnicodeEncodeError:
                       rt.append(U(binding).encode('ascii', 'ignore'))
                       badChars = True
                       print >> sys.stderr, "Skipping character", U(binding)
               if badChars:
                   cnt.skipCounter += 1
               csvWriter.writerow(rt)
       return
   target = receive_nodes(cnt)
   pushtree(doc, u'result', target.send, entity_factory=entity_base)
   target.close()
   return cnt

Where doc is an XML document (as a string), csvWriter is an instance of the Writer Object, and the last parameter indicates whether or not only the size of the solution sequence is returned rather than the resulting CSV.