code to async_http_client to fetch remote rdf asynchronously, and aalto_xml to parse rdf_xml asynchronously. (still missing other formats) webid
authorHenry Story <henry.story@bblfish.net>
Wed, 01 Feb 2012 17:09:29 +0100
branchwebid
changeset 180 aa9074df0635
parent 179 8b057135a94d
child 181 d9c1f87eee55
code to async_http_client to fetch remote rdf asynchronously, and aalto_xml to parse rdf_xml asynchronously. (still missing other formats)
project/build.scala
src/main/java/patch/AsyncJenaParser.java
--- a/project/build.scala	Tue Jan 24 20:20:26 2012 +0100
+++ b/project/build.scala	Wed Feb 01 17:09:29 2012 +0100
@@ -28,19 +28,27 @@
   val rdfa = "net.rootdev" % "java-rdfa" % "0.4.2-RC2"
   val htmlparser = "nu.validator.htmlparser" % "htmlparser" % "1.2.1"
   val grizzled = "org.clapper" %% "grizzled-scala" % "1.0.8" % "test"
-  val scalaz = "org.scalaz" %% "scalaz-core" % "6.0.3"
+  val scalaz = "org.scalaz" %% "scalaz-core" % "6.0.4"
   val argot =  "org.clapper" %% "argot" % "0.3.5"
   val guava =  "com.google.guava" % "guava" % "11.0"
 //  val restlet = "org.restlet.dev" % "org.restlet" % "2.1-SNAPSHOT"
 //  val restlet_ssl = "org.restlet.dev" % "org.restlet.ext.ssl" % "2.1-SNAPSHOT"
   val jsslutils = "org.jsslutils" % "jsslutils" % "1.0.5"
   val slf4s = "com.weiglewilczek.slf4s" %% "slf4s" % "1.0.7"
+//  val akka_actor = "com.typesafe.akka" % "akka-actor" % "2.0-M3"
+  val async_http_client = "com.ning" % "async-http-client" % "1.7.0"
+  val aalto_xml = "com.fasterxml" % "aalto-xml" % "0.9.7"
+//  val akka_remote = "com.typesafe.akka" % "akka-remote" % "2.0-M3"
+//  val finagle_http = "com.twitter" %% "finagle-http" % "1.9.12"
 }
 
 // some usefull repositories
 object Resolvers {
   val novus = "repo.novus snaps" at "http://repo.novus.com/snapshots/"
   val mavenLocal = "Local Maven Repository" at "file://" + (Path.userHome / ".m2" / "repository").absolutePath
+//  val typesafe = "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
+  val sonatype = "Sonatype Release" at "http://oss.sonatype.org/content/repositories/releases"
+//  val twitter = "Twitter Repository" at "http://maven.twttr.com/"
 }
 
 // general build settings
@@ -98,6 +106,8 @@
   val projectSettings =
     Seq(
       resolvers += mavenLocal,
+      resolvers += sonatype,
+ //     resolvers += typesafe,
       resolvers += ScalaToolsReleases,
       resolvers += ScalaToolsSnapshots,
       libraryDependencies += specs2,
@@ -122,6 +132,8 @@
       libraryDependencies += rdfa,
       libraryDependencies += htmlparser,
       libraryDependencies += slf4s,
+      libraryDependencies += async_http_client,
+      libraryDependencies += aalto_xml,
 
       jarName in assembly := "read-write-web.jar",
       mainClass in assembly := Some("org.w3.readwriteweb.netty.ReadWriteWebNetty")
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/patch/AsyncJenaParser.java	Wed Feb 01 17:09:29 2012 +0100
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package patch;
+
+import java.util.Iterator;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.events.Attribute;
+import javax.xml.stream.events.Characters;
+import javax.xml.stream.events.Comment;
+import javax.xml.stream.events.DTD;
+import javax.xml.stream.events.EndElement;
+import javax.xml.stream.events.EntityDeclaration;
+import javax.xml.stream.events.Namespace;
+import javax.xml.stream.events.ProcessingInstruction;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import com.fasterxml.aalto.AsyncXMLStreamReader;
+import com.fasterxml.aalto.stax.InputFactoryImpl;
+import org.xml.sax.Attributes;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.Locator;
+import org.xml.sax.SAXException;
+import org.xml.sax.ext.LexicalHandler;
+import org.xml.sax.helpers.AttributesImpl;
+
+/**
+ *
+ * Bridge StAX and SAX parsing, with support for asynchronous parsing
+ *
+ * @author Henry Story mostly adapted from Damian Steer's jena.rdf.arp.StAX2SAX
+ */
+public class AsyncJenaParser {
+    private final ContentHandler handler;
+    private final LexicalHandler lhandler;
+    private final XMLEventReader eventReader;
+    private final AsyncXMLStreamReader streamReader;
+
+    /**
+     * Primes a converter with a SAX handler.
+     *
+     * Note: if handler is also LexicalHandler it will pass on lexical events.
+     *
+     * @param handler
+     */
+    public AsyncJenaParser(ContentHandler handler, AsyncXMLStreamReader streamReader) throws XMLStreamException {
+        this.handler = handler;
+        this.lhandler = (handler instanceof LexicalHandler) ?
+                (LexicalHandler) handler :
+                NO_LEXICAL_HANDLER ;
+        handler.setDocumentLocator(new LocatorConv(streamReader));
+        final XMLInputFactory xf = InputFactoryImpl.newInstance();
+        this.streamReader = streamReader;
+        this.eventReader = xf.createXMLEventReader(streamReader);
+    }
+
+
+    public void parse() throws XMLStreamException, SAXException {
+        // We permit nesting, so keep at track of where we are
+        boolean consecutive_incomplete = false;
+        while (eventReader.hasNext()) {
+            try {
+                XMLEvent e = eventReader.nextEvent();
+                handleXMLEvent(e);
+                consecutive_incomplete=false;
+            } catch (XMLStreamException e) {
+                if (streamReader.getEventType() == AsyncXMLStreamReader.EVENT_INCOMPLETE) {
+                   if (consecutive_incomplete) return;
+                   else consecutive_incomplete=true;
+                } else throw e;
+            }
+        }
+    }
+
+    /**
+     * Handle the next event
+     * @param e
+     * @return true if the end of the document has been reached. (ie, we are back at level 0)
+     * @throws SAXException
+     */
+    private void handleXMLEvent(XMLEvent e) throws SAXException {
+        if (e.isStartDocument()) handler.startDocument();
+        else if (e.isEndDocument()) handler.endDocument();
+        else if (e.isStartElement()) emitSE(e.asStartElement());
+        else if (e.isEndElement()) emitEE(e.asEndElement());
+        else if (e.isProcessingInstruction()) emitPi((ProcessingInstruction) e);
+        else if (e.isCharacters()) emitChars(e.asCharacters());
+        else if (e.isAttribute()) emitAttr((Attribute) e);
+        else if (e.isEntityReference()) emitEnt((EntityDeclaration) e);
+        else if (e.isNamespace()) emitNS((Namespace) e);
+        else if (e instanceof Comment) emitComment((Comment) e);
+        else if (e instanceof DTD) emitDTD((DTD) e);
+        else {
+            //System.err.println("Unknown / unhandled event type " + e);
+            //throw new SAXException("Unknown / unhandled event type " + e);
+        }
+    }
+
+    private void emitSE(StartElement se) throws SAXException {
+        @SuppressWarnings("unchecked")
+        Iterator<Attribute> aIter = se.getAttributes() ;
+        handler.startElement(se.getName().getNamespaceURI(),
+                se.getName().getLocalPart(), qnameToS(se.getName()), convertAttrs(aIter));
+        @SuppressWarnings("unchecked")
+        Iterator<Namespace> it = se.getNamespaces();
+        while (it.hasNext()) emitNS(it.next());
+    }
+
+    private void emitEE(EndElement ee) throws SAXException {
+        handler.endElement(ee.getName().getNamespaceURI(),
+                ee.getName().getLocalPart(), qnameToS(ee.getName()));
+        @SuppressWarnings("unchecked")
+        Iterator<Namespace> it = ee.getNamespaces();
+        while (it.hasNext()) emitNSGone(it.next());
+    }
+
+    private void emitPi(ProcessingInstruction pi) throws SAXException {
+        handler.processingInstruction(pi.getTarget(), pi.getData());
+    }
+
+    private void emitChars(Characters chars) throws SAXException {
+        if (chars.isIgnorableWhiteSpace())
+            handler.ignorableWhitespace(chars.getData().toCharArray(),
+                    0, chars.getData().length());
+        else
+            handler.characters(chars.getData().toCharArray(),
+                    0, chars.getData().length());
+    }
+
+    private void emitAttr(Attribute attribute) {
+        // nowt to do
+    }
+
+    private void emitEnt(EntityDeclaration entityDeclaration) {
+        // nowt to do
+    }
+
+    private void emitNS(Namespace namespace) throws SAXException {
+        // Safety check to work around nasty tests
+        if (namespace.getPrefix() == null ||
+                namespace.getNamespaceURI() == null) return;
+        handler.startPrefixMapping(namespace.getPrefix(), namespace.getNamespaceURI());
+    }
+
+    private void emitNSGone(Namespace namespace) throws SAXException {
+        handler.endPrefixMapping(namespace.getPrefix());
+    }
+
+    private void emitComment(Comment comment) throws SAXException {
+        lhandler.comment(comment.getText().toCharArray(), 0, comment.getText().length());
+    }
+
+    private void emitDTD(DTD dtd) {
+        // Is this useful??
+    }
+
+    private Attributes convertAttrs(Iterator<Attribute> attributes) {
+        AttributesImpl toReturn = new AttributesImpl();
+        while (attributes.hasNext()) {
+            Attribute a = attributes.next();
+            toReturn.addAttribute(a.getName().getNamespaceURI(), a.getName().getLocalPart(),
+                    qnameToS(a.getName()), a.getDTDType(), a.getValue());
+        }
+        return toReturn;
+    }
+
+    private String qnameToS(QName name) {
+        if (name.getPrefix().length() == 0) return name.getLocalPart();
+        else return name.getPrefix() + ":" + name.getLocalPart();
+    }
+
+    static class LocatorConv implements Locator {
+        private final XMLStreamReader reader;
+
+        public LocatorConv(XMLStreamReader reader) { this.reader = reader; }
+
+        @Override
+        public final String getPublicId() { return reader.getLocation().getPublicId(); }
+        @Override
+        public final String getSystemId() { return reader.getLocation().getSystemId(); }
+        @Override
+        public final int getLineNumber() { return reader.getLocation().getLineNumber(); }
+        @Override
+        public final int getColumnNumber() { return reader.getLocation().getColumnNumber(); }
+    }
+
+    final static LexicalHandler NO_LEXICAL_HANDLER = new LexicalHandler() {
+        @Override
+        public void startDTD(String string, String string1, String string2) throws SAXException {}
+        @Override
+        public void endDTD() throws SAXException {}
+        @Override
+        public void startEntity(String string) throws SAXException {}
+        @Override
+        public void endEntity(String string) throws SAXException {}
+        @Override
+        public void startCDATA() throws SAXException {}
+        @Override
+        public void endCDATA() throws SAXException {}
+        @Override
+        public void comment(char[] chars, int i, int i1) throws SAXException {}
+    };
+}