Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Better
Browse files Browse the repository at this point in the history
  • Loading branch information
ajs6f committed Dec 2, 2013
1 parent bde01a2 commit 0bfb60d
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 117 deletions.
Expand Up @@ -36,8 +36,9 @@
* Basic Indexer implementation that writes object content to timestamped files
* on disk.
*
* @author ajs6f
* @author Esmé Cowles
* Date: Aug 19, 2013
* @date Aug 19, 2013
**/
public class FileSerializer implements Indexer {

Expand Down
Expand Up @@ -25,8 +25,9 @@
* destination (Solr, triplestore, files, etc.) should have its own
* implementation.
*
* @author ajs6f
* @author Esmé Cowles
* Date: Aug 19, 2013
* @date Aug 19, 2013
**/
public interface Indexer {

Expand Down
Expand Up @@ -18,11 +18,11 @@

import static com.google.common.base.Throwables.propagate;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.http.HttpStatus.SC_OK;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;
import java.nio.charset.Charset;

Expand Down Expand Up @@ -52,7 +52,7 @@
*
* @author Esmé Cowles
* @author ajs6f
* @date Aug 19, 2013
* @date Aug 19 2013
**/
public class IndexerGroup implements MessageListener {

Expand All @@ -64,8 +64,6 @@ public class IndexerGroup implements MessageListener {

private Set<Indexer> indexers;

private Set<Listener> listeners = new HashSet<>();

private HttpClient httpclient;

/**
Expand Down Expand Up @@ -127,9 +125,7 @@ private String getPath(final java.util.List<Category> categories) {
**/
@Override
public void onMessage(final Message message) {
LOGGER.debug(
"Registered listeners for IndexerGroup: {} now include: {}",
this, listeners);try {
try {
LOGGER.debug("Received message: {}", message.getJMSMessageID());
} catch (final JMSException e) {
LOGGER.error("Received unparseable message: {}", e);
Expand All @@ -139,18 +135,27 @@ public void onMessage(final Message message) {
if (message instanceof TextMessage) {
// get pid from message
final String xml = ((TextMessage) message).getText();
LOGGER.debug("Received Atom message: {}", xml);
final Document<Entry> doc = atomParser.parse(new StringReader(xml));
final Entry entry = doc.getRoot();
// if the object is updated, fetch current content
String content = null;


final Boolean removal = "purgeObject".equals(entry.getTitle());

// if the object is updated, fetch current content
Boolean hasContent = false;
String content = null;
if (!removal) {
final HttpGet get = new HttpGet(
getPath(entry.getCategories("xsd:string")));
final HttpResponse response = httpclient.execute(get);
content = IOUtils.toString(response.getEntity()
.getContent(), Charset.forName("UTF-8"));
if (response.getStatusLine().getStatusCode() == SC_OK) {
content =
IOUtils.toString(response.getEntity().getContent(),
Charset.forName("UTF-8"));
hasContent = true;
}

}
// pid represents the full path. Alternative would be to send
// path separately in all calls
Expand All @@ -167,27 +172,18 @@ public void onMessage(final Message message) {
if (removal) {
indexer.remove(pid);
} else {
indexer.update(pid, content);
if (hasContent) {
indexer.update(pid, content);
} else {
LOGGER.error(
"Received update on {} but was unable to retrieve representation!",
pid);
}
}
} catch (final Exception e) {
LOGGER.error("Error indexing {}: {}!", pid, e);
}
}
LOGGER.debug(
"Registered listeners for IndexerGroup: {} now include: {}",
this, listeners);
for (final Listener l : listeners) {
LOGGER.debug("Notifying listener: {}", l);
if (removal) {
l.notifyRemove(pid, message);
} else {
l.notifyUpdate(pid, message);
}
}
synchronized (this) {
LOGGER.debug("Notifying waiting threads.");
notifyAll();
}
}
} catch (final JMSException e) {
LOGGER.error("Error processing JMS event!", e);
Expand All @@ -196,40 +192,4 @@ public void onMessage(final Message message) {
}
}

/**
* Adds a listener to be notified when an indexing operation takes place.
*
* @param l
*/
public void addListener(final Listener l) {
LOGGER.debug("Adding listener: {}", l);
LOGGER.debug(
"Registered listeners for IndexerGroup: {} now include: {}",
this, listeners);
listeners.add(l);

}

/**
* Implemented by classes that want to listen to the results of indexing.
*
* @author ajs6f
* @date Nov 26, 2013
*/
public static interface Listener {

/**
* @param pid
* @param msg
*/
void notifyUpdate(final String pid, final Message msg);

/**
* @param pid
* @param msg
*/
void notifyRemove(final String pid, final Message msg);

}

}
Expand Up @@ -35,8 +35,9 @@
* A Solr Indexer (stub) implementation that adds some basic information to
* a Solr index server.
*
* @author ajs6f
* @author yecao
*
* @date Nov 2013
*/
public class SolrIndexer implements Indexer {

Expand Down
Expand Up @@ -52,7 +52,7 @@ public class SparqlIndexer implements Indexer {
private String updateBase;
private boolean formUpdates = false;

private final Logger logger = getLogger(SparqlIndexer.class);
private static final Logger LOGGER = getLogger(SparqlIndexer.class);

/**
* Set whether to use SPARQL Update or form updates.
Expand Down Expand Up @@ -82,6 +82,7 @@ public void setUpdateBase( final String url ) {
**/
@Override
public ListenableFuture<Void> update( final String pid, final String content ) {
LOGGER.debug("Received update for: {}", pid);
// first remove old data
remove(pid);

Expand All @@ -97,7 +98,7 @@ public ListenableFuture<Void> update( final String pid, final String content ) {
}

// send update to server
logger.debug("Sending update request for pid: {}", pid);
LOGGER.debug("Sending update request for pid: {}", pid);
return exec(new UpdateRequest(new UpdateDataInsert(add)));
}

Expand All @@ -108,6 +109,7 @@ public ListenableFuture<Void> update( final String pid, final String content ) {
@Override
public ListenableFuture<Void> remove( final String subject ) {

LOGGER.debug("Received remove for: {}", subject);
// find triples/quads to delete
final String describeQuery = "DESCRIBE <" + subject + ">";
final QueryEngineHTTP qexec = new QueryEngineHTTP( queryBase, describeQuery );
Expand Down Expand Up @@ -140,7 +142,7 @@ public ListenableFuture<Void> remove( final String subject ) {
final UpdateRequest del = new UpdateRequest();
for ( final String uri : uris ) {
final String cmd = "delete where { <" + uri + "> ?p ?o }";
logger.debug(cmd);
LOGGER.debug(cmd);
del.add( cmd );
}

Expand Down
Expand Up @@ -22,18 +22,12 @@
import static org.junit.Assert.assertTrue;
import static org.slf4j.LoggerFactory.getLogger;

import javax.jms.Message;

import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.fcrepo.indexer.IndexerGroup.Listener;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -45,12 +39,13 @@
import javax.inject.Inject;

/**
* @author ajs6f
* @author Esmé Cowles
* Date: Aug 19, 2013
* @date Aug 19, 2013
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"/spring-test/test-container.xml"})
public class IndexerGroupIT implements Listener {
public class IndexerGroupIT {

private static final Logger LOGGER = getLogger(IndexerGroupIT.class);

Expand All @@ -70,21 +65,13 @@ public class IndexerGroupIT implements Listener {
@Inject
private TestIndexer testIndexer;

private List<Message> messages = new ArrayList<>();
private List<String> updates = new ArrayList<>();
private List<String> removals = new ArrayList<>();

@Before
public void setup() {
client = new DefaultHttpClient(connectionManager);
}

@Test
public void testIndexerGroupUpdate() throws Exception {
LOGGER.debug("Adding a listener {} to the IndexerGroup under test.",
this);
indexerGroup.addListener(this);

doIndexerGroupUpdateTest("test_pid_0");
}

Expand All @@ -94,15 +81,15 @@ private void doIndexerGroupUpdateTest(final String pid) throws Exception {
final HttpPost method = new HttpPost(uri);
final HttpResponse response = client.execute(method);
assertEquals(201, response.getStatusLine().getStatusCode());
LOGGER.debug("Created object at: {}", pid);
LOGGER.debug("Created object at: {}", uri);

synchronized (indexerGroup) {
while (!updates.contains(uri)) {
LOGGER.debug("Waiting for next notification from IndexerGroup...");
LOGGER.debug("Updates currently received for pids: {}", updates);
indexerGroup.wait(1000);
synchronized (testIndexer) {
while (!testIndexer.receivedUpdate(uri)) {
LOGGER.debug("Waiting for next notification from TestIndexer...");
testIndexer.wait(1000);
}
}
LOGGER.debug("Received update at test indexer for uri: {}", uri);
assertTrue("Test indexer should have received an update message!", testIndexer
.receivedUpdate(uri));

Expand All @@ -111,39 +98,26 @@ private void doIndexerGroupUpdateTest(final String pid) throws Exception {
@Test
public void testIndexerGroupDelete() throws Exception {

LOGGER.debug("Adding a listener {} to the IndexerGroup under test.",
this);
indexerGroup.addListener(this);

// create and verify dummy object
final String pid = "test_pid_5";
final String uri = serverAddress + pid;
doIndexerGroupUpdateTest(pid);

Thread.sleep(1200); // Let the creation event persist

// delete dummy object
final HttpDelete method = new HttpDelete(uri);
final HttpResponse response = client.execute(method);
assertEquals(204, response.getStatusLine().getStatusCode());
LOGGER.debug("Deleted object at: {}", uri);
synchronized (testIndexer) {
while (!testIndexer.receivedRemove(uri)) {
LOGGER.debug("Waiting for next notification from TestIndexer...");
testIndexer.wait(1000);
}
}
LOGGER.debug("Received update at test indexer for uri: {}", uri);

assertTrue("Test indexer should have received delete message!", testIndexer
.receivedRemove(pid));
.receivedRemove(uri));

}

@Override
public void notifyUpdate(final String pid, final Message msg) {
updates.add(pid);
messages.add(msg);

}

@Override
public void notifyRemove(final String pid, final Message msg) {
removals.add(pid);
messages.add(msg);

}

}

0 comments on commit 0bfb60d

Please sign in to comment.