This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

One successful round-trip system test available using default transform, still waiting on https://www.pivotaltracker.com/story/show/61942638 to do custom examples
ajs6f committed Dec 8, 2013
1 parent f2f1766 commit 7abf314
Showing 17 changed files with 451 additions and 284 deletions.
12 changes: 12 additions & 0 deletions fcrepo-jms-indexer-core/pom.xml
@@ -88,10 +88,22 @@
<artifactId>fcrepo-http-api</artifactId>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>org.fcrepo</groupId>
+      <artifactId>fcrepo-transform</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
<dependency>
<groupId>org.fcrepo</groupId>
<artifactId>fcrepo-kernel</artifactId>
</dependency>


<dependency>
<groupId>org.fcrepo</groupId>
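A note on the exclusion above: fcrepo-transform evidently pulls log4j-over-slf4j onto the test classpath transitively, and keeping that bridge alongside a live SLF4J binding is a classic way to end up with duplicate or looping log delegation, so it is excluded from the test scope here. That reading is an inference from the POM alone, not something the commit message states.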
@@ -171,87 +171,95 @@ public void onMessage(final Message message) {
        final RdfRetriever rdfr = new RdfRetriever(uri, httpClient);
        final NamedFieldsRetriever nfr =
                new NamedFieldsRetriever(uri, httpClient, rdfr);
-       final Model rdf =
-               createDefaultModel().read(rdfr.call(), null, "N3");
+       Boolean indexable = false;

-       if (rdf.contains(createResource(uri), type, INDEXABLE_MIXIN)) {
-           LOGGER.debug("Discovered indexable type on this resource.");
-           for (final Indexer indexer : getIndexers()) {
-               LOGGER.debug("Operating for indexer: {}", indexer);
-               Boolean hasContent = false;
-               Reader content = EMPTY_CONTENT;
-               if (!removal) {
-                   switch (indexer.getIndexerType()) {
-                       case NAMEDFIELDS:
-                           LOGGER.debug(
-                                   "Retrieving named fields for: {}, (may be cached) to index to {}...",
-                                   pid, indexer);
-                           try (final InputStream result = nfr.call()) {
-                               content = new InputStreamReader(result);
-                               hasContent = true;
-                           } catch (final IOException | HttpException e) {
-                               LOGGER.error(
-                                       "Could not retrieve content for update of: {} to indexer {}!",
-                                       pid, indexer);
-                               LOGGER.error("with exception:", e);
-                               hasContent = false;
-                           } catch (final AbsentTransformPropertyException e) {
-                               hasContent = false;
-                           }
-                           break;
-                       case RDF:
-                           LOGGER.debug(
-                                   "Retrieving RDF for: {}, (may be cached) to index to {}...",
-                                   pid, indexer);
-                           try (final InputStream result = rdfr.call()) {
-                               content = new InputStreamReader(result);
-                               hasContent = true;
-                           } catch (IOException | HttpException e) {
-                               LOGGER.error(
-                                       "Could not retrieve content for update of: {} to indexer {}!",
-                                       pid, indexer);
-                               LOGGER.error("with exception:", e);
-                               hasContent = false;
-                           } catch (final AbsentTransformPropertyException e1) {
-                               hasContent = false;
-                           }
-                           break;
-                       default:
-                           content =
-                                   new StringReader(
-                                           "Default content for update: "
-                                                   + pid);
-                           hasContent = true;
-                           break;
-                   }
-               }
-
-               try {
-                   if (removal) {
-                       LOGGER.debug(
-                               "Executing removal of: {} to indexer: {}...",
-                               pid, indexer);
-                       indexer.remove(pid);
-                   } else {
-                       if (hasContent) {
-                           LOGGER.debug(
-                                   "Executing update of: {} to indexer: {}...",
-                                   pid, indexer);
-                           indexer.update(pid, content);
-                       } else {
-                           LOGGER.error(
-                                   "Received update for: {} but was unable to retrieve "
-                                           + "content for update to indexer: {}!",
-                                   pid, indexer);
-                       }
-                   }
-               } catch (final Exception e) {
-                   LOGGER.error("Error indexing {}: {}!", pid, e);
-               }
-           }
-       } else {
-           LOGGER.info("Resource retrieved without indexable type. Will not index.");
-       }
+       if (!removal) {
+           final Model rdf =
+                   createDefaultModel().read(rdfr.call(), null, "N3");
+           if (rdf.contains(createResource(uri), type, INDEXABLE_MIXIN)) {
+               LOGGER.debug("Resource: {} retrieved with indexable type.",
+                       pid);
+               indexable = true;
+           } else {
+               LOGGER.debug(
+                       "Resource: {} retrieved without indexable type.",
+                       pid);
+           }
+       }
+
+       for (final Indexer indexer : getIndexers()) {
+           LOGGER.debug("Operating for indexer: {}", indexer);
+           Boolean hasContent = false;
+           Reader content = EMPTY_CONTENT;
+           if (!removal && indexable) {
+               switch (indexer.getIndexerType()) {
+                   case NAMEDFIELDS:
+                       LOGGER.debug(
+                               "Retrieving named fields for: {}, (may be cached) to index to {}...",
+                               pid, indexer);
+                       try (final InputStream result = nfr.call()) {
+                           content = new InputStreamReader(result);
+                           hasContent = true;
+                       } catch (final IOException | HttpException e) {
+                           LOGGER.error(
+                                   "Could not retrieve content for update of: {} to indexer {}!",
+                                   pid, indexer);
+                           LOGGER.error("with exception:", e);
+                           hasContent = false;
+                       } catch (final AbsentTransformPropertyException e) {
+                           hasContent = false;
+                       }
+                       break;
+                   case RDF:
+                       LOGGER.debug(
+                               "Retrieving RDF for: {}, (may be cached) to index to {}...",
+                               pid, indexer);
+                       try (final InputStream result = rdfr.call()) {
+                           content = new InputStreamReader(result);
+                           hasContent = true;
+                       } catch (IOException | HttpException e) {
+                           LOGGER.error(
+                                   "Could not retrieve content for update of: {} to indexer {}!",
+                                   pid, indexer);
+                           LOGGER.error("with exception:", e);
+                           hasContent = false;
+                       } catch (final AbsentTransformPropertyException e1) {
+                           hasContent = false;
+                       }
+                       break;
+                   default:
+                       content =
+                               new StringReader("Default content for update: "
+                                       + pid);
+                       hasContent = true;
+                       break;
+               }
+           }
+
+           try {
+               if (removal) {
+                   LOGGER.debug(
+                           "Executing removal of: {} to indexer: {}...",
+                           pid, indexer);
+                   indexer.remove(uri);
+               } else {
+                   if (hasContent) {
+                       LOGGER.debug(
+                               "Executing update of: {} to indexer: {}...",
+                               pid, indexer);
+                       indexer.update(uri, content);
+                   } else if (indexable) {
+                       LOGGER.error(
+                               "Received update for: {} but was unable to retrieve "
+                                       + "content for update to indexer: {}!",
+                               pid, indexer);
+                   }
+               }
+           } catch (final Exception e) {
+               LOGGER.error("Error indexing {}: {}!", pid, e);
+           }
+       }

        } catch (final JMSException | IOException | HttpException e) {
            LOGGER.error("Error processing JMS event!", e);
        } catch (final AbsentTransformPropertyException e2) {
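Condensed, the reshaped onMessage() logic above comes down to the following sketch (names are taken from the diff; contentFor() is a hypothetical stand-in for the per-indexer-type switch):

// Sketch only: decide indexability once, then fan out to every indexer.
Boolean indexable = false;
if (!removal) {
    final Model rdf = createDefaultModel().read(rdfr.call(), null, "N3");
    indexable = rdf.contains(createResource(uri), type, INDEXABLE_MIXIN);
}
for (final Indexer indexer : getIndexers()) {
    if (removal) {
        indexer.remove(uri);                  // removals always propagate
    } else if (indexable) {
        // contentFor() is hypothetical shorthand for the NAMEDFIELDS/RDF/default switch
        indexer.update(uri, contentFor(indexer));
    }
}

The other visible behavioral change is that indexer.remove() and indexer.update() are now keyed by the full uri rather than the pid.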
@@ -51,7 +51,7 @@
public class SolrIndexer implements Indexer {

public static final String CONFIGURATION_FOLDER =
"fedora:system/fedora:transform/fedora:ldpath/";
"/fedora:system/fedora:transform/fedora:ldpath/";

private final SolrServer server;

@@ -76,6 +76,16 @@ public SolrIndexer(final SolrServer solrServer) {
deserializer.setGson(this.gson);
}

+    /*
+     * (non-Javadoc)
+     * @see org.fcrepo.indexer.Indexer#update(java.lang.String, java.io.Reader)
+     * This method expects to receive in {@code doc} JSON input of the
+     * following form:
+     * [{ "id" : ["myId"],
+     *    "myField" : ["myFieldValue"],
+     *    "myMultiValuedField" : ["value1", "value2"]
+     * }]
+     */
@Override
public ListenableFuture<UpdateResponse> update(final String pid,
final Reader doc) {
@@ -142,10 +152,9 @@ public UpdateResponse call() throws Exception {
}

private <T> ListenableFuture<T> run(final ListenableFutureTask<T> task) {
-        synchronized (this) {
-            task.run();
-            notifyAll();
-        }
+        LOGGER.debug("Executing Solr update/remove...");
+        task.run();
+        LOGGER.debug("Solr update/remove executed.");
return task;
}

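Given the input shape documented in the new comment, a caller might exercise update() along these lines (a sketch; the Solr URL and field names are illustrative):

import java.io.StringReader;

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.UpdateResponse;

import com.google.common.util.concurrent.ListenableFuture;

// A one-document JSON array, each field carrying an array of values.
final SolrServer server = new HttpSolrServer("http://localhost:8983/solr");
final SolrIndexer indexer = new SolrIndexer(server);
final String json = "[{\"id\": [\"myId\"], \"title\": [\"My title\"]}]";
final ListenableFuture<UpdateResponse> response =
        indexer.update("myId", new StringReader(json));
response.get(); // block until the Solr add has been executed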
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.lang.reflect.Type;
+import java.util.Collection;
import java.util.Map;

import org.apache.solr.common.SolrInputDocument;
@@ -44,7 +45,7 @@
public class SolrInputDocumentDeserializer extends
TypeAdapter<SolrInputDocument> {

-    private static final Type type = new TypeToken<Map<String, JsonElement>>() {}
+    private static final Type type = new TypeToken<Collection<Map<String, JsonElement>>>() {}
.getType();

private Gson gson;
@@ -59,9 +60,11 @@ public class SolrInputDocumentDeserializer extends

@Override
public SolrInputField transformEntry(final String key,
-                final JsonElement input) {
+            final JsonElement input) {
            final SolrInputField field = new SolrInputField(key);
-            field.setValue(input.getAsString(), INDEX_TIME_BOOST);
+            for (final JsonElement value : input.getAsJsonArray()) {
+                field.addValue(value.getAsString(), INDEX_TIME_BOOST);
+            }
return field;
}
};
@@ -74,9 +77,10 @@ public void write(final JsonWriter out, final SolrInputDocument value) throws IO
@Override
public SolrInputDocument read(final JsonReader in) throws IOException {
try {
-            final Map<String, JsonElement> fields = gson.fromJson(in, type);
-            return new SolrInputDocument(transformEntries(fields,
-                    jsonElement2solrInputField));
+            final Collection<Map<String, JsonElement>> fields =
+                    gson.fromJson(in, type);
+            return new SolrInputDocument(transformEntries(fields.iterator()
+                    .next(), jsonElement2solrInputField));
} catch (final Exception e) {
LOGGER.error("Failed to parse JSON to Solr update document!", e);
throw e;
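The effect of the new Collection-based type token and the addValue() loop shows up in a small round-trip (a sketch; note that only the first document in the JSON array is consumed, per fields.iterator().next() above):

import org.apache.solr.common.SolrInputDocument;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

final SolrInputDocumentDeserializer deserializer =
        new SolrInputDocumentDeserializer();
final Gson gson = new GsonBuilder()
        .registerTypeAdapter(SolrInputDocument.class, deserializer)
        .create();
deserializer.setGson(gson); // same back-reference wiring as in SolrIndexer's constructor
final SolrInputDocument doc = gson.fromJson(
        "[{\"id\": [\"myId\"], \"tag\": [\"a\", \"b\"]}]",
        SolrInputDocument.class);
// doc.getField("tag") now carries both "a" and "b" as values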
@@ -16,6 +16,7 @@

package org.fcrepo.indexer.sparql;

+import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
import static com.hp.hpl.jena.sparql.util.Context.emptyContext;
@@ -24,9 +25,13 @@
import static org.fcrepo.indexer.Indexer.IndexerType.RDF;
import static org.slf4j.LoggerFactory.getLogger;

+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.Reader;
import java.util.HashSet;
import java.util.Iterator;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -41,6 +46,7 @@
import com.hp.hpl.jena.update.UpdateProcessor;
import com.hp.hpl.jena.update.UpdateRequest;

+import org.apache.jena.atlas.io.IndentedWriter;
import org.fcrepo.indexer.Indexer;
import org.slf4j.Logger;

@@ -94,7 +100,7 @@ public ListenableFuture<Void> update( final String pid, final Reader content ) {
* all triples with subjects starting with the same subject.
**/
@Override
-    public ListenableFuture<Void> remove( final String subject ) {
+    public ListenableFuture<Void> remove(final String subject) {

LOGGER.debug("Received remove for: {}", subject);
// find triples/quads to delete
@@ -148,11 +154,16 @@ private boolean matches( final String uri1, final String uri2 ) {
}

private ListenableFuture<Void> exec(final UpdateRequest update) {
+        if (update.getOperations().isEmpty()) {
+            LOGGER.debug("Received empty update/remove operation.");
+            return immediateFuture((Void) null);
+        }
final ListenableFutureTask<Void> task =
ListenableFutureTask.create(new Runnable() {

@Override
public void run() {

if (formUpdates) {
// form updates
final UpdateProcessor proc =
@@ -163,10 +174,33 @@ public void run() {
final UpdateProcessRemote proc =
new UpdateProcessRemote(update, updateBase,
emptyContext);
-                        proc.execute();
+                        try {
+                            proc.execute();
+                        } catch (final Exception e) {
+                            LOGGER.error(
+                                    "Error executing Sparql update/remove!", e);
+                        }
}
}
}, null);
+        task.addListener(new Runnable() {
+
+            @Override
+            public void run() {
+                try (final OutputStream buffer = new ByteArrayOutputStream()) {
+                    final IndentedWriter out = new IndentedWriter(buffer);
+                    update.output(out);
+                    LOGGER.trace(
+                            "Executed update/remove operation:\n{}",
+                            buffer.toString());
+                    out.close();
+                } catch (final IOException e) {
+                    LOGGER.error(
+                            "Couldn't retrieve execution of update/remove operation!",
+                            e);
+                }
+            }
+        }, executorService);
executorService.submit(task);
return task;
}
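The notable new piece in exec() is the empty-request guard: rather than scheduling a no-op on the executor, it hands back an already-completed future, so a caller blocking on the result returns immediately. A minimal illustration of the pattern, detached from Jena:

import static com.google.common.util.concurrent.Futures.immediateFuture;

import com.google.common.util.concurrent.ListenableFuture;

static ListenableFuture<Void> noOp() {
    // Already done at return time: get() will not block.
    return immediateFuture((Void) null);
}

The TRACE listener added alongside it runs only after the task completes, since task.addListener(runnable, executorService) defers the logging callback until the future is done.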
