Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Now using new asynch indexer type for SparqlIndexer
Browse files Browse the repository at this point in the history
  • Loading branch information
ajs6f committed Dec 8, 2013
1 parent 002b1c4 commit 1c65ff3
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 13 deletions.
Expand Up @@ -38,7 +38,10 @@ public abstract class AsynchIndexer<T> implements Indexer {

private static final Logger LOGGER = getLogger(AsynchIndexer.class);

abstract ListeningExecutorService executorService();
/**
* @return The {@link ListeningExecutorService} to use for operation.
*/
public abstract ListeningExecutorService executorService();

@Override
public ListenableFuture<T> update(final String identifier,
Expand Down Expand Up @@ -75,9 +78,18 @@ public void run() {
return task;
}

abstract ListenableFutureTask<T> removeSynch(final String identifier);
/**
* @param identifier
* @return
*/
public abstract ListenableFutureTask<T> removeSynch(final String identifier);

abstract ListenableFutureTask<T> updateSynch(final String identifier,
/**
* @param identifier
* @param content
* @return
*/
public abstract ListenableFutureTask<T> updateSynch(final String identifier,
final Reader content);

}
Expand Up @@ -16,7 +16,7 @@

package org.fcrepo.indexer.sparql;

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
import static com.hp.hpl.jena.sparql.util.Context.emptyContext;
Expand All @@ -31,8 +31,8 @@
import java.io.Reader;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Callable;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.hp.hpl.jena.rdf.model.Model;
Expand All @@ -47,7 +47,7 @@
import com.hp.hpl.jena.update.UpdateRequest;

import org.apache.jena.atlas.io.IndentedWriter;
import org.fcrepo.indexer.Indexer;
import org.fcrepo.indexer.AsynchIndexer;
import org.slf4j.Logger;


Expand All @@ -58,7 +58,7 @@
* @author ajs6f
* @date Aug 19, 2013
**/
public class SparqlIndexer implements Indexer {
public class SparqlIndexer extends AsynchIndexer<Void> {

private String queryBase;
private String updateBase;
Expand All @@ -80,10 +80,14 @@ public class SparqlIndexer implements Indexer {
* @content RDF in N3 format.
**/
@Override
public ListenableFuture<Void> update( final String pid, final Reader content ) {
public ListenableFutureTask<Void> updateSynch( final String pid, final Reader content ) {
LOGGER.debug("Received update for: {}", pid);
// first remove old data
remove(pid);
try {
remove(pid);
} catch (final IOException e) {
propagate(e);
}

// parse content into a model
final Model model = createDefaultModel().read(content, null, "N3");
Expand All @@ -105,7 +109,7 @@ public ListenableFuture<Void> update( final String pid, final Reader content ) {
* all triples with subjects starting with the same subject.
**/
@Override
public ListenableFuture<Void> remove(final String subject) {
public ListenableFutureTask<Void> removeSynch(final String subject) {

LOGGER.debug("Received remove for: {}", subject);
// find triples/quads to delete
Expand Down Expand Up @@ -158,11 +162,18 @@ private boolean matches( final String uri1, final String uri2 ) {
|| uri1.startsWith(uri2 + "#");
}

private ListenableFuture<Void> exec(final UpdateRequest update) {
private ListenableFutureTask<Void> exec(final UpdateRequest update) {
if (update.getOperations().isEmpty()) {
LOGGER.debug("Received empty update/remove operation.");
return immediateFuture((Void) null);
return ListenableFutureTask.create(new Callable<Void>() {

@Override
public Void call() throws Exception {
return null;
}
});
}

final ListenableFutureTask<Void> task =
ListenableFutureTask.create(new Runnable() {

Expand Down Expand Up @@ -258,4 +269,11 @@ public void setQueryBase( final String url ) {
public void setUpdateBase( final String url ) {
this.updateBase = url;
}

@Override
public ListeningExecutorService executorService() {
return executorService;
}


}
Expand Up @@ -105,7 +105,7 @@ public IndexerType getIndexerType() {
}

@Override
ListeningExecutorService executorService() {
public ListeningExecutorService executorService() {
return executorService;
}
}

0 comments on commit 1c65ff3

Please sign in to comment.