
Commit

Add REST API to trigger reindexing
- add parameters to specify path to reindex and toggle recursiveness
- add IndexerGroup.reindex() method to reindex all indexable objects in a repository

Resolves: https://www.pivotaltracker.com/story/show/65168574
escowles authored and Andrew Woods committed Apr 13, 2014
1 parent 3cfc5c0 commit 96a637d
Showing 11 changed files with 388 additions and 103 deletions.
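The headline change is a pair of public IndexerGroup.reindex() entry points (full diff below). As a minimal usage sketch, assuming an already configured IndexerGroup instance and an illustrative repository URL (both names here are placeholders, not part of this commit):

    // Reindex every indexable object in the repository:
    indexerGroup.reindex();

    // Reindex the subtree rooted at one resource, recursing into its children:
    indexerGroup.reindex("http://localhost:8080/rest/collection/a1", true);

    // Reindex a single resource without touching its children:
    indexerGroup.reindex("http://localhost:8080/rest/collection/a1", false);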
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ ObjectStore/
*~
transaction.log
velocity.log
fcrepo4-data/
@@ -18,6 +18,7 @@

import com.google.common.base.Supplier;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.NodeIterator;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.Resource;
import org.apache.http.client.HttpClient;
@@ -30,6 +31,7 @@
import javax.jms.Message;
import javax.jms.MessageListener;
import java.io.Reader;
import java.util.HashSet;
import java.util.Set;

import static com.google.common.base.Suppliers.memoize;
@@ -39,6 +41,7 @@
import static com.hp.hpl.jena.vocabulary.RDF.type;
import static java.lang.Integer.MAX_VALUE;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static org.fcrepo.kernel.RdfLexicon.HAS_CHILD;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.slf4j.LoggerFactory.getLogger;

@@ -61,6 +64,8 @@ public class IndexerGroup implements MessageListener {

private HttpClient httpClient;

private Set<String> reindexed;

/**
* Identifier message header
*/
@@ -79,6 +84,12 @@ public class IndexerGroup implements MessageListener {
static final String REMOVAL_EVENT_TYPE = REPOSITORY_NAMESPACE
+ EventType.valueOf(NODE_REMOVED).toString();

/**
* Type of event to indicate reindexing.
*/
private static final String REINDEX_EVENT_TYPE = REPOSITORY_NAMESPACE
+ "NODE_REINDEXED";

public static final String INDEXER_NAMESPACE =
"http://fedora.info/definitions/v4/indexing#";

@@ -186,91 +197,138 @@ public void onMessage(final Message message) {
                pid = message.getStringProperty(IDENTIFIER_HEADER_NAME);
            }

            LOGGER.debug("Discovered pid: {} in message.", pid);
            LOGGER.debug("Discovered event type: {} in message.", eventType);

            index( getRepositoryURL() + pid, eventType );
        } catch (final JMSException e) {
            LOGGER.error("Error processing JMS event!", e);
        }
    }

    /**
     * Index a resource.
    **/
    private void index( final String uri, final String eventType ) {
        final Boolean removal = REMOVAL_EVENT_TYPE.equals(eventType);
        LOGGER.debug("It is {} that this is a removal operation.", removal);
        final Supplier<Model> rdfr =
                memoize(new RdfRetriever(uri, httpClient));
        final Supplier<NamedFields> nfr =
                memoize(new NamedFieldsRetriever(uri, httpClient, rdfr));
        Boolean indexable = false;

        if (!removal) {
            final Model rdf = rdfr.get();
            if (rdf.contains(createResource(uri), type, INDEXABLE_MIXIN)) {
                LOGGER.debug("Resource: {} retrieved with indexable type.",
                        uri);
                indexable = true;
            } else {
                LOGGER.debug(
                        "Resource: {} retrieved without indexable type.",
                        uri);
            }
        }

        for (final Indexer<Object> indexer : getIndexers()) {
            LOGGER.debug("Operating for indexer: {}", indexer);
            Boolean hasContent = false;
            Object content = EMPTY_CONTENT;
            if (!removal && indexable) {
                switch (indexer.getIndexerType()) {
                    case NAMEDFIELDS:
                        LOGGER.debug(
                                "Retrieving named fields for: {}, (may be cached) to index to {}...",
                                uri, indexer);
                        try {
                            content = nfr.get();
                            hasContent = true;
                        } catch (final AbsentTransformPropertyException e) {
                            LOGGER.error("Failed to retrieve indexable content:"
                                    + "could not find transform property!");
                            hasContent = false;
                        }
                        break;
                    case RDF:
                        LOGGER.debug(
                                "Retrieving RDF for: {}, (may be cached) to index to {}...",
                                uri, indexer);
                        content = rdfr.get();
                        hasContent = true;
                        break;
                    default:
                        hasContent = true;
                        break;
                }
            }

            try {
                if (removal) {
                    LOGGER.debug(
                            "Executing removal of: {} to indexer: {}...",
                            uri, indexer);
                    indexer.remove(uri);
                } else {
                    if (hasContent) {
                        LOGGER.debug(
                                "Executing update of: {} to indexer: {}...",
                                uri, indexer);
                        indexer.update(uri, content);
                    } else if (indexable) {
                        LOGGER.error(
                                "Received update for: {} but was unable to retrieve "
                                        + "content for update to indexer: {}!",
                                uri, indexer);
                    }
                }
            } catch (final Exception e) {
                LOGGER.error("Error indexing {}: {}!", uri, e);
            }
        }
    }

/**
* Reindex all content in the repository by retrieving the root resource
* and recursively reindexing all indexable child resources.
**/
public void reindex() {
reindexed = new HashSet<>();
reindexURI( getRepositoryURL(), true );
}

/**
* Reindex a resource (and optionally all of its children).
* @param uri The resource URI to reindex.
* @param recursive If true, also recursively reindex all children.
**/
public void reindex( final String uri, boolean recursive ) {
reindexed = new HashSet<>();
reindexURI( uri, recursive );
}

private void reindexURI( final String uri, boolean recursive ) {
LOGGER.debug("Reindexing {}, recursive: {}", uri, recursive);
if ( !reindexed.contains(uri) ) {
// index() will check for indexable mixin
index( uri, REINDEX_EVENT_TYPE );
}

// prevent infinite recursion
reindexed.add( uri );

// check for children (rdf should be cached...)
if ( recursive ) {
final Supplier<Model> rdfr
= memoize(new RdfRetriever(uri, httpClient));
final Model model = rdfr.get();
NodeIterator children = model.listObjectsOfProperty( HAS_CHILD );
while ( children.hasNext() ) {
final String child = children.nextNode().asResource().getURI();
if ( !reindexed.contains(child) ) {
reindexURI( child, true );
}
}
}
}
}
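The REST endpoint named in the commit title lives elsewhere in this changeset and is not shown in the hunks above; a hypothetical JAX-RS resource delegating to the methods above could look like the following sketch (class name, path, and query-parameter names are illustrative only, not the actual endpoint added by this commit):

    import javax.ws.rs.DefaultValue;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.QueryParam;
    import javax.ws.rs.core.Response;

    // Hypothetical resource class; the real endpoint in this changeset may differ.
    @Path("/reindex")
    public class ReindexResource {

        private final IndexerGroup indexerGroup;

        public ReindexResource(final IndexerGroup indexerGroup) {
            this.indexerGroup = indexerGroup;
        }

        @POST
        public Response reindex(@QueryParam("path") final String path,
                @QueryParam("recursive") @DefaultValue("true") final boolean recursive) {
            if (path == null || path.isEmpty()) {
                indexerGroup.reindex();                // whole repository
            } else {
                indexerGroup.reindex(path, recursive); // one path, optionally recursive
            }
            return Response.noContent().build();
        }
    }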
@@ -40,6 +40,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -121,6 +122,14 @@ public void testRDFIndexablePropertyUpdateMessage() throws Exception {
verify(indexer, atLeastOnce()).update(anyString(), any());
}

@Test
public void testReindex() throws Exception {
mockContent("", true, null);
when(indexer.getIndexerType()).thenReturn(Indexer.IndexerType.RDF);
indexerGroup.reindex();
        verify(indexer, atLeastOnce()).update(eq(repoUrl), any());
}

private Message createUnindexableMessage(String eventType, String identifier) throws Exception {
return createMockMessage(false, eventType, identifier, false, null, false);
}
@@ -154,17 +163,20 @@ private Message createMockMessage(boolean jmsExceptionOnGetMessage, String event
        }
        if (identifier != null) {
            when(m.getStringProperty(IndexerGroup.IDENTIFIER_HEADER_NAME)).thenReturn(identifier);
            mockContent(property ? parentId(identifier) : identifier, indexable, indexerName);
        }
        return m;
    }

    private void mockContent(String identifier, boolean indexable, String indexerName) throws Exception {
        final HttpResponse r = mock(HttpResponse.class);
        final StatusLine s = mock(StatusLine.class);
        when(s.getStatusCode()).thenReturn(HttpStatus.SC_OK);
        when(r.getStatusLine()).thenReturn(s);
        final HttpEntity e = mock(HttpEntity.class);
        when(e.getContent()).thenReturn(new ByteArrayInputStream(getIndexableTriples(identifier, indexable, indexerName).getBytes("UTF-8")));
        when(r.getEntity()).thenReturn(e);
        when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(r);
    }

private String parentId(String identifier) {
return identifier.substring(0, identifier.lastIndexOf('/'));
@@ -42,6 +42,12 @@ public class TestIndexer extends SynchIndexer<NoContent, Boolean> {
private final Set<String> updates = new HashSet<>();
private final Set<String> removes = new HashSet<>();

public void clear() {
LOGGER.debug("Clearing updates and removes");
updates.clear();
removes.clear();
}

@Override
public Callable<Boolean> updateSynch(final String identifier,
final NoContent content) {
@@ -118,4 +118,35 @@ public void testIndexerGroupDelete() throws Exception {

}

@Test
public void testIndexerGroupReindex() throws Exception {
// create sample records
final String[] pids = { "a1", "a1/b1", "a1/b2", "a1/b1/c1" };
for ( String pid : pids ) {
doIndexerGroupUpdateTest(serverAddress + pid);
}

// clear test indexer lists of updated records
testIndexer.clear();

// reindex everything
indexerGroup.reindex();

// records should be reindexed
synchronized (testIndexer) {
for ( String pid : pids ) {
final String uri = serverAddress + pid;
final Long start = currentTimeMillis();
while (!testIndexer.receivedUpdate(uri)
&& (currentTimeMillis() - start < TIMEOUT)) {
LOGGER.debug("Waiting for " + uri);
testIndexer.wait(1000);
}

assertTrue("Record should have been reindexed: " + uri,
testIndexer.receivedUpdate(uri));
}
}
}

}
@@ -77,7 +77,7 @@ public void indexerTest() throws Exception {
"SELECT ?p \n" +
"WHERE \n" +
"{ ?p fcrepo:hasParent ?c }";
assertEquals("Triple should return from search!", 1, countQueryTriples(sparqlQuery));
assertTrue("Triple should return from search!", countQueryTriples(sparqlQuery) > 0 );

// remove object
sparqlIndexer.remove(uri);
