Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Indexing workflow complete, testing blocked on https://www.pivotaltra…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajs6f committed Dec 8, 2013
1 parent 05ad676 commit f2f1766
Show file tree
Hide file tree
Showing 16 changed files with 428 additions and 82 deletions.
12 changes: 12 additions & 0 deletions fcrepo-jms-indexer-core/pom.xml
Expand Up @@ -109,11 +109,23 @@
<groupId>org.apache.jena</groupId>
<artifactId>jena-fuseki</artifactId>
<scope>test</scope>
<!-- <exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions> -->
</dependency>
<!-- Start of Solr Indexer libs -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>
<groupId>org.apache.solr</groupId>
Expand Down
Expand Up @@ -18,6 +18,8 @@
*/
package org.fcrepo.indexer;

import static org.slf4j.LoggerFactory.getLogger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -26,6 +28,7 @@
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.slf4j.Logger;


/**
Expand All @@ -40,20 +43,26 @@ public abstract class CachingRetriever implements IndexableContentRetriever {

private byte[] cache;

private static final Logger LOGGER = getLogger(CachingRetriever.class);


/**
 * Returns the content produced by {@code retrieveHttpResponse()}, fetching
 * it on the first call and serving later calls from the in-memory copy.
 * Every call hands back a fresh stream over the same cached bytes.
 *
 * NOTE(review): no synchronization is visible around the cache fields here;
 * confirm whether instances are ever shared between threads.
 *
 * @return a new stream positioned at the start of the cached content
 * @throws ClientProtocolException propagated from the retrieval
 * @throws IOException if retrieving or buffering the response body fails
 * @throws AbsentTransformPropertyException propagated from the retrieval
 * @throws HttpException propagated from the retrieval
 * @see java.util.concurrent.Callable#call()
 */
@Override
public InputStream call() throws ClientProtocolException, IOException,
        AbsentTransformPropertyException, HttpException {
    if (cached) {
        LOGGER.debug("Returning cached content...");
        return new ByteArrayInputStream(cache);
    }
    LOGGER.debug("Retrieving uncached content...");
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        retrieveHttpResponse().getEntity().writeTo(out);
        cache = out.toByteArray();
    }
    // Flag the cache as populated only after the body was fully buffered, so
    // a failed retrieval is retried on the next call.
    cached = true;
    // Decode the payload only when it will actually be logged, and with an
    // explicit charset instead of the platform default.
    // Assumes the payload is UTF-8 text (diagnostics only) -- TODO confirm.
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Retrieved cache-able content:\n{}",
                new String(cache, java.nio.charset.StandardCharsets.UTF_8));
    }
    return new ByteArrayInputStream(cache);
}

Expand Down
Expand Up @@ -17,7 +17,11 @@
package org.fcrepo.indexer;

import static com.google.common.base.Throwables.propagate;
import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
import static com.hp.hpl.jena.rdf.model.ResourceFactory.createProperty;
import static com.hp.hpl.jena.rdf.model.ResourceFactory.createResource;
import static com.hp.hpl.jena.vocabulary.RDF.type;
import static java.lang.Integer.MAX_VALUE;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.slf4j.LoggerFactory.getLogger;
Expand All @@ -36,18 +40,19 @@
import org.apache.http.HttpException;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.fcrepo.kernel.utils.EventType;
import org.slf4j.Logger;

import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.Property;

import com.hp.hpl.jena.rdf.model.Resource;

/**
* MessageListener implementation that retrieves objects from the repository and
* invokes one or more indexers to index the content.
*
* documentation:
* invokes one or more indexers to index the content. documentation:
* https://wiki.duraspace.org/display/FF/Design+-+Messaging+for+Workflow
*
* @author Esmé Cowles
* @author ajs6f
* @date Aug 19 2013
Expand All @@ -62,7 +67,6 @@ public class IndexerGroup implements MessageListener {

private final HttpClient httpClient;


/**
* Identifier message header
*/
Expand All @@ -81,11 +85,21 @@ public class IndexerGroup implements MessageListener {
private static final String REMOVAL_EVENT_TYPE = REPOSITORY_NAMESPACE
+ EventType.valueOf(NODE_REMOVED).toString();

public static final String INDEXER_NAMESPACE =
"http://fedora.info/definitions/v4/indexing#";

/**
* Indicates the transformation to use with this resource to derive indexing information.
* Indicates the transformation to use with this resource to derive indexing
* information.
*/
public static final Property INDEXING_TRANSFORM_PREDICATE =
createProperty(REPOSITORY_NAMESPACE + "hasIndexingTransformation");
createProperty(INDEXER_NAMESPACE + "hasIndexingTransformation");

/**
* Indicates that a resource is indexable.
*/
public static final Resource INDEXABLE_MIXIN =
createResource(INDEXER_NAMESPACE + "indexable");

private static final Reader EMPTY_CONTENT = null;

Expand All @@ -94,7 +108,11 @@ public class IndexerGroup implements MessageListener {
**/
public IndexerGroup() {
LOGGER.debug("Creating IndexerGroup: {}", this);
this.httpClient = new DefaultHttpClient();
final PoolingClientConnectionManager connMann =
new PoolingClientConnectionManager();
connMann.setMaxTotal(MAX_VALUE);
connMann.setDefaultMaxPerRoute(MAX_VALUE);
this.httpClient = new DefaultHttpClient(connMann);
}

/**
Expand Down Expand Up @@ -139,75 +157,106 @@ public void onMessage(final Message message) {
}
try {
// get pid and eventType from message
final String pid = message.getStringProperty(IDENTIFIER_HEADER_NAME);
final String eventType = message.getStringProperty(EVENT_TYPE_HEADER_NAME);
final String pid =
message.getStringProperty(IDENTIFIER_HEADER_NAME);
final String eventType =
message.getStringProperty(EVENT_TYPE_HEADER_NAME);

LOGGER.debug("Discovered pid: {} in message.", pid);
LOGGER.debug("Discovered event type: {} in message.", eventType);

final Boolean removal = REMOVAL_EVENT_TYPE.equals(eventType);
LOGGER.debug("It is {} that this is a removal operation.", removal);

final RdfRetriever rdfr =
new RdfRetriever(getRepositoryURL() + pid, httpClient);
final String uri = getRepositoryURL() + pid;
final RdfRetriever rdfr = new RdfRetriever(uri, httpClient);
final NamedFieldsRetriever nfr =
new NamedFieldsRetriever(getRepositoryURL() + pid, httpClient,
rdfr);

for (final Indexer indexer : indexers) {
LOGGER.debug("Operating for indexer: {}", indexer);
Boolean hasContent = false;
Reader content = EMPTY_CONTENT;
if (!removal) {
switch (indexer.getIndexerType()) {
case NAMEDFIELDS:
try (final InputStream result = nfr.call()) {
content = new InputStreamReader(result);
hasContent = true;
} catch (final IOException | HttpException e) {
LOGGER.error(
"Could not retrieve content for update!",
e);
hasContent = false;
} catch (final AbsentTransformPropertyException e) {
hasContent = false;
}
case RDF:
try (final InputStream result = rdfr.call()) {
content = new InputStreamReader(result);
new NamedFieldsRetriever(uri, httpClient, rdfr);
final Model rdf =
createDefaultModel().read(rdfr.call(), null, "N3");

if (rdf.contains(createResource(uri), type, INDEXABLE_MIXIN)) {
LOGGER.debug("Discovered indexable type on this resource.");
for (final Indexer indexer : getIndexers()) {
LOGGER.debug("Operating for indexer: {}", indexer);
Boolean hasContent = false;
Reader content = EMPTY_CONTENT;
if (!removal) {
switch (indexer.getIndexerType()) {
case NAMEDFIELDS:
LOGGER.debug(
"Retrieving named fields for: {}, (may be cached) to index to {}...",
pid, indexer);
try (final InputStream result = nfr.call()) {
content = new InputStreamReader(result);
hasContent = true;
} catch (final IOException | HttpException e) {
LOGGER.error(
"Could not retrieve content for update of: {} to indexer {}!",
pid, indexer);
LOGGER.error("with exception:", e);
hasContent = false;
} catch (final AbsentTransformPropertyException e) {
hasContent = false;
}
break;
case RDF:
LOGGER.debug(
"Retrieving RDF for: {}, (may be cached) to index to {}...",
pid, indexer);
try (final InputStream result = rdfr.call()) {
content = new InputStreamReader(result);
hasContent = true;
} catch (IOException | HttpException e) {
LOGGER.error(
"Could not retrieve content for update of: {} to indexer {}!",
pid, indexer);
LOGGER.error("with exception:", e);
hasContent = false;
} catch (final AbsentTransformPropertyException e1) {
hasContent = false;
}
break;
default:
content =
new StringReader(
"Default content for update: "
+ pid);
hasContent = true;
} catch (IOException | HttpException e) {
LOGGER.error(
"Could not retrieve content for update!",
e);
hasContent = false;
} catch (final AbsentTransformPropertyException e1) {
hasContent = false;
}
default:
content = new StringReader(pid);
hasContent = true;
break;
}
}
}

try {
if (removal) {
indexer.remove(pid);
} else {
if (hasContent) {
indexer.update(pid, content);
try {
if (removal) {
LOGGER.debug(
"Executing removal of: {} to indexer: {}...",
pid, indexer);
indexer.remove(pid);
} else {
LOGGER.error(
"Received update for {} but was unable to retrieve content for update!",
pid);
if (hasContent) {
LOGGER.debug(
"Executing update of: {} to indexer: {}...",
pid, indexer);
indexer.update(pid, content);
} else {
LOGGER.error(
"Received update for: {} but was unable to retrieve "
+ "content for update to indexer: {}!",
pid, indexer);
}
}
} catch (final Exception e) {
LOGGER.error("Error indexing {}: {}!", pid, e);
}
} catch (final Exception e) {
LOGGER.error("Error indexing {}: {}!", pid, e);
}
} else {
LOGGER.info("Resource retrieved without indexable type. Will not index.");
}
} catch (final JMSException e) {
} catch (final JMSException | IOException | HttpException e) {
LOGGER.error("Error processing JMS event!", e);
} catch (final AbsentTransformPropertyException e2) {
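// Cannot be thrown here: the only unguarded retrieval in this block is
// rdfr.call(), and RdfRetriever never raises this exception. The catch
// exists solely because the shared call() signature declares it.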
}
}

Expand Down
Expand Up @@ -65,8 +65,11 @@ public NamedFieldsRetriever(final String uri, final HttpClient client,
public HttpResponse retrieveHttpResponse() throws AbsentTransformPropertyException,
ClientProtocolException, IOException, HttpException {
LOGGER.debug("Retrieving RDF representation from: {}", uri);
final Model rdf = createDefaultModel().read(rdfr.call(), null);
final Model rdf = createDefaultModel().read(rdfr.call(), null, "N3");
if (!rdf.contains(createResource(uri), INDEXING_TRANSFORM_PREDICATE)) {
LOGGER.info(
"Found no property locating LDPath transform for: {}, will not retrieve transformed content.",
uri);
throw new AbsentTransformPropertyException(uri);
}
final RDFNode indexingTransform =
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.fcrepo.indexer;

import static org.apache.http.HttpStatus.SC_OK;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;

Expand All @@ -26,6 +27,8 @@
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.jena.riot.WebContent;
import org.slf4j.Logger;

/**
* Retrieves RDF representations of resources for storage in a triplestore.
Expand All @@ -37,12 +40,15 @@
*/
public class RdfRetriever extends CachingRetriever {

private static final String RDF_SERIALIZATION = "application/rdf+xml";
private static final String RDF_SERIALIZATION = WebContent.contentTypeN3;

private final String identifier;

private final HttpClient httpClient;

private static final Logger LOGGER = getLogger(RdfRetriever.class);


/**
* @param identifier
* @param client
Expand All @@ -57,11 +63,12 @@ public HttpResponse retrieveHttpResponse() throws ClientProtocolException, IOExc
HttpException {
final HttpUriRequest request = new HttpGet(identifier);
request.addHeader("Accept", RDF_SERIALIZATION);
LOGGER.debug("Retrieving RDF content from: {}...", request.getURI());
final HttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode() == SC_OK) {
return response;
} else {
throw new HttpException(response.getStatusLine().getReasonPhrase());
throw new HttpException(response.getStatusLine().toString());
}
}

Expand Down
Expand Up @@ -50,6 +50,9 @@
*/
public class SolrIndexer implements Indexer {

public static final String CONFIGURATION_FOLDER =
"fedora:system/fedora:transform/fedora:ldpath/";

private final SolrServer server;

private static final Logger LOGGER = getLogger(SolrIndexer.class);
Expand Down Expand Up @@ -138,9 +141,11 @@ public UpdateResponse call() throws Exception {
}));
}

private static <T> ListenableFuture<T> run(
final ListenableFutureTask<T> task) {
task.run();
private <T> ListenableFuture<T> run(final ListenableFutureTask<T> task) {
synchronized (this) {
task.run();
notifyAll();
}
return task;
}

Expand Down

0 comments on commit f2f1766

Please sign in to comment.