I’ve been using Amara to address my high-throughput needs for Extract, Transform, Load (ETL), querying, and processing of large amounts of RDF. In one part of the larger process, I needed to stream very large XML documents in a particular dialect into RDF/XML. I sent an email to the Akara Google group describing the challenges and my reasons for wanting to use a streaming XML paradigm rather than XSLT.
I basically want to leverage Amara’s pushtree and its use of coroutines as a minimal-overhead pipeline for dispatching events triggered by elements in the source XML, where the source XML is a GRDDL source document and the pushtree coroutine is the transformation proper. That task is still a work in progress; in the interest of expedience I went forward with XSLT, but I still need to try out some of what Uche suggested.
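The rough shape of that pattern is sketched below. This is only an illustration, assuming the imports below from Amara 2.x (pushtree, the bindery entity_base factory, and U); the element name u'record', the placeholder source file, and the echoed output stand in for whatever the GRDDL source document and RDF/XML serialization would actually require, and the coroutine decorator is the usual generator-priming idiom:

import sys
from amara.pushtree import pushtree
from amara.bindery.nodes import entity_base
from amara.lib import U

def coroutine(func):
    """Prime a generator so it is immediately ready to accept .send() calls."""
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        gen.next()  # advance to the first yield (Python 2)
        return gen
    return start

@coroutine
def transform():
    # Each matching element pushed by pushtree arrives here; a real GRDDL
    # transformation would emit RDF/XML for it, this sketch just echoes its text.
    while True:
        node = yield
        print U(node).encode('utf-8')

source = open('source.xml').read()      # placeholder source document
target = transform()
pushtree(source, u'record', target.send, entity_factory=entity_base)
target.close()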
The other part, where I have made much more progress, is streaming the results of SPARQL queries (against a SPARQL service) into a CSV file from the command line with minimal overhead (also using Amara, pushtree, and coroutines). A recent set of changes to layercake-python modified the sparqler command-line script to add an --endpoint option, which takes a SPARQL service URL. Other changes were made to the remote SPARQL service store to support this.
A new sparqlcsv script was also added:
$ sparqlcsv --help
Usage: sparqlcsv [options] [SPARQLXMLFilePath]

Options:
  -h, --help            show this help message and exit
  -q QUOTECHAR, --quoteChar=QUOTECHAR
                        The quote character to use
  -c, --count           Just count the results, do not serialize to CSV
  -d DELIMITER, --delimiter=DELIMITER
                        The delimiter to use
  -o FILEPATH, --output=FILEPATH
                        The path where to write the resulting CSV file
This script takes a SPARQL XML result file, either from the path given as the first argument or from STDIN if none is specified, and writes a CSV file to STDOUT or to a file. The general architectural idea is to build a bash pipeline from the SPARQL service to a CSV file (and eventually into a relational database for more sophisticated analysis) or to STDOUT for subsequent processing along the pipeline.
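For the relational-database end of that pipeline, something as small as the following would do. This is just a sketch using Python’s built-in sqlite3 and csv modules, with a made-up table layout that assumes a three-column SELECT:

import csv
import sqlite3

# Load a CSV file of SPARQL results into a throwaway SQLite table so the
# results can then be queried with ordinary SQL.
conn = sqlite3.connect('results.db')
conn.execute('CREATE TABLE IF NOT EXISTS results (s TEXT, p TEXT, o TEXT)')
with open('results.csv', 'rb') as f:   # 'rb' for the Python 2 csv module
    for row in csv.reader(f):
        conn.execute('INSERT INTO results VALUES (?, ?, ?)', row)
conn.commit()
conn.close()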
So, now I can run a query against Virtuoso and stream the CSV results into a file (with minimal processing overhead):
$ sparqler --owl=..OWL file.. --ns=..prefix..=..URL.. \
    --endpoint=..SPARQL service URL.. \
    "SELECT ... { ... }" | sparqlcsv | .. subsequent processing ..
Where the namespaces in the OWL/RDF file (provided by the --owl option) and those given explicitly via the --ns option are added as namespace prefix definitions at the top of the SPARQL query, which is then dispatched to the remote SPARQL service located at the URL provided to the --endpoint option. Alternatively, the -o option can be used to specify a filename to which the CSV content is written.
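The prefix handling itself amounts to little more than prepending PREFIX declarations to the query text before it is sent off. A hypothetical illustration of the idea (not the sparqler source; the helper name and namespaces are made up):

# Hypothetical helper: turn a prefix->URI mapping into PREFIX declarations
# and prepend them to the query before dispatching it to the endpoint.
def with_prefixes(query, namespaces):
    decls = ''.join('PREFIX %s: <%s>\n' % (prefix, uri)
                    for prefix, uri in namespaces.items())
    return decls + query

print with_prefixes(
    "SELECT ?person WHERE { ?person a foaf:Person }",
    {'foaf': 'http://xmlns.com/foaf/0.1/'})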
The sparqlcsv script uses a pushtree coroutine to stream XML content into a CSV file in this way:
def produce_csv(doc, csvWriter, justCount):
    cnt = Counter()
    @coroutine
    def receive_nodes(cnt):
        # Receives each 'result' element pushed by pushtree below
        while True:
            node = yield
            if justCount:
                cnt.counter += 1
            else:
                rt = []
                badChars = False
                for binding in node.binding:
                    try:
                        rt.append(U(binding).encode('ascii'))
                    except UnicodeEncodeError:
                        # Drop characters that cannot be encoded as ASCII,
                        # but note that this row was affected
                        rt.append(U(binding).encode('ascii', 'ignore'))
                        badChars = True
                        print >> sys.stderr, "Skipping character", U(binding)
                if badChars:
                    cnt.skipCounter += 1
                csvWriter.writerow(rt)
        return
    target = receive_nodes(cnt)
    pushtree(doc, u'result', target.send, entity_factory=entity_base)
    target.close()
    return cnt
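Note that Counter here is not collections.Counter; it is (presumably) a small helper defined elsewhere in the script. Something along these lines would satisfy the two attributes the coroutine touches (an assumed stand-in, not the actual layercake-python code):

class Counter(object):
    """Assumed stand-in: tracks the two tallies produce_csv reports."""
    def __init__(self):
        self.counter = 0      # number of result rows (when only counting)
        self.skipCounter = 0  # rows in which non-ASCII characters were dropped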
In produce_csv, doc is the SPARQL XML result document (as a string), csvWriter is a csv writer object, and justCount indicates whether only the size of the solution sequence should be counted rather than serializing each result row to CSV.
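Putting it together, the call site looks roughly like this (a sketch of the wiring, not the script’s actual option handling; the delimiter and quote character would come from the command-line options shown earlier):

import sys
import csv

# Read the SPARQL XML results from STDIN, stream CSV rows to STDOUT, and
# report how many rows had non-ASCII characters dropped.
doc = sys.stdin.read()
writer = csv.writer(sys.stdout, delimiter=',', quotechar='"')
counts = produce_csv(doc, writer, False)
print >> sys.stderr, "Rows with skipped characters:", counts.skipCounter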