You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Writing a SPIN Query

A SPIN network is a collection of nodes, which are objects that can perform queries. These odes join together to form a network that allow queries to be broadcast from node to node and then aggregate results in reverse order. 

Nodes can be accessed directly by code running in the same JVM, or exposed over HTTP via an application server.

This document explains how to write the SPIN plug-in used to implement a query, and it provides client code that can be used to perform this query remotely.

Resources

System Requirements

  • Sun/Oracle Java Development Kit (JDK) 1.6.0_04 or later.  Other JDKs, including OpenJDK, are not supported but will likely work.
  • Apache Maven 2.2.1. Maven 3.x is not supported, but will likely work. (Required to build SPIIN and the example module)
  • SPIN

Hello, Spin

* *The queries that SPIN nodes perform are implemented as instances of the QueryAction interface.

public interface QueryAction<Criteria>

{

    String perform(final QueryContext context, final Criteria criteria) throws QueryException;


    Criteria unmarshal(final String serializedCriteria) throws SerializationException;


    boolean isReady();


    void destroy();

}

https://scm.chip.org/svn/repos/spin/base/trunk/node/api/src/main/java/org/spin/node/actions/QueryAction.java

QueryActions takes a serialized input criteria and returns serialized results.  It is up to the implementer to choose a serialization format, if any.

Queries are submitted to a SPIN network using either the Agent or Querier class.  The Querier class is higher level, Itis used in this document and the examples module.

Implement QueryAction

A simple QueryAction is one that echoes its input.  Here's how it would look in Scala:

final class EchoQueryAction extends QueryAction[String] {

    override def unmarshal(serialized: String) = serialized


    override def perform(context: QueryContext, input: String) = input


    override def isReady = true


    override def destroy { }

}

This could be simplified by extending AbstractQueryAction, which provides default implementations for isReady() and destroy() which are suitable for a stateless QueryAction like EchoQueryAction:

final class EchoQueryAction extends AbstractQueryAction[String] {

    override def unmarshal(serialized: String) = serialized


    override def perform(context: QueryContext, input: String) = input

}
A QueryAction's unmarshal method defines how the input criteria is unmarshalled into a Java object.  In this case, we  just echoing our input, so we don't need to deserialize. 

Consider a slightly more complicated QueryAction, that receives as input a list of integers and returns the sum.  Here the serialization format is XML, so we can extends JAXBQueryAction, which supplies an implementation of unmarshal() that uses JAXB to turn raw XML into a object in the JVM:

final class AddQueryAction extends JAXBQueryAction(classOf[AddInput]) {

    //Take an AddInput, and return an XML-serialized AddResult

    override def perform(context: QueryContext, input: AddInput): String = {

        val result = new AddResult(input.toAdd.sum)


        JAXBUtils.marshalToString(result)

    }

}

This class takes input like:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>

<ns2:AddInput xmlns:ns2="http://spin.org/xml/res/">

    <number xsi:type="xs:int" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">1</number>

    <number xsi:type="xs:int" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">2</number>

    <number xsi:type="xs:int" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">3</number>

</ns2:AddInput>

which gets unmarshalled into a class like

@XmlAccessorType(XmlAccessType.FIELD)

@XmlType(name = "AddInput", namespace = "http://spin.org/xml/res/")

@XmlRootElement(name = "AddInput", namespace = "http://spin.org/xml/res/")

//The only field is a Java List, which works with JAXB

final case class AddInput(private val number: JList[Int])
{ &nbsp;&nbsp;&nbsp; //JAXB requires a no-arg constructor; can be private &nbsp;&nbsp;&nbsp; *def* *this*() = *this*(*new* JArrayList\[Int\]) &nbsp;&nbsp;&nbsp; //For nicer Scala interop &nbsp;&nbsp;&nbsp; *def* *this*(toAdd: Seq\[Int\]) = *this*(*new* JArrayList(asJavaList(toAdd))) &nbsp;&nbsp;&nbsp; *def* toAdd: Seq\[Int\] = number.toSeq }

Returning Results

QueryActions return results as serialized Strings.  SPIN is agnostic about the contents of the returned String and, as with input, the serialization format used.  In this case, if we define a class that's serializable by JAXB, like:

@XmlAccessorType(XmlAccessType.FIELD)

@XmlType(name = "AddResult", namespace = "http://spin.org/xml/res/")

@XmlRootElement(name = "AddResult", namespace = "http://spin.org/xml/res/")

final case class AddResult(val sum: Int) {

    //JAXB requires a no-arg constructor; can be private

    def this() = this(0)

}

You can use SPIN's JAXBUtils class to serialize an instance into a String:

final class AddQueryAction extends JAXBQueryAction(classOf[AddInput]) {

    //Take an AddInput, and return an XML-serialized AddResult

    override def perform(context: QueryContext, input: AddInput): String = {

        val result = new AddResult(input.toAdd.sum)



        JAXBUtils.marshalToString(result)

    }

}

Make a Node that loads your query

* *A SPIN node is an instance of the SpinNodeImpl class, which implements the SpinNode interface accessed directly in the same JVM, or remotely via HTTP.  This example uses the JVM method, because it's simplest.  Deploying a SPIN node in a servlet container is covered later.

 Nodes assign each type of query a unique string identifier, called a QueryType, which is used to look up the QueryAction instance that implements the query.  By convention, QueryTypes are human-readable.  This mapping can be set up one of two ways:

 Specify QueryAction class directly

 Create a NodeConfig object with the mapping like:

val queryType = "Spin.Example.Echo"

val queryActionClassName = classOf[EchoQueryAction].getName

val echoNodeConfig = NodeConfig.Default.withQuery(new QueryTypeConfig(queryType, queryActionClassName))

echoNodeConfig may be passed to SpinNodeImpl's constructor:

val nodeID = ...

val routingTable = ...

val node = new SpinNodeImpl(nodeID, echoNodeConfig, routingTable)

It is also possible to set up this mapping in a node.xml file, which is an XML-serialized NodeConfig.  Setting up this mapping is covered later in this document.

Use a QueryActionMap

Specifying QueryType-to-QueryAction-class mappings directly is straightforward, but has some drawbacks: 

  • First, the QueryAction implementation must have a public no-argument constructor. 
  • Second, the SPIN extension implementer has less control over the lifecycle of the QueryAction.  QueryActions will be instantiated when the Node starts, and their destroy() methods will be called when the Node is shut down, but no further guarantees are possible.  Caching, lazy-loading, memorization, or other techniques may be performed by the SPIN node, but this is out of the extension implementer's hands.

Alternatively, you can supply an instance of the QueryActionMap interface.  This interface describes a factory for QueryActions that the node uses to obtain them.  A QueryActionMap may be lazy, cache QueryActions or not, or use a DI framework like Guice or Spring, among other possibilities.  Defining a QueryActionMap, while straightforward, is more verbose than the direct-mapping approach so it is not used in this guide.

Make a client

* *SPIN's client-side classes are fairly low-level.  First, you need a place to hold some constants and perform administrative tasks:

 object Config {



    //The human-readable name of the node we will create; purely descriptive

    val nodeName = "AddNode"



    //Some dummy credentials for submitting queries with

    val credentials = new Credentials("CBMI", "some-user", "some-password")



    //Method that returns a SpinClientConfig with all necessary fields set to enable

    //querying the passed in node

    def spinClientConfigFor(node: SpinNode) = {

        val nodeConnectorSource = registerAsLocalSource(nodeConnectorSourceFor(node))



        val entryPoint = new EndpointConfig(EndpointType.Local, nodeName)



        SpinClientConfig.Default.withCredentials(credentials).withNodeConnectorSource(nodeConnectorSource).withEntryPoint(entryPoint)

    }



    //Nodes may participate in one or more overlay networks, called peer groups.  By belonging to more

    //than one peer group, the same node can exist at different points in different logical network topologies.

    val peerGroupName = "AddPeerGroup"



    //An empty, dummy, routing table.  Defines one peer group, 'AddPeerGroup' that only the node

    //we will create belongs to.

    val routingTableConfig = new RoutingTableConfig(new PeerGroupConfig(peerGroupName))



    //The QueryType name that will map to the AddQueryAction class.  Clients specify which query they would

    //like to perform by specifying a QueryType.

    val queryType = "Spin.Add"



    //Create a NodeConfig with default values for its fields, and a mapping between the QueryType 'Spin.Add' and

    //an instance of AddQueryAction

    val nodeConfig = NodeConfig.Default.withQuery(new QueryTypeConfig(queryType, classOf[AddQueryAction].getName))



    //Needed to enable locating node instances when making in-JVM queries

    private def nodeConnectorSourceFor(node: SpinNode) = new NodeConnectorSource {

        override def getNodeConnector(endpoint: EndpointConfig, timeoutPeriod: Long): NodeConnector = NodeConnector.instance(node)

    }



    //Needed to enable locating node instances when making in-JVM queries

    private def registerAsLocalSource(source: NodeConnectorSource): NodeConnectorSource = {

        NodeOperationFactory.addMapping(EndpointType.Local, source)



        source

    }

}

 Here is a higher-level wrapper that makes use of the information from Config to synchronously send a query to a node and get the results:

final class AddClient(client: SpinClient) {

    //Init with the in-JVM node we're going to query

    def this(toBeQueried: SpinNode) = this(new SpinClient(Config.spinClientConfigFor(toBeQueried)))



    //Take a bunch of ints, return an Option of their sum, or None if there was an error.

    def query(intsToBeAdded: Seq[Int]): Option[Int] = {



        //method to extract the first (and in this case, only) Result from a ResultSet

        def firstAddResult(resultSet: ResultSet) = JAXBUtils.unmarshal(resultSet.getResults.head.getPayload.getData, classOf[AddResult])



        try {

            //Actually make the query

            val resultSet = client.query(Config.peerGroupName, Config.queryType, new AddInput(intsToBeAdded))



            log(resultSet)



            //Inspect the results, and extract the first (only) one

            if(resultSet.size > 0) Some(firstAddResult(resultSet).sum) else None

        }

        catch {

            case e: TimeoutException => { println("Timed out waiting for query to complete: " + e.getMessage) ; e.printStackTrace(System.err) ; None }



            case e: Exception => { println("Error making query: " + e.getMessage) ; e.printStackTrace(System.err) ; None}

        }

    }



    private def log(resultSet: ResultSet) {

        val expectedString = Option(resultSet.getTotalExpected).map(_.toString).getOrElse("?")



        val completeString = if(resultSet.isComplete) "complete" else "incomplete"



        println("(" + completeString + ": " + resultSet.size + "/" + expectedString + " results)")

    }

}

 Here is a main method that ties it all together:

object Main {

    def main(args: Array[String]) = {

        //Create a node to query; once instantiated, it is ready to be queried

        val node = new SpinNodeImpl(new CertID("123456789", "some-node"), //arbitrary ID

                                    Config.nodeConfig, //pre-made config, references AddQueryAction

                                    RoutingTableConfigSources.withConfigObject(Config.routingTableConfig)) //pre-made config, contains one peer group



        //Create a client pointing at that node

        val client = new AddClient(node)



        val prompt = "Add> "



        val in = new BufferedReader(new InputStreamReader(System.in))



        println("Enter a list of numbers to be summed, separated by spaces or commas")

        println("Enter quit to exit")

        print(prompt)



        var line = in.readLine



        while(line != null) {

            //Shut down cleanly if requested

            if(line.equalsIgnoreCase("quit")) {

                println("Exiting...")



                node.destroy()



                System.exit(0)

            }



            if(!line.isEmpty) {

                //Split each line on non-numeric chars, and turn the chunks into ints

                val numbers = line.trim.split("[^\\d]+").map(Integer.parseInt)



                //Query the node we made earlier

                val queryResult = client.query(numbers)



                //Print the results

                println(queryResult.map(result => "Received: '" + result + "'").getOrElse("Query failed"))

            }



            print(prompt)



            line = in.readLine

        }

    }

}

Running this will create an in-JVM SPIN node and configure it to respond to the 'Spin.Add' QueryType using an AddQueryAction instance; create a client that queries that node and performs queries in response to user input.  These examples are available at:http://scm.chip.org/svn/repos/spin/base/trunk/examples

It is recommended to use the SpinClient class for all new client applications.  SpinClient implements the most common use case – synchronous queries that block until all results have arrived – in a streamlined manner.  A similarly streamlined asynchronous client API is in development.  Until then, the lower-level Querier and Agent classes provide and asynchronous client API.

The following code creates a SpinClient that can query a SPIN network rooted at an arbitrary URL:

val entrypoint = new EndpointConfig(EndpointType.SOAP, "http://localhost:8080/examples/node")

 val config = SpinClientConfig.Default.withEntryPoint(entrypoint)

 val client = new SpinClient(config)

SpinClientConfig has several fields;  the only one without a default value is the entry point URL, expressed as an EndpointConfig, which is a String address and an EnpointType. It describes the way to connect to the node at that address.  The currently allowed EndpointTypes are SOAP and Local with more planned. Local addresses require a  more setup (as evidenced in the previous examples) to allow locating nodes in the current JVM using a String identifier.

A SpinClient with the default configuration will attempt to sign all outgoing queries, which means that a certificate with a private key part must be available.  (TODO: Spin certificate management)

Testing in a Servlet Container

The easiest way to get started is to build the SPIN examples module.  This exists in source control at http://scm.chip.org/svn/repos/spin/base/trunk/examples.  The examples.war file, built by the examples-war module is also available from the Open.Med Maven repository at (TODO: Nexus URL).  Examples.war contains a SPIN node, plus example QueryActions defined in the examples/* modules.

1) Obtain the SPIN node WAR file

2) Install and configure SPIN

The SPIN Installation Guide provides installation, configuration and testing instructions.

  • No labels