Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
All indexers now using appropriate types for their asynch/synch behavior
  • Loading branch information
ajs6f committed Dec 9, 2013
1 parent 1c65ff3 commit d273f7c
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 95 deletions.
Expand Up @@ -32,7 +32,7 @@
*
* @author ajs6f
* @date Dec 8, 2013
* @param <T>
* @param <T> the type of response to expect from an operation
*/
public abstract class AsynchIndexer<T> implements Indexer {

Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;

/**
Expand All @@ -45,7 +44,7 @@
* @author Esmé Cowles
* @date Aug 19, 2013
**/
public class FileSerializer implements Indexer {
public class FileSerializer extends SynchIndexer<File> {

private static final Logger LOGGER = getLogger(FileSerializer.class);

Expand Down Expand Up @@ -75,15 +74,15 @@ public String getPath() {
* @return
**/
@Override
public ListenableFuture<File> update(final String pid, final Reader content) throws IOException {
public ListenableFutureTask<File> updateSynch(final String pid, final Reader content) {
// timestamped filename
String fn = pid + "." + fmt.format(new Date());
if (fn.indexOf('/') != -1) {
fn = substringAfterLast(fn, "/");
}
final File file = new File(path, fn);
LOGGER.debug("Updating to file: {}", file);
return run(ListenableFutureTask.create(new Callable<File>() {
return ListenableFutureTask.create(new Callable<File>() {

@Override
public File call() {
Expand All @@ -95,7 +94,7 @@ public File call() {
}
return file;
}
}));
});

}

Expand All @@ -104,17 +103,12 @@ public File call() {
* Remove the object from the index.
**/
@Override
public ListenableFuture<File> remove(final String pid) throws IOException {
public ListenableFutureTask<File> removeSynch(final String id) {
// empty update
LOGGER.debug("Received remove for identifier: {}", pid);
return update(pid, new StringReader(""));
LOGGER.debug("Received remove for identifier: {}", id);
return updateSynch(id, new StringReader(""));
}

private static <T> ListenableFuture<T> run(
final ListenableFutureTask<T> task) {
task.run();
return task;
}
@Override
public IndexerType getIndexerType() {
return NAMEDFIELDS;
Expand Down
@@ -0,0 +1,40 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.indexer;

import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;

import com.google.common.util.concurrent.ListeningExecutorService;

/**
* An {@Indexer} that completes its operations synchronously.
*
* @author ajs6f
* @date Dec 8, 2013
* @param <T> the type of response to expect from an operation
*/
public abstract class SynchIndexer<T> extends AsynchIndexer<T> {

private final ListeningExecutorService executorService =
sameThreadExecutor();

@Override
public ListeningExecutorService executorService() {
return executorService;
}

}
Expand Up @@ -17,6 +17,8 @@
package org.fcrepo.indexer.solr;

import static com.google.common.base.Throwables.propagate;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.fcrepo.indexer.Indexer.IndexerType.NAMEDFIELDS;
import static org.slf4j.LoggerFactory.getLogger;

Expand All @@ -30,13 +32,13 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.fcrepo.indexer.Indexer;
import org.fcrepo.indexer.AsynchIndexer;
import org.fcrepo.indexer.IndexerGroup;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

Expand All @@ -48,13 +50,22 @@
* @author yecao
* @date Nov 2013
*/
public class SolrIndexer implements Indexer {
public class SolrIndexer extends AsynchIndexer<UpdateResponse> {

public static final String CONFIGURATION_FOLDER =
"/fedora:system/fedora:transform/fedora:ldpath/";

private final SolrServer server;

/**
* Number of threads to use for operating against the triplestore.
*/
private static final Integer THREAD_POOL_SIZE = 5;

private ListeningExecutorService executorService =
listeningDecorator(newFixedThreadPool(THREAD_POOL_SIZE));


private static final Logger LOGGER = getLogger(SolrIndexer.class);

@Inject
Expand Down Expand Up @@ -87,10 +98,10 @@ public SolrIndexer(final SolrServer solrServer) {
* }]
*/
@Override
public ListenableFuture<UpdateResponse> update(final String pid,
public ListenableFutureTask<UpdateResponse> updateSynch(final String pid,
final Reader doc) {
LOGGER.debug("Received request for update to: {}", pid);
return run(ListenableFutureTask.create(new Callable<UpdateResponse>() {
return ListenableFutureTask.create(new Callable<UpdateResponse>() {

@Override
public UpdateResponse call() throws Exception {
Expand Down Expand Up @@ -120,13 +131,13 @@ public UpdateResponse call() throws Exception {
throw propagate(e);
}
}
}));
});
}

@Override
public ListenableFuture<UpdateResponse> remove(final String pid) throws IOException {
public ListenableFutureTask<UpdateResponse> removeSynch(final String pid) {
LOGGER.debug("Received request for removal of: {}", pid);
return run(ListenableFutureTask.create(new Callable<UpdateResponse>() {
return ListenableFutureTask.create(new Callable<UpdateResponse>() {

@Override
public UpdateResponse call() throws Exception {
Expand All @@ -148,17 +159,9 @@ public UpdateResponse call() throws Exception {
throw propagate(e);
}
}
}));
}

private <T> ListenableFuture<T> run(final ListenableFutureTask<T> task) {
LOGGER.debug("Executing Solr update/remove...");
task.run();
LOGGER.debug("Solr update/remove executed.");
return task;
});
}


/**
* @return the {@link SolrServer} in use
*/
Expand All @@ -171,4 +174,10 @@ public IndexerType getIndexerType() {
return NAMEDFIELDS;
}

@Override
public ListeningExecutorService executorService() {
return executorService;
}


}
Expand Up @@ -16,7 +16,6 @@

package org.fcrepo.indexer;

import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.fcrepo.indexer.Indexer.IndexerType.NO_CONTENT;
import static org.slf4j.LoggerFactory.getLogger;

Expand All @@ -28,7 +27,6 @@
import org.slf4j.Logger;

import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;


/**
Expand All @@ -38,17 +36,13 @@
* @author Esmé Cowles
* @date Nov 25, 2013
**/
public class TestIndexer extends AsynchIndexer<Boolean> {
public class TestIndexer extends SynchIndexer<Boolean> {

private static final Logger LOGGER = getLogger(TestIndexer.class);

private final Set<String> updates = new HashSet<>();
private final Set<String> removes = new HashSet<>();

private final ListeningExecutorService executorService =
sameThreadExecutor();


@Override
public ListenableFutureTask<Boolean> updateSynch(final String identifier,
final Reader content) {
Expand Down Expand Up @@ -104,8 +98,4 @@ public IndexerType getIndexerType() {
return NO_CONTENT;
}

@Override
public ListeningExecutorService executorService() {
return executorService;
}
}
Expand Up @@ -15,19 +15,24 @@
*/
package org.fcrepo.indexer.integration.solr;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.SolrParams;
import org.fcrepo.indexer.solr.SolrIndexer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
Expand All @@ -47,42 +52,53 @@ public class SolrIndexerIT {
@Autowired
private SolrServer server;

private static final Logger LOGGER = getLogger(SolrIndexerIT.class);

private static final long TIMEOUT = 15000;

private static final long TIME_TO_WAIT_STEP = 1000;

/**
* Test method for
* {@link org.fcrepo.indexer.solr.SolrIndexer#update(java.lang.String, java.lang.String)}
* .
*
* @throws SolrServerException
*/
@Test
public void testUpdate() throws SolrServerException {
public void testUpdate() throws SolrServerException, IOException, InterruptedException {
doUpdate("123");
}

private void doUpdate(final String pid) throws SolrServerException {
private void doUpdate(final String pid) throws SolrServerException, IOException, InterruptedException {
final String json =
"[{\"id\" : [\"" + pid + "\"]}]";
solrIndexer.update(pid, new StringReader(json));
final SolrParams params = new SolrQuery("id:" + pid);
final QueryResponse response = server.query(params);
assertEquals(pid, response.getResults().get(0).get("id"));
final SolrParams query = new SolrQuery("id:" + pid);
List<SolrDocument> results = server.query(query).getResults();
Boolean success = results.size() == 1;
final Long start = currentTimeMillis();
while ((currentTimeMillis() - start < TIMEOUT) && !success) {
LOGGER.debug("Waiting for index record to appear...");
sleep(TIME_TO_WAIT_STEP);
LOGGER.debug("Checking for presence of appropriate index record...");
results = server.query(query).getResults();
success = results.size() == 1;
}
assertEquals("Didn't find the record that should have been created!",
pid, results.get(0).get("id"));
}

/**
* Test method for
* {@link org.fcrepo.indexer.solr.SolrIndexer#remove(java.lang.String)}.
*
* @throws IOException
* @throws SolrServerException
*/
@Test
public void testRemove() throws IOException, SolrServerException {
public void testRemove() throws IOException, SolrServerException, InterruptedException {
doUpdate("345");
solrIndexer.remove("345");
final SolrParams params = new SolrQuery("id:345");
final QueryResponse response = server.query(params);
assertEquals(0, response.getResults().getNumFound());
final SolrParams query = new SolrQuery("id:345");
List<SolrDocument> results = server.query(query).getResults();
Boolean success = results.size() == 0;
final Long start = currentTimeMillis();
while ((currentTimeMillis() - start < TIMEOUT) && !success) {
LOGGER.debug("Waiting for index record to appear...");
sleep(TIME_TO_WAIT_STEP);
LOGGER.debug("Checking for presence of appropriate index record...");
results = server.query(query).getResults();
success = results.size() == 0;
}
assertEquals("Found a record that should have been deleted!", 0,
results.size());
}

}

0 comments on commit d273f7c

Please sign in to comment.