Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Update to accommodate node handle in JMS message
Browse files Browse the repository at this point in the history
  • Loading branch information
osmandin authored and Andrew Woods committed Sep 6, 2013
1 parent 3acf39b commit 6b0a785
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 56 deletions.
8 changes: 7 additions & 1 deletion fcrepo-jms-indexer-core/pom.xml
Expand Up @@ -39,7 +39,13 @@
</exclusion>
</exclusions>
</dependency>


<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>

<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-spring</artifactId>
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Date;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;

/**
* Basic Indexer implementation that writes object content to timestamped files
Expand Down Expand Up @@ -60,7 +61,7 @@ public void update(String pid, String content) throws IOException {
// timestamped filename
String fn = pid + "." + fmt.format( new Date() );
if ( fn.indexOf("/") != -1 ) {
fn = fn.replaceAll("/","_");
fn = StringUtils.substringAfterLast(fn, "/");
}

// write content to disk
Expand Down
Expand Up @@ -30,6 +30,7 @@
import javax.jms.TextMessage;

import org.apache.abdera.Abdera;
import org.apache.abdera.model.Category;
import org.apache.abdera.model.Document;
import org.apache.abdera.model.Entry;
import org.apache.abdera.parser.Parser;
Expand All @@ -43,10 +44,10 @@
/**
* MessageListener implementation that retrieves objects from the repository and
* invokes one or more indexers to index the content.
*
*
* @author Esmé Cowles
* Date: Aug 19, 2013
**/
**/
public class IndexerGroup implements MessageListener {
private Parser atomParser = new Abdera().getParser();
private String repositoryURL;
Expand All @@ -56,7 +57,7 @@ public class IndexerGroup implements MessageListener {

/**
* Default constructor.
**/
**/
public IndexerGroup() {
PoolingClientConnectionManager p = new PoolingClientConnectionManager();
p.setDefaultMaxPerRoute(5);
Expand All @@ -66,64 +67,76 @@ public IndexerGroup() {

/**
* Set repository URL.
**/
**/
public void setRepositoryURL(String repositoryURL) {
this.repositoryURL = repositoryURL;
}

/**
* Get repository URL.
**/
**/
public String getRepositoryURL() {
return repositoryURL;
}

/**
* Set indexers for this group.
**/
**/
public void setIndexers(Set indexers) {
for (Iterator it = indexers.iterator(); it.hasNext();) {
Object o = it.next();
if (o instanceof Indexer) {
if (this.indexers == null) {
this.indexers = new HashSet<Indexer>();
}
this.indexers.add( (Indexer)o );
this.indexers.add((Indexer) o);
}
}
}

/**
* Get indexers set for this group.
**/
**/
public Set<Indexer> getIndexers() {
return indexers;
}

/**
 * Extract the node path from an Atom category list.
 *
 * Scans the categories for the first one whose label is "path" and returns
 * the repository URL with that category's term appended. Categories without
 * a label (the attribute is optional in Atom) or without a term are skipped
 * rather than causing a NullPointerException or a URL ending in "null".
 *
 * @param categories categories taken from the Atom entry of the JMS message
 * @return repositoryURL + path term, or repositoryURL alone if no usable
 *         "path" category is found
 */
private String getPath(java.util.List<Category> categories) {
    for (Category c : categories) {
        // Constant-first comparison: getLabel() may return null.
        if ("path".equals(c.getLabel())) {
            final String term = c.getTerm();
            if (term != null) {
                return repositoryURL + term;
            }
        }
    }
    // No path category present: fall back to the repository root.
    return repositoryURL;
}

/**
* Handle a JMS message representing an object update or deletion event.
**/
**/
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
// get pid from message
final String xml = ((TextMessage) message).getText();
Document<Entry> doc = atomParser.parse(new StringReader(xml));
Entry entry = doc.getRoot();
// FIXME: This pid logic does not work with path: /rest/a/b/c
final String pid = entry.getCategories("xsd:string").get(0)
.getTerm();

// if the object is updated, fetch current content
String content = null;
if (!"purgeObject".equals(entry.getTitle())) {
HttpGet get = new HttpGet(repositoryURL + pid);
HttpGet get = new HttpGet(
getPath(entry.getCategories("xsd:string")));
HttpResponse response = httpclient.execute(get);
content = IOUtils.toString(
response.getEntity().getContent(),
Charset.forName("UTF-8")
);
content = IOUtils.toString(response.getEntity()
.getContent(), Charset.forName("UTF-8"));
}
//pid represents the full path. Alternative would be to send path separately in all calls
//String pid = getPath(entry.getCategories("xsd:string")).replace("//objects", "/objects");
String pid = getPath(entry.getCategories("xsd:string"));


// call each registered indexer
for (Indexer indexer : indexers) {
Expand Down
Expand Up @@ -91,7 +91,6 @@ public void update( String pid, String content ) {
remove(pid);

// parse content into a model
String subject = prefix + pid;
Model model = ModelFactory.createDefaultModel();
model.read( new StringReader(content), null, "N3");

Expand All @@ -111,8 +110,7 @@ public void update( String pid, String content ) {
* Perform a DESCRIBE query for triples about the Fedora object and remove
* all triples with subjects starting with the same subject.
**/
public void remove( String pid ) {
String subject = prefix + pid;
public void remove( String subject ) {

// find triples/quads to delete
String describeQuery = "DESCRIBE <" + subject + ">";
Expand Down Expand Up @@ -176,7 +174,7 @@ private void exec( UpdateRequest update ) {
**/
public int countTriples(String pid) {
// perform describe query
String describeQuery = "DESCRIBE <" + prefix + pid + ">";
String describeQuery = "DESCRIBE <" + pid + ">";
QueryEngineHTTP qexec = new QueryEngineHTTP( queryBase, describeQuery );
Iterator<Triple> results = qexec.execDescribeTriples();

Expand Down
Expand Up @@ -38,7 +38,6 @@
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.iq80.leveldb.util.FileUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand All @@ -62,8 +61,8 @@ public class IndexerGroupIT {
private static final int SERVER_PORT = Integer.parseInt(System
.getProperty("test.port", "8080"));

private static final String serverAddress = "http://localhost:" +
SERVER_PORT + "/rest/objects/";
private static final String serverAddress = "http://localhost:"
+ SERVER_PORT + "/";

private final PoolingClientConnectionManager connectionManager =
new PoolingClientConnectionManager();
Expand All @@ -83,9 +82,9 @@ private static TextMessage getMessage(String operation,
Abdera abdera = new Abdera();

Entry entry = abdera.newEntry();
entry.setTitle(operation, TEXT)
.setBaseUri("http://localhost:" + SERVER_PORT + "/rest");
entry.setTitle(operation, TEXT).setBaseUri(serverAddress);
entry.addCategory("xsd:string", pid, "fedora-types:pid");
entry.addCategory("xsd:string", "/" + pid, "path");
entry.setContent("contentds");
StringWriter writer = new StringWriter();
entry.writeTo(writer);
Expand Down Expand Up @@ -129,11 +128,11 @@ private void doIndexerGroupUpdateTest(final String pid) throws Exception {

final int expectedTriples = 4;
waitForTriples(expectedTriples, pid);

// triples should exist in the triplestore
assertTrue("Triples should exist",
sparqlIndexer.countTriples(pid) == expectedTriples);
}
sparqlIndexer.countTriples(serverAddress + pid) == expectedTriples);
}

@Test
public void indexerGroupDeleteTest() throws Exception {
Expand All @@ -147,7 +146,7 @@ public void indexerGroupDeleteTest() throws Exception {
final HttpDelete method = new HttpDelete(serverAddress + pid);
final HttpResponse response = client.execute(method);
assertEquals(204, response.getStatusLine().getStatusCode());

// create update message and send to indexer group
Message deleteMessage = getMessage("purgeObject", pid);
indexerGroup.onMessage( deleteMessage );
Expand All @@ -157,7 +156,7 @@ public void indexerGroupDeleteTest() throws Exception {

// two files should exist: one empty and one with data
File[] files = fileSerializerPath.listFiles(filter);

assertNotNull(files);
assertEquals(2, files.length);

Expand All @@ -173,20 +172,17 @@ public void indexerGroupDeleteTest() throws Exception {

final int expectedTriples = 0;
waitForTriples(expectedTriples, pid);

// triples should not exist in the triplestore
assertTrue("Triples should not exist",
sparqlIndexer.countTriples(pid) == expectedTriples);
}

sparqlIndexer.countTriples(serverAddress + pid) == expectedTriples);
}

//Test should fail because IndexGroup.java just uses repositoryURL + pid, not the full path, to retrieve node
@Test
@Ignore
public void indexerGroupUpdateTestingFullPath() throws Exception {
// create update message and send to indexer group
final String pid = "test_pid_10";
final String SUFFIX = "/a/b/c/";
final String SUFFIX = "a/b/c/";

// create dummy object
final HttpPost method = new HttpPost(serverAddress + SUFFIX + pid);
Expand All @@ -205,18 +201,15 @@ public void indexerGroupUpdateTestingFullPath() throws Exception {
assertTrue("Filename doesn't match: " + f.getAbsolutePath(),
f.getName().startsWith(pid));
assertTrue("File size too small: " + f.length(), f.length() > 500);

final int expectedTriples = 4;
waitForTriples(expectedTriples, pid);

waitForTriples(expectedTriples, SUFFIX + pid);
// triples should exist in the triplestore
assertTrue("Triples should exist",
sparqlIndexer.countTriples(pid) == expectedTriples);

sparqlIndexer.countTriples(serverAddress + SUFFIX + pid) == expectedTriples);
}



private void waitForFiles(int expectedFiles, FilenameFilter filter) throws InterruptedException {
long elapsed = 0;
long restingWait = 500;
Expand All @@ -233,7 +226,7 @@ private void waitForFiles(int expectedFiles, FilenameFilter filter) throws Inter

private void waitForTriples(int expectTriples, String pid) throws InterruptedException {
long elapsed = 0;
long restingWait = 500;
long restingWait = 1500;
long maxWait = 15000; // 15 seconds

int count = sparqlIndexer.countTriples(pid);
Expand Down
Expand Up @@ -67,13 +67,13 @@ public class SparqlIndexerIT {
@Test
public void indexerTest() throws Exception {
// add object
sparqlIndexer.update("foo",fooN3);
sparqlIndexer.update(serverAddress + "/foo",fooN3);

waitForTriples(3);

// triples should be present in the triplestore
assertEquals("Triples should be present",
3, sparqlIndexer.countTriples("foo"));
3, sparqlIndexer.countTriples(serverAddress + "/foo"));

// remove object
sparqlIndexer.remove("foo");
Expand Down
Expand Up @@ -10,7 +10,7 @@
<!-- sparql-update indexer -->
<bean id="sparqlUpdate" class="org.fcrepo.indexer.SparqlIndexer">
<!-- base URL for triplestore subjects, PID will be appended -->
<property name="prefix" value="http://localhost:${test.port:8080}/rest/objects/"/>
<property name="prefix" value="http://localhost:${test.port:8080}"/>

<!-- fuseki -->
<property name="queryBase" value="http://localhost:${test.fuseki.port:3030}/test/query"/>
Expand All @@ -36,7 +36,7 @@

<!-- Message Driven POJO (MDP) that manages individual indexers -->
<bean id="indexerGroup" class="org.fcrepo.indexer.IndexerGroup">
<property name="repositoryURL" value="http://localhost:${test.port:8080}/rest/objects/" />
<property name="repositoryURL" value="http://localhost:${test.port:8080}" />
<property name="indexers">
<set>
<ref bean="fileSerializer"/>
Expand Down
Expand Up @@ -7,7 +7,7 @@
<bean id="sparqlUpdate" class="org.fcrepo.indexer.SparqlIndexer">

<!-- base URL for triplestore subjects, PID will be appended -->
<property name="prefix" value="http://${fcrepo.host:localhost}:${fcrepo.port:8080}/rest/"/>
<property name="prefix" value="http://${fcrepo.host:localhost}:${fcrepo.port:8080}/rest"/>

<!-- fuseki -->
<property name="queryBase" value="http://${fuseki.host:localhost}:${fuseki.port:3030}/test/query"/>
Expand All @@ -28,12 +28,12 @@

<!-- file serializer -->
<bean id="fileSerializer" class="org.fcrepo.indexer.FileSerializer">
<property name="path" value="${file.serializer.dir:/tmp/fileSerializer/}"/>
<property name="path" value="${file.serializer.dir:./target/test-classes/fileSerializer/}"/>
</bean>

<!-- Message Driven POJO (MDP) that manages individual indexers -->
<bean id="indexerGroup" class="org.fcrepo.indexer.IndexerGroup">
<property name="repositoryURL" value="http://${fcrepo.host:localhost}:${fcrepo.port:8080}/rest/" />
<property name="repositoryURL" value="http://${fcrepo.host:localhost}:${fcrepo.port:8080}/rest" />
<property name="indexers">
<set>
<ref bean="fileSerializer"/>
Expand Down

0 comments on commit 6b0a785

Please sign in to comment.