Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Factored execution handling into abstract superclasses (out of concre…
Browse files Browse the repository at this point in the history
…te Inderer impls)
  • Loading branch information
ajs6f committed Dec 15, 2013
1 parent 18c0c16 commit 04afaf9
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 60 deletions.
Expand Up @@ -19,6 +19,8 @@
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.util.concurrent.Callable;

import org.slf4j.Logger;

import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -49,7 +51,8 @@ public ListenableFuture<Result> update(final String identifier,
final Content content) throws IOException {
LOGGER.debug("Received update for identifier: {}", identifier);

final ListenableFutureTask<Result> task = updateSynch(identifier, content);
final ListenableFutureTask<Result> task =
ListenableFutureTask.create(updateSynch(identifier, content));
task.addListener(new Runnable() {
@Override
public void run() {
Expand All @@ -68,7 +71,8 @@ public void run() {
public ListenableFuture<Result> remove(final String identifier)
throws IOException {
LOGGER.debug("Received remove for identifier: {}", identifier);
final ListenableFutureTask<Result> task = removeSynch(identifier);
final ListenableFutureTask<Result> task =
ListenableFutureTask.create(removeSynch(identifier));
task.addListener(new Runnable() {
@Override
public void run() {
Expand All @@ -85,14 +89,14 @@ public void run() {
* @param identifier
* @return
*/
public abstract ListenableFutureTask<Result> removeSynch(final String identifier);
public abstract Callable<Result> removeSynch(final String identifier);

/**
* @param identifier
* @param content
* @return
*/
public abstract ListenableFutureTask<Result> updateSynch(final String identifier,
public abstract Callable<Result> updateSynch(final String identifier,
final Content content);

}
Expand Up @@ -31,8 +31,6 @@

import org.slf4j.Logger;

import com.google.common.util.concurrent.ListenableFutureTask;

/**
* Basic Indexer implementation that writes object content to timestamped files
* on disk.
Expand Down Expand Up @@ -71,15 +69,15 @@ public String getPath() {
* @return
**/
@Override
public ListenableFutureTask<File> updateSynch(final String pid, final NamedFields content) {
public Callable<File> updateSynch(final String pid, final NamedFields content) {
// timestamped filename
String fn = pid + "." + fmt.format(new Date());
if (fn.indexOf('/') != -1) {
fn = substringAfterLast(fn, "/");
}
final File file = new File(path, fn);
LOGGER.debug("Updating to file: {}", file);
return ListenableFutureTask.create(new Callable<File>() {
return new Callable<File>() {

@Override
public File call() {
Expand All @@ -95,7 +93,7 @@ public File call() {
}
return file;
}
});
};

}

Expand All @@ -104,7 +102,7 @@ public File call() {
* Remove the object from the index.
**/
@Override
public ListenableFutureTask<File> removeSynch(final String id) {
public Callable<File> removeSynch(final String id) {
// empty update
LOGGER.debug("Received remove for identifier: {}", id);
return updateSynch(id, new NamedFields());
Expand Down
Expand Up @@ -43,7 +43,6 @@
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.collect.Maps.EntryTransformer;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;

/**
Expand Down Expand Up @@ -87,10 +86,10 @@ public SolrIndexer(final SolrServer solrServer) {
}

@Override
public ListenableFutureTask<UpdateResponse> updateSynch(final String id,
public Callable<UpdateResponse> updateSynch(final String id,
final NamedFields fields) {
LOGGER.debug("Received request for update to: {}", id);
return ListenableFutureTask.create(new Callable<UpdateResponse>() {
return new Callable<UpdateResponse>() {

@Override
public UpdateResponse call() throws Exception {
Expand Down Expand Up @@ -120,7 +119,7 @@ public UpdateResponse call() throws Exception {
throw propagate(e);
}
}
});
};
}

protected SolrInputDocument fromMap(final Map<String, Collection<String>> fields) {
Expand All @@ -145,9 +144,9 @@ public SolrInputField transformEntry(final String key,
};

@Override
public ListenableFutureTask<UpdateResponse> removeSynch(final String pid) {
public Callable<UpdateResponse> removeSynch(final String pid) {
LOGGER.debug("Received request for removal of: {}", pid);
return ListenableFutureTask.create(new Callable<UpdateResponse>() {
return new Callable<UpdateResponse>() {

@Override
public UpdateResponse call() throws Exception {
Expand All @@ -169,7 +168,7 @@ public UpdateResponse call() throws Exception {
throw propagate(e);
}
}
});
};
}

/**
Expand Down
Expand Up @@ -78,7 +78,7 @@ public class SparqlIndexer extends AsynchIndexer<Model, Void> {
* @content RDF in N3 format.
**/
@Override
public ListenableFutureTask<Void> updateSynch(final String pid,
public Callable<Void> updateSynch(final String pid,
final Model model) {
LOGGER.debug("Received update for: {}", pid);
// first remove old data
Expand All @@ -105,7 +105,7 @@ public ListenableFutureTask<Void> updateSynch(final String pid,
* all triples with subjects starting with the same subject.
**/
@Override
public ListenableFutureTask<Void> removeSynch(final String subject) {
public Callable<Void> removeSynch(final String subject) {

LOGGER.debug("Received remove for: {}", subject);
// find triples/quads to delete
Expand Down Expand Up @@ -158,43 +158,46 @@ private boolean matches( final String uri1, final String uri2 ) {
|| uri1.startsWith(uri2 + "#");
}

private ListenableFutureTask<Void> exec(final UpdateRequest update) {
private Callable<Void> exec(final UpdateRequest update) {
if (update.getOperations().isEmpty()) {
LOGGER.debug("Received empty update/remove operation.");
return ListenableFutureTask.create(new Callable<Void>() {
return new Callable<Void>() {

@Override
public Void call() throws Exception {
return null;
}
});
};
}

final ListenableFutureTask<Void> task =
ListenableFutureTask.create(new Runnable() {

@Override
public void run() {
final Callable<Void> callable = new Callable<Void>() {

if (formUpdates) {
// form updates
final UpdateProcessor proc =
createRemoteForm(update, updateBase);
@Override
public Void call() {

if (formUpdates) {
// form updates
final UpdateProcessor proc =
createRemoteForm(update, updateBase);
proc.execute();
} else {
// normal SPARQL updates
final UpdateProcessRemote proc =
new UpdateProcessRemote(update, updateBase,
emptyContext);
try {
proc.execute();
} else {
// normal SPARQL updates
final UpdateProcessRemote proc =
new UpdateProcessRemote(update, updateBase,
emptyContext);
try {
proc.execute();
} catch (final Exception e) {
LOGGER.error(
"Error executing Sparql update/remove!", e);
}
} catch (final Exception e) {
LOGGER.error(
"Error executing Sparql update/remove!", e);
}
}
}, null);
return null;
}
};

final ListenableFutureTask<Void> task =
ListenableFutureTask.create(callable);
task.addListener(new Runnable() {

@Override
Expand All @@ -217,7 +220,7 @@ public void run() {
}
}, executorService);
executorService.submit(task);
return task;
return callable;
}

/**
Expand Down
Expand Up @@ -26,8 +26,6 @@

import org.slf4j.Logger;

import com.google.common.util.concurrent.ListenableFutureTask;


/**
* Indexer implementation that tracks which PIDs it has received messages for,
Expand All @@ -45,24 +43,24 @@ public class TestIndexer extends SynchIndexer<NoContent, Boolean> {
private final Set<String> removes = new HashSet<>();

@Override
public ListenableFutureTask<Boolean> updateSynch(final String identifier,
public Callable<Boolean> updateSynch(final String identifier,
final NoContent content) {
LOGGER.debug("Received update for identifier: {}", identifier);
return ListenableFutureTask.create(new Callable<Boolean>() {
return new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {
final Boolean success = updates.add(identifier);
LOGGER.debug("Current recorded updates include: {}", updates);
return success;
}
});
};
}

@Override
public ListenableFutureTask<Boolean> removeSynch(final String identifier) {
public Callable<Boolean> removeSynch(final String identifier) {
LOGGER.debug("Received remove for identifier: {}", identifier);
return ListenableFutureTask.create(new Callable<Boolean>() {
return new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {
Expand All @@ -71,7 +69,7 @@ public Boolean call() throws Exception {
removes);
return success;
}
});
};

}

Expand Down
3 changes: 3 additions & 0 deletions fcrepo-jms-indexer-elasticsearch/README.txt
@@ -0,0 +1,3 @@
This indexer cannot be used in the same web-app deployment as the Solr indexer, because of library incompatibilities.

When Elasticsearch updates the version of Lucene on which it relies, we will update as appropriate.
Expand Up @@ -32,7 +32,6 @@
import org.fcrepo.indexer.NamedFields;
import org.slf4j.Logger;

import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;

/**
Expand Down Expand Up @@ -76,29 +75,29 @@ public void initIndex() throws InterruptedException {
}

@Override
public ListenableFutureTask<ActionResponse> removeSynch(final String id) {
return ListenableFutureTask.create(new Callable<ActionResponse>() {
public Callable<ActionResponse> removeSynch(final String id) {
return new Callable<ActionResponse>() {

@Override
public ActionResponse call() throws Exception {
return client.prepareDelete(getIndexName(),
getSearchIndexType(), id).execute().actionGet();
}
});
};
}

@Override
public ListenableFutureTask<ActionResponse> updateSynch(final String id,
public Callable<ActionResponse> updateSynch(final String id,
final NamedFields content) {
return ListenableFutureTask.create(new Callable<ActionResponse>() {
return new Callable<ActionResponse>() {

@Override
public ActionResponse call() throws Exception {
return client.prepareIndex(indexName, searchIndexType, id)
.execute().actionGet();

}
});
};
}

@Override
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Expand Up @@ -284,7 +284,6 @@

<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${checkstyle.plugin.version}</version>
<dependencies>
<dependency>
<groupId>org.fcrepo</groupId>
Expand Down

0 comments on commit 04afaf9

Please sign in to comment.