Skip to content

Commit

Permalink
Add OSGi example
Browse files Browse the repository at this point in the history
Add scala-based example

Resolves: https://jira.duraspace.org/browse/FCREPO-1282
  • Loading branch information
acoburn authored and Andrew Woods committed Jan 22, 2015
1 parent c0c8e0e commit 0dbe6b6
Show file tree
Hide file tree
Showing 22 changed files with 619 additions and 178 deletions.
23 changes: 23 additions & 0 deletions examples/fcrepo-camel-osgi/ReadMe.txt
@@ -0,0 +1,23 @@
Fcrepo Camel OSGi Project
=========================

This is a sample project that listens to Fedora's JMS message stream,
passing updates to an external triplestore and a Solr index. The component
also exposes a REST endpoint for re-indexing the entire collection.

To build this project, run:

mvn install

Once built, this can be deployed in an OSGi container, such as
[Karaf](https://karaf.apache.org). If using Karaf, add the fcrepo-camel
JAR file to $KARAF_HOME/deploy. Then add this project's bundle to the same
deploy directory.

Properties can be defined and updated in an org.fcrepo.camel.examples.osgi.cfg
file.

For more help, see the Apache Camel documentation:

http://camel.apache.org/

Expand Up @@ -4,11 +4,11 @@
<modelVersion>4.0.0</modelVersion>

<groupId>org.fcrepo.camel.examples</groupId>
<artifactId>fcrepo-camel-solr</artifactId>
<packaging>jar</packaging>
<artifactId>fcrepo-camel-osgi</artifactId>
<packaging>bundle</packaging>
<version>1.0-SNAPSHOT</version>

<name>A Camel Route to connect Fedora4 with Solr</name>
<name>A Camel Route to connect Fedora with Solr</name>
<url>http://fcrepo.org</url>

<organization>
Expand All @@ -22,6 +22,7 @@
<activemq.version>5.10.0</activemq.version>
<camel.version>2.14.0</camel.version>
<logback.version>1.1.2</logback.version>
<fcrepo.version>4.0.1-SNAPSHOT</fcrepo.version>
</properties>

<licenses>
Expand All @@ -48,6 +49,11 @@
<artifactId>activemq-camel</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.fcrepo.camel</groupId>
<artifactId>fcrepo-camel</artifactId>
<version>${fcrepo.version}</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>ch.qos.logback</groupId>
Expand All @@ -73,8 +79,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
Expand All @@ -86,14 +92,29 @@
</configuration>
</plugin>

<!-- Allows the example to be run via 'mvn compile exec:java' -->
<!-- to generate the MANIFEST-FILE of the bundle -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
<extensions>true</extensions>
<configuration>
<mainClass>org.fcrepo.camel.examples.MainApp</mainClass>
<includePluginDependencies>false</includePluginDependencies>
<instructions>
<Bundle-SymbolicName>fcrepo-camel-osgi</Bundle-SymbolicName>
<Export-Package>
org.fcrepo.camel.examples.osgi;version=${project.version},
</Export-Package>
<Import-Package>
javax.xml.transform.stream,
org.apache.activemq.camel.component,
org.apache.camel.*,
org.fcrepo.camel,
org.fcrepo.camel.processor,
org.osgi.service.blueprint,
org.w3c.dom
</Import-Package>
<Embed-Transitive>true</Embed-Transitive>
</instructions>
</configuration>
</plugin>

Expand Down
@@ -0,0 +1,72 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.examples.osgi;

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.fcrepo.camel.HttpMethods;
import org.fcrepo.camel.JmsHeaders;
import org.fcrepo.camel.processor.SparqlDeleteProcessor;

/**
 * Camel routes that consume the delete queue and remove the affected
 * object from both the triplestore and the solr index.
 *
 * @author Aaron Coburn
 */
public class DeleteRouter extends RouteBuilder {

    /**
     * Registers the error handler for this builder and then each of the
     * delete routes, in the order they are declared below.
     *
     * @throws Exception if any route definition cannot be built
     */
    public void configure() throws Exception {
        registerErrorHandler();
        defineDeleteQueueRoute();
        defineTriplestoreRoute();
        defineSolrRoute();
    }

    /*
     * Generic error handler, scoped to this RouteBuilder: marks the
     * exception as handled, replaces the body with a short message naming
     * the failing route and hands it to the shared error log endpoint.
     */
    private void registerErrorHandler() {
        onException(Exception.class)
            .handled(true)
            .transform()
                .simple("Delete Error: ${routeId}")
            .to("direct:error.log");
    }

    /*
     * Items arriving on the delete queue are multicast to the delete log,
     * the triplestore route and the solr route.
     */
    private void defineDeleteQueueRoute() {
        from("seda:delete")
            .routeId("FcrepoDelete")
            .multicast()
                .to("direct:delete.log", "direct:delete.triplestore", "direct:delete.solr");
    }

    /*
     * Runs the message through a SparqlDeleteProcessor and sends the result
     * to the triplestore's update endpoint. HTTP failures do not raise an
     * exception (throwExceptionOnFailure=false).
     */
    private void defineTriplestoreRoute() {
        from("direct:delete.triplestore")
            .routeId("FcrepoDeleteTriplestore")
            .process(new SparqlDeleteProcessor())
            .to("http4:{{triplestore.baseUrl}}/update?throwExceptionOnFailure=false");
    }

    /*
     * Posts a JSON delete command to solr. The id in the command is read
     * from the JMS identifier header when the message is processed.
     */
    private void defineSolrRoute() {
        final String commandTemplate =
            "{\"delete\":{\"id\":\"${headers[%s]}\", \"commitWithin\": \"500\"}}";
        final String deleteCommand = String.format(commandTemplate, JmsHeaders.IDENTIFIER);

        from("direct:delete.solr")
            .routeId("FcrepoDeleteSolr")
            .setHeader(Exchange.CONTENT_TYPE)
                .constant("application/json")
            .setHeader(Exchange.HTTP_METHOD)
                .constant(HttpMethods.POST)
            .transform()
                .simple(deleteCommand)
            .to("http4://{{solr.baseUrl}}/update");
    }
}
@@ -0,0 +1,53 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.examples.osgi;

import org.apache.camel.builder.RouteBuilder;
import org.fcrepo.camel.JmsHeaders;
import org.fcrepo.camel.RdfNamespaces;

/**
 * Content-based router for repository JMS events: removal events go to
 * the delete queue, everything else goes to the update queue.
 *
 * @author Aaron Coburn
 */
public class EventRouter extends RouteBuilder {

    /**
     * Registers the error handler for this builder followed by the
     * event routing route.
     *
     * @throws Exception if the route definition cannot be built
     */
    public void configure() throws Exception {
        registerErrorHandler();
        defineEventRoute();
    }

    /*
     * Generic error handler, scoped to this RouteBuilder: marks the
     * exception as handled, replaces the body with a short message naming
     * the failing route and hands it to the shared error log endpoint.
     */
    private void registerErrorHandler() {
        onException(Exception.class)
            .handled(true)
            .transform()
                .simple("Event Routing Error: ${routeId}")
            .to("direct:error.log");
    }

    /*
     * Every incoming message is logged first. Messages whose event type
     * header equals the NODE_REMOVED event are queued for deletion; all
     * other messages are queued for update. Both queues block the producer
     * when full.
     */
    private void defineEventRoute() {
        final String removalEvent = RdfNamespaces.REPOSITORY + "NODE_REMOVED";

        from("activemq:{{jms.fcrepoEndpoint}}")
            .routeId("FcrepoEventRouter")
            .to("direct:event.log")
            .choice()
                .when(header(JmsHeaders.EVENT_TYPE).isEqualTo(removalEvent))
                    .to("seda:delete?blockWhenFull=true")
                .otherwise()
                    .to("seda:update?blockWhenFull=true");
    }
}
@@ -0,0 +1,50 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.examples.osgi;

import java.net.URL;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.fcrepo.camel.FcrepoHeaders;

/**
 * A processor that converts the REST uri into the
 * identifying path for an fcrepo node.
 *
 * The path component of the {@code Exchange.HTTP_URI} header is expected to
 * begin with the value of the {@code MessageHeaders.REST_PREFIX} header; that
 * prefix is stripped and the remainder becomes the fcrepo identifier.
 *
 * @author Aaron Coburn
 */
public class IndexPathProcessor implements Processor {

    /**
     * Derives the fcrepo identifier and base URL headers from the incoming
     * REST request and then strips the HTTP/Restlet transport headers.
     *
     * @param exchange the exchange whose IN message is updated in place
     * @throws IllegalArgumentException if the HTTP uri or rest prefix header
     *         is missing, or if the request path does not start with the
     *         rest prefix
     * @throws Exception if the HTTP uri header is not a well-formed URL
     */
    public void process(final Exchange exchange) throws Exception {
        final Message in = exchange.getIn();
        final String uri = in.getHeader(Exchange.HTTP_URI, String.class);
        final String prefix = in.getHeader(MessageHeaders.REST_PREFIX, String.class);

        // Fail with a descriptive message instead of an anonymous
        // NullPointerException / MalformedURLException further down.
        if (uri == null) {
            throw new IllegalArgumentException("Missing required header: " + Exchange.HTTP_URI);
        }
        if (prefix == null) {
            throw new IllegalArgumentException("Missing required header: " + MessageHeaders.REST_PREFIX);
        }

        final String path = new URL(uri).getPath();

        // Blindly cutting prefix.length() characters off a path that does not
        // start with the prefix would yield a bogus identifier (or a
        // StringIndexOutOfBoundsException when the path is shorter).
        if (!path.startsWith(prefix)) {
            throw new IllegalArgumentException(
                    "Request path '" + path + "' does not start with the REST prefix '" + prefix + "'");
        }

        in.setHeader(FcrepoHeaders.FCREPO_IDENTIFIER, path.substring(prefix.length()));
        in.setHeader(FcrepoHeaders.FCREPO_BASE_URL,
                in.getHeader(MessageHeaders.BASE_URL, String.class));

        // Drop transport-level headers so they are not carried further
        // along the route.
        in.removeHeaders("CamelHttp*");
        in.removeHeaders("CamelRestlet*");
        in.removeHeaders("org.restlet*");
    }
}
@@ -0,0 +1,95 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.examples.osgi;

import javax.xml.transform.stream.StreamSource;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.xml.XPathBuilder;
import org.apache.camel.builder.xml.Namespaces;
import org.apache.camel.Exchange;
import org.fcrepo.camel.FcrepoHeaders;
import org.fcrepo.camel.RdfNamespaces;

/**
 * A router for traversing a node hierarchy, initiating
 * re-indexing operations.
 *
 * @author Aaron Coburn
 */
public class IndexRouter extends RouteBuilder {

    /**
     * Builds the XPath selector for contained nodes, then registers the
     * error handler for this builder followed by the three indexing routes.
     *
     * @throws Exception if any route definition cannot be built
     */
    public void configure() throws Exception {

        final Namespaces namespaces = new Namespaces("rdf", RdfNamespaces.RDF);
        namespaces.add("ldp", RdfNamespaces.LDP);

        final XPathBuilder containedNodes = new XPathBuilder("/rdf:RDF/rdf:Description/ldp:contains");
        containedNodes.namespaces(namespaces);

        registerErrorHandler();
        defineIndexEndpointRoute();
        defineEnqueueRoute();
        defineIndexerRoute(containedNodes, namespaces);
    }

    /*
     * Generic error handler, scoped to this RouteBuilder: marks the
     * exception as handled, replaces the body with a short message naming
     * the failing route and hands it to the shared error log endpoint.
     */
    private void registerErrorHandler() {
        onException(Exception.class)
            .handled(true)
            .transform()
                .simple("Indexing Error: ${routeId}")
            .to("direct:error.log");
    }

    /*
     * Entry point for an indexing request. Logs the requested uri, stores
     * the rest prefix and base url in message headers, lets
     * IndexPathProcessor work on those headers, queues the message without
     * waiting for it to complete, and replies with a plain text
     * acknowledgement.
     */
    private void defineIndexEndpointRoute() {
        final String startLog =
            String.format("Initial indexing path: ${headers[%s]}", Exchange.HTTP_URI);
        final String acknowledgement =
            String.format("Indexing started at ${headers[%s]}", FcrepoHeaders.FCREPO_IDENTIFIER);

        from("direct:index")
            .routeId("FcrepoIndexEndpoint")
            .log(startLog)
            .setHeader(MessageHeaders.REST_PREFIX)
                .constant("{{rest.prefix}}")
            .setHeader(MessageHeaders.BASE_URL)
                .constant("http://{{fcrepo.baseUrl}}")
            .process(new IndexPathProcessor())
            .to("seda:enqueue?waitForTaskToComplete=Never&blockWhenFull=true")
            .setHeader(Exchange.CONTENT_TYPE)
                .constant("text/plain")
            .transform()
                .simple(acknowledgement);
    }

    /*
     * Buffers index requests in an asynchronous queue before sending
     * them to ActiveMQ.
     */
    private void defineEnqueueRoute() {
        from("seda:enqueue")
            .routeId("FcrepoEnqueue")
            .removeHeaders("CamelHttp*")
            .to("activemq:queue:index");
    }

    /*
     * Processes requests from the index queue. Each message is forwarded to
     * the update queue, then the node is fetched from fcrepo with a GET.
     * The response is split on its ldp:contains elements, and the
     * rdf:resource value of each one is passed through NodePathProcessor
     * and sent back to the enqueue route.
     */
    private void defineIndexerRoute(final XPathBuilder children, final Namespaces ns) {
        from("activemq:queue:index")
            .routeId("FcrepoIndexer")
            .streamCaching()
            .to("seda:update?waitForTaskToComplete=Never&blockWhenFull=true")
            .removeHeaders("CamelHttp*")
            .setHeader(Exchange.HTTP_METHOD)
                .constant("GET")
            .to("fcrepo:{{fcrepo.baseUrl}}?preferInclude=PreferContainment&preferOmit=ServerManaged")
            .convertBodyTo(StreamSource.class)
            .split(children).streaming()
                .transform()
                    .xpath("/ldp:contains/@rdf:resource", String.class, ns)
                .process(new NodePathProcessor())
                .to("seda:enqueue?waitForTaskToComplete=Never&blockWhenFull=true");
    }
}

0 comments on commit 0dbe6b6

Please sign in to comment.