Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Handle datastream updates correctly
Browse files Browse the repository at this point in the history
- RdfRetriever uses HEAD to find descriptions of non-RDF resources
- Update dependencies so fcrepo-http-commons is test-scoped and required modules are explicitly included
- Add javax.inject dependency to the Elasticsearch module build

Resolves: https://www.pivotaltracker.com/story/show/81198770
  • Loading branch information
escowles authored and Andrew Woods committed Oct 24, 2014
1 parent 431bc8a commit 697ffcb
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 35 deletions.
13 changes: 13 additions & 0 deletions fcrepo-message-consumer-core/pom.xml
Expand Up @@ -94,9 +94,22 @@
<artifactId>logback-classic</artifactId>
</dependency>

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>

<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>

<dependency>
<groupId>org.fcrepo</groupId>
<artifactId>fcrepo-http-commons</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>lucene-core</artifactId>
Expand Down
Expand Up @@ -57,7 +57,9 @@
import static java.lang.Integer.MAX_VALUE;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.fcrepo.jcr.FedoraJcrTypes.FCR_METADATA;
import static org.fcrepo.kernel.RdfLexicon.CONTAINS;
import static org.fcrepo.kernel.RdfLexicon.HAS_PARENT;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.fcrepo.kernel.RdfLexicon.RESTAPI_NAMESPACE;
import static org.slf4j.LoggerFactory.getLogger;
Expand Down Expand Up @@ -275,22 +277,24 @@ private void index( final URI uri, final String eventType ) throws URISyntaxExce

if (!removal) {
final Model rdf = rdfr.get();
if (rdf.contains(createResource(uri.toString()), type, INDEXABLE_MIXIN)) {
LOGGER.debug("Resource: {} retrieved with indexable type.",
uri);
if (rdf.contains(createResource(uri.toString()), type, INDEXABLE_MIXIN)
|| rdf.contains(createResource(uri.toString() + "/" + FCR_METADATA), type, INDEXABLE_MIXIN)) {
LOGGER.debug("Resource: {} retrieved with indexable type.", uri);
indexable = true;
} else {
LOGGER.debug(
"Resource: {} retrieved without indexable type.",
uri);
LOGGER.debug("Resource: {} retrieved without indexable type.", uri);
}

// if this is a datastream, also index the parent object
if (rdf.contains(createResource(uri.toString()), type, DATASTREAM_TYPE)
final Resource subj = createResource(uri.toString());
if (rdf.contains(subj, type, DATASTREAM_TYPE)
&& uri.toString().indexOf("/fedora:system/") == -1 ) {
final URI parent = new URI(uri.toString().substring(0, uri.toString().lastIndexOf("/")));
LOGGER.info("Datastream found, also indexing parent {}", parent);
index( parent, "NODE_UPDATED");
final NodeIterator parents = rdf.listObjectsOfProperty(subj, HAS_PARENT);
if ( parents.hasNext() ) {
final String parent = parents.nextNode().asResource().getURI();
LOGGER.info("Datastream found, also indexing parent {}", parent);
index( new URI(parent), "NODE_UPDATED");
}
}
}

Expand Down Expand Up @@ -380,8 +384,7 @@ private void reindexURI( final URI uri, final boolean recursive ) throws URISynt

// check for children (rdf should be cached...)
if ( recursive ) {
final Supplier<Model> rdfr
= memoize(new RdfRetriever(uri, httpClient(uri.toString())));
final Supplier<Model> rdfr = memoize(new RdfRetriever(uri, httpClient(uri.toString())));
final Model model = rdfr.get();
final NodeIterator children = model.listObjectsOfProperty( CONTAINS );
while ( children.hasNext() ) {
Expand Down
Expand Up @@ -26,10 +26,14 @@
import java.io.Reader;
import java.net.URI;

import javax.ws.rs.core.Link;

import org.apache.http.Header;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpUriRequest;
import org.slf4j.Logger;

Expand Down Expand Up @@ -65,10 +69,29 @@ public RdfRetriever(final URI identifier, final HttpClient client) {

@Override
public Model get() {
final HttpUriRequest request = new HttpGet(identifier);
request.addHeader("Accept", RDF_SERIALIZATION);
LOGGER.debug("Retrieving RDF content from: {}...", request.getURI());

try {
// make an initial HEAD request and check Link headers for descriptions located elsewhere
final HttpHead headRequest = new HttpHead(identifier);
final HttpResponse headResponse = httpClient.execute(headRequest);
URI descriptionURI = null;
final Header[] links = headResponse.getHeaders("Link");
if ( links != null ) {
for ( Header h : headResponse.getHeaders("Link") ) {
final Link link = Link.valueOf(h.getValue());
if ( link.getRel().equals("describedby") ) {
descriptionURI = link.getUri();
LOGGER.debug("Using URI from Link header: {}", descriptionURI);
}
}
}
if ( descriptionURI == null ) {
descriptionURI = identifier;
}

final HttpUriRequest request = new HttpGet(descriptionURI);
request.addHeader("Accept", RDF_SERIALIZATION);
LOGGER.debug("Retrieving RDF content from: {}...", request.getURI());
final HttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode() == SC_OK) {
try (
Expand Down
Expand Up @@ -16,16 +16,19 @@
package org.fcrepo.indexer.integration;

import static java.lang.System.currentTimeMillis;
import static java.util.UUID.randomUUID;
import static org.apache.jena.riot.WebContent.contentTypeN3Alt1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.slf4j.LoggerFactory.getLogger;

import java.net.URI;
import javax.inject.Inject;
import javax.ws.rs.core.Link;

import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.fcrepo.indexer.IndexerGroup;
Expand Down Expand Up @@ -61,42 +64,48 @@ public class IndexerGroupIT extends IndexingIT {
// https://www.pivotaltracker.com/story/show/72709646
@Test
public void testIndexerGroupUpdate() throws Exception {
doIndexerGroupUpdateTest(serverAddress + "updateTestPid");
final String uri = serverAddress + randomUUID();
createIndexableObject(uri);
shouldBeIndexed(uri);
}
private void doIndexerGroupUpdateTest(final String uri) throws Exception {
final HttpPut createRequest = new HttpPut(uri);
private void createIndexableObject(final String uri) throws Exception {
final String objectRdf =
"@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> ."
+ "@prefix indexing:<http://fedora.info/definitions/v4/indexing#>."
+ "<" + uri + "> rdf:type <http://fedora.info/definitions/v4/indexing#indexable> ;"
+ "indexing:hasIndexingTransformation \"default\".";

createRequest.setEntity(new StringEntity(objectRdf));
createRequest.addHeader("Content-Type", contentTypeN3Alt1);

createResource(uri, objectRdf, contentTypeN3Alt1);
LOGGER.debug("Created object at: {}", uri);
}
private HttpResponse createResource(final String uri, final String content, final String contentType)
throws Exception {
final HttpPut createRequest = new HttpPut(uri);
createRequest.setEntity(new StringEntity(content));
createRequest.addHeader("Content-Type", contentType);
final HttpResponse response = client.execute(createRequest);
assertEquals(201, response.getStatusLine().getStatusCode());
LOGGER.debug("Created object at: {}", uri);

return response;
}
private void shouldBeIndexed(final String uri) throws Exception {
final Long start = currentTimeMillis();
synchronized (testIndexer) {
while (!testIndexer.receivedUpdate(new URI(uri)) && (currentTimeMillis() - start < TIMEOUT)) {
LOGGER.debug("Waiting for next notification from TestIndexer...");
testIndexer.wait(1000);
}
}
assertTrue("Test indexer should have received an update message for " + uri + "!", testIndexer
.receivedUpdate(new URI(uri)));
assertTrue("Test indexer should have received an update message for " + uri + "!",
testIndexer.receivedUpdate(new URI(uri)));
LOGGER.debug("Received update at test indexer for identifier: {}", uri);

}

@Test
public void testIndexerGroupDelete() throws Exception {

final String uri = serverAddress + "removeTestPid";
createIndexableObject(uri);

doIndexerGroupUpdateTest(uri);
// delete dummy object
final HttpDelete method = new HttpDelete(uri);
final HttpResponse response = client.execute(method);
Expand Down Expand Up @@ -124,8 +133,9 @@ public void testIndexerGroupReindex() throws Exception {
// create sample records
final String[] pids = { "a1", "a1/b1", "a1/b2", "a1/b1/c1" };
for ( String pid : pids ) {
doIndexerGroupUpdateTest(serverAddress + pid);
createIndexableObject(serverAddress + pid);
}
shouldBeIndexed(serverAddress + "a1/b1/c1");

// clear test indexer lists of updated records
testIndexer.clear();
Expand All @@ -149,4 +159,24 @@ public void testIndexerGroupReindex() throws Exception {
}
}

@Test
public void testDatastreamIndexing() throws Exception {
    // create an indexable object with a plain-text datastream child
    final String uri = serverAddress + randomUUID();
    createIndexableObject(uri);
    final String dsUri = uri + "/ds1";
    final HttpResponse response = createResource(dsUri, "test datastream content", "text/plain");

    // Locate the datastream's RDF description by its rel="describedby" Link header.
    // The response may carry several Link headers, so selecting by rel (rather than
    // taking whichever header happens to come first) avoids PATCHing the wrong URI.
    URI descURI = null;
    for (final Header header : response.getHeaders("Link")) {
        final Link link = Link.valueOf(header.getValue());
        if ("describedby".equals(link.getRel())) {
            descURI = link.getUri();
            break;
        }
    }
    assertNotNull("Response for " + dsUri + " should include a rel=\"describedby\" Link header", descURI);

    // make datastream indexable by adding the indexable type to its description
    final HttpPatch patch = new HttpPatch(descURI);
    final String sparqlUpdate = "insert { <> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> "
            + "<http://fedora.info/definitions/v4/indexing#indexable> } where {}";
    patch.setEntity(new StringEntity(sparqlUpdate));
    patch.addHeader("Content-Type", "application/sparql-update");
    assertEquals(204, client.execute(patch).getStatusLine().getStatusCode());

    // make sure the datastream itself was indexed
    shouldBeIndexed(dsUri);
}

}
Expand Up @@ -65,10 +65,8 @@ public void testRemoveSynch() throws URISyntaxException {
testIndexer.removeSynch(new URI("info://obj-0"));

final String cmd0 = "DELETE WHERE { <" + createURI("info://obj-0") + "> ?p ?o }";
final String cmd1 = "DELETE WHERE { <" + createURI("info://obj-0/fcr:content") + "> ?p ?o }";
final String cmd2 = "DELETE WHERE { <" + createURI("info://obj-0/child") + "> ?p ?o }";
Mockito.verify(updateRequest).add(cmd0);
Mockito.verify(updateRequest).add(cmd1);
Mockito.verify(updateRequest).add(cmd2);
}

Expand All @@ -92,9 +90,6 @@ private class MockSparqlIndexer extends SparqlIndexer {

protected QueryEngineHTTP buildQueryEngineHTTP(final String describeQuery) {
final Triple t0 = new Triple(createURI("info://sub"), createLiteral("p"), createURI("info://obj-0"));
final Triple t2 = new Triple(createURI("info://sub"),
createLiteral("p"),
createURI("info://obj-0/fcr:content"));
final Triple t1 = new Triple(createURI("info://sub"), createLiteral("p"), createURI("info://obj-1"));
final Triple t3 = new Triple(createURI("info://obj-0/child"),
createLiteral("p"),
Expand All @@ -103,7 +98,6 @@ protected QueryEngineHTTP buildQueryEngineHTTP(final String describeQuery) {
final Set<Triple> triples = new HashSet<>();
triples.add(t0);
triples.add(t1);
triples.add(t2);
triples.add(t3);

Mockito.when(queryEngineHTTP.execDescribeTriples()).thenReturn(triples.iterator());
Expand Down
5 changes: 5 additions & 0 deletions fcrepo-message-consumer-elasticsearch/pom.xml
Expand Up @@ -62,6 +62,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -165,7 +165,7 @@
<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-fuseki</artifactId>
<version>1.1.0</version>
<version>1.1.1</version>
<scope>test</scope>
</dependency>

Expand Down

0 comments on commit 697ffcb

Please sign in to comment.