Skip to content
This repository has been archived by the owner on Jan 3, 2019. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
Non-functional update that makes tests less brittle, introducing a new TestIndexer class that allows integration tests (ITs) to check whether messages have been received without having to count files/triples.
  • Loading branch information
escowles committed Dec 3, 2013
1 parent b7660d4 commit 64812b9
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 368 deletions.
FileSerializer.java  @@ -16,37 +16,43 @@

package org.fcrepo.indexer;

import static org.apache.commons.io.IOUtils.write;
import static org.apache.commons.lang.StringUtils.substringAfterLast;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;

/**
* Basic Indexer implementation that writes object content to timestamped files
* on disk.
*
* @author ajs6f
* @author Esmé Cowles
* Date: Aug 19, 2013
* @date Aug 19, 2013
**/
public class FileSerializer implements Indexer {

// NOTE(review): this listing is a scraped commit diff whose +/- markers were
// lost, so removed and added lines appear together. The instance `logger`
// (LoggerFactory.getLogger) and the static `LOGGER` (static-imported
// getLogger) are the pre- and post-commit versions of the same field; the two
// `fmt` declarations below likewise differ only in line wrapping.
private final Logger logger = LoggerFactory.getLogger(FileSerializer.class);
private static final Logger LOGGER = getLogger(FileSerializer.class);

// Formats the timestamp suffix appended to each output filename
// (one-second resolution).
// NOTE(review): SimpleDateFormat is not thread-safe, and this instance is a
// shared static used by update(); if indexers can run concurrently, consider
// an immutable java.time.format.DateTimeFormatter instead.
private static SimpleDateFormat fmt =
new SimpleDateFormat("yyyyMMddHHmmss");

private static SimpleDateFormat fmt
= new SimpleDateFormat("yyyyMMddHHmmss");
// Directory that output files are written to; assigned by setPath().
private File path;

/**
* Set path to write files.
**/
public void setPath( String pathName ) {
public void setPath( final String pathName ) {
this.path = new File(pathName);
if (!this.path.exists()) {
this.path.mkdirs();
Expand All @@ -61,31 +67,45 @@ public String getPath() {

// NOTE(review): this is a scraped diff whose +/- markers were lost, so the
// pre-commit and post-commit versions of update() are interleaved below.
// Pre-commit: the `public void update(String pid, String content)` header,
// the first `fn`/`if` pair (using StringUtils.substringAfterLast), and the
// FileWriter/finally section after `}));`. Post-commit: the `@Override`
// header returning ListenableFuture<File> and everything through `}));`.
/**
* Create or update an index entry for the object.
* <p>
* Writes {@code content} to a new file in the configured directory. The
* filename is {@code pid + "." + timestamp} with everything up to and
* including the last '/' removed.
* @param pid identifier of the object being indexed
* @param content text to write to the file
* @return (post-commit version) a future holding the {@link File} written;
*         it is already complete when this method returns because run()
*         executes the task on the calling thread
* @throws IOException declared by the signature; in the post-commit version
*         write failures are caught and logged inside the task instead of
*         being thrown
**/
public void update(String pid, String content) throws IOException {
@Override
public ListenableFuture<File> update(final String pid, final String content) throws IOException {
// timestamped filename
// NOTE(review): the timestamp has one-second resolution, so two updates of
// the same pid within one second produce the same filename and the later
// FileWriter truncates and overwrites the earlier file. If pid ends with
// '/', the resulting filename is just "." + timestamp.
String fn = pid + "." + fmt.format( new Date() );
if ( fn.indexOf('/') != -1 ) {
fn = StringUtils.substringAfterLast(fn, "/");
String fn = pid + "." + fmt.format(new Date());
if (fn.indexOf('/') != -1) {
fn = substringAfterLast(fn, "/");
}
final File file = new File(path, fn);
return run(ListenableFutureTask.create(new Callable<File>() {

@Override
public File call() {
// write content to disk
// NOTE(review): FileWriter uses the platform default charset. A failed
// write is only logged, so the future still yields `file` and callers
// cannot tell the write failed; consider letting the exception fail
// the future instead.
try (Writer fw = new FileWriter(file)) {
write(content, fw);
} catch (final IOException ex) {
LOGGER.error("Error writing file", ex);
}
return file;
}
}));

// write content to disk
// NOTE(review): pre-commit code. If the FileWriter constructor throws, fw is
// still null and fw.close() in the finally block throws NullPointerException,
// masking the original error; the post-commit try-with-resources version
// above does not have this problem.
FileWriter fw = null;
try {
fw = new FileWriter( new File(path,fn) );
IOUtils.write( content, fw );
} catch (IOException ex) {
logger.warn("Error writing file", ex);
} finally {
fw.close();
}
}


/**
* Remove the object from the index.
* <p>
* Nothing is deleted: removal is recorded as an update with empty content,
* i.e. a new, empty timestamped file for {@code pid}. Files written by
* earlier updates of the same pid are not removed.
* <p>
* NOTE(review): diff +/- markers were lost; the {@code void remove} header
* with the bare {@code update(pid,"");} call is the pre-commit version, and
* the {@code @Override} version returning the update's future is the
* post-commit version.
* @param pid identifier of the object to remove
* @return (post-commit version) the future returned by {@code update(pid, "")}
**/
public void remove(String pid) throws IOException {
@Override
public ListenableFuture<File> remove(final String pid) throws IOException {
// empty update
update(pid,"");
return update(pid,"");
}

/**
 * Executes {@code futureTask} synchronously on the calling thread and hands
 * the same object back as a {@link ListenableFuture}. No executor is
 * involved, so the returned future is already done by the time this method
 * returns.
 *
 * @param futureTask the task to execute
 * @param <T> the task's result type
 * @return {@code futureTask} itself, after it has run
 */
private static <T> ListenableFuture<T> run(final ListenableFutureTask<T> futureTask) {
    futureTask.run();
    return futureTask;
}
}
Indexer.java  @@ -18,22 +18,26 @@

import java.io.IOException;

import com.google.common.util.concurrent.ListenableFuture;

// NOTE(review): diff +/- markers were lost in this listing. Each method
// appears twice: the `void` declarations are the pre-commit API and the
// `ListenableFuture<?>` declarations are the post-commit API; only one of
// each pair exists in either version of the file.
/**
* Main interface for individual indexers to implement. Each type of
* destination (Solr, triplestore, files, etc.) should have its own
* implementation.
*
* @author ajs6f
* @author Esmé Cowles
* Date: Aug 19, 2013
* @date Aug 19, 2013
**/
public interface Indexer {

/**
* Create or update an index entry for the object.
* @param pid identifier of the object being indexed
* @param doc the object's content to index
* @return (post-commit API) a future for the index operation; its value
*         type is left to the implementation
* @throws IOException declared for implementations that fail with I/O errors
**/
public void update(String pid, String doc) throws IOException;
public ListenableFuture<?> update(final String pid, final String doc) throws IOException;

/**
* Remove the object from the index.
* @param pid identifier of the object to remove
* @return (post-commit API) a future for the removal; its value type is
*         left to the implementation
* @throws IOException declared for implementations that fail with I/O errors
**/
public void remove(String pid) throws IOException;
public ListenableFuture<?> remove(final String pid) throws IOException;
}
IndexerGroup.java  @@ -16,12 +16,14 @@

package org.fcrepo.indexer;

import static com.google.common.base.Throwables.propagate;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.http.HttpStatus.SC_OK;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.nio.charset.Charset;

import javax.jms.JMSException;
Expand All @@ -42,20 +44,24 @@
import org.apache.http.impl.conn.PoolingClientConnectionManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* MessageListener implementation that retrieves objects from the repository and
* invokes one or more indexers to index the content.
*
*
* @author Esmé Cowles
* Date: Aug 19, 2013
* @author ajs6f
* @date Aug 19 2013
**/
public class IndexerGroup implements MessageListener {
// NOTE(review): diff +/- markers were lost; the instance `logger` is the
// pre-commit field and the static `LOGGER` is its post-commit replacement.
private final Logger logger = LoggerFactory.getLogger(IndexerGroup.class);

private static final Logger LOGGER = getLogger(IndexerGroup.class);

// Abdera parser used by onMessage() to parse the Atom text of each message.
private Parser atomParser = new Abdera().getParser();

// Base URL; getPath() appends the Atom "path" category term to it.
private String repositoryURL;

// Indexers invoked for each message. No initializer, so this stays null
// until setIndexers() assigns it.
private Set<Indexer> indexers;

// Client used to fetch object content; created in the constructor on top of
// a PoolingClientConnectionManager.
private HttpClient httpclient;
Expand All @@ -64,16 +70,17 @@ public class IndexerGroup implements MessageListener {
* Default constructor.
**/
public IndexerGroup() {
// NOTE(review): diff +/- markers were lost. Pre-commit lines: the non-final
// `p` declaration and the `TimeUnit.SECONDS` call. Post-commit lines: the
// debug log, the `final p` declaration and the static-imported `SECONDS` call.
PoolingClientConnectionManager p = new PoolingClientConnectionManager();
LOGGER.debug("Creating IndexerGroup: {}", this);
final PoolingClientConnectionManager p = new PoolingClientConnectionManager();
// Allow at most 5 concurrent connections per route.
p.setDefaultMaxPerRoute(5);
// NOTE(review): closeIdleConnections() closes connections that are idle at
// the moment it is called; on a freshly created pool there are none, and it
// does not schedule periodic eviction. Confirm whether a background
// idle-connection monitor was intended.
p.closeIdleConnections(3, TimeUnit.SECONDS);
p.closeIdleConnections(3, SECONDS);
httpclient = new DefaultHttpClient(p);
}

/**
* Set repository URL.
* <p>
* getPath() concatenates this value directly with the Atom "path" category
* term, without adding or removing any '/' separator.
* <p>
* NOTE(review): diff +/- markers were lost; the two headers below are the
* pre- and post-commit versions (the latter only makes the parameter final).
* @param repositoryURL base URL of the repository
**/
public void setRepositoryURL(String repositoryURL) {
public void setRepositoryURL(final String repositoryURL) {
this.repositoryURL = repositoryURL;
}

Expand All @@ -87,16 +94,9 @@ public String getRepositoryURL() {
/**
* Set indexers for this group.
* <p>
* NOTE(review): diff +/- markers were lost, so two versions appear below.
* Pre-commit (raw {@code Set} parameter): copies only the elements that are
* {@link Indexer} instances into {@code this.indexers}, creating a HashSet
* when the field is still null and otherwise adding to the indexers from
* earlier calls; other elements are skipped silently.
* Post-commit ({@code Set<Indexer>} parameter): stores the caller's set
* itself, without copying, replacing any previous value, so later changes
* to that set are visible to onMessage(). Passing null leaves the field
* null, and onMessage() would then fail when it iterates it.
* @param indexers the indexers to invoke for each message
**/
public void setIndexers(Set indexers) {
for (Iterator it = indexers.iterator(); it.hasNext();) {
Object o = it.next();
if (o instanceof Indexer) {
if (this.indexers == null) {
this.indexers = new HashSet<Indexer>();
}
this.indexers.add((Indexer) o);
}
}
public void setIndexers(final Set<Indexer> indexers) {
this.indexers = indexers;
LOGGER.debug("Using indexer complement: {} ", indexers);
}

/**
Expand All @@ -108,10 +108,11 @@ public Set<Indexer> getIndexers() {

/**
* Extract node path from Atom category list
*
* @return Node path or repositoryUrl if it's not found
*/
private String getPath(java.util.List<Category> categories) {
for (Category c : categories) {
private String getPath(final java.util.List<Category> categories) {
for (final Category c : categories) {
if (c.getLabel().equals("path")) {
return repositoryURL + c.getTerm();
}
Expand All @@ -122,46 +123,73 @@ private String getPath(java.util.List<Category> categories) {
/**
* Handle a JMS message representing an object update or deletion event.
* <p>
* Only {@link TextMessage}s are processed; their text is parsed as an Atom
* entry. An entry titled "purgeObject" is treated as a removal and passed to
* each indexer's remove(). Any other entry causes an HTTP GET of the URL
* built by getPath(). In the post-commit version, the body is passed to each
* indexer's update() only if the GET returned 200 OK; otherwise an error is
* logged. An exception from one indexer is logged, and the remaining
* indexers are still called. A JMSException or IOException thrown while
* reading the message or fetching the object is logged and ends processing
* of this message.
* <p>
* NOTE(review): diff +/- markers were lost in this listing, so pre-commit
* lines (instance {@code logger}, non-final locals, repeated "purgeObject"
* title checks) appear alongside their post-commit replacements (static
* {@code LOGGER}, {@code final} locals, the {@code removal} and
* {@code hasContent} flags).
**/
public void onMessage(Message message) {
@Override
public void onMessage(final Message message) {
try {
LOGGER.debug("Received message: {}", message.getJMSMessageID());
} catch (final JMSException e) {
// NOTE(review): `e` fills the only "{}" placeholder, so SLF4J 1.x logs its
// toString() but not its stack trace. The text also says "unparseable",
// although the failure is in getJMSMessageID(). propagate(e) always throws
// (wrapping the checked JMSException in a RuntimeException), so the rest of
// this method is skipped and the exception escapes onMessage().
LOGGER.error("Received unparseable message: {}", e);
propagate(e);
}
try {
if (message instanceof TextMessage) {
// get pid from message
final String xml = ((TextMessage) message).getText();
Document<Entry> doc = atomParser.parse(new StringReader(xml));
Entry entry = doc.getRoot();
LOGGER.debug("Received Atom message: {}", xml);
final Document<Entry> doc = atomParser.parse(new StringReader(xml));
final Entry entry = doc.getRoot();


final Boolean removal = "purgeObject".equals(entry.getTitle());

// if the object is updated, fetch current content
Boolean hasContent = false;
String content = null;
if (!"purgeObject".equals(entry.getTitle())) {
HttpGet get = new HttpGet(
if (!removal) {
final HttpGet get = new HttpGet(
getPath(entry.getCategories("xsd:string")));
HttpResponse response = httpclient.execute(get);
content = IOUtils.toString(response.getEntity()
.getContent(), Charset.forName("UTF-8"));
final HttpResponse response = httpclient.execute(get);
// NOTE(review): when the status is not 200, the response entity is never
// consumed. With the pooled connection manager set up in the constructor
// (5 connections per route), that connection may not be returned to the
// pool. Consider EntityUtils.consume(response.getEntity()) in a finally
// block.
if (response.getStatusLine().getStatusCode() == SC_OK) {
content =
IOUtils.toString(response.getEntity().getContent(),
Charset.forName("UTF-8"));
hasContent = true;
}

}
// pid represents the full path. Alternative would be to send
// path separately in all calls
// String pid = getPath(entry.getCategories("xsd:string"))
// .replace("//objects", "/objects");
String pid = getPath(entry.getCategories("xsd:string"));

final String pid = getPath(entry.getCategories("xsd:string"));
LOGGER.debug("Operating with pid: {}", pid);

// call each registered indexer
// NOTE(review): `indexers` has no initializer. If setIndexers() was never
// called (or was given null), iterating it throws NullPointerException,
// which none of the catch clauses in this method handles. The futures
// returned by update()/remove() in the post-commit API are discarded here,
// so failures reported only through those futures go unnoticed.
for (Indexer indexer : indexers) {
LOGGER.debug("It is {} that this is a removal operation.",
removal);
for (final Indexer indexer : indexers) {
try {
if ("purgeObject".equals(entry.getTitle())) {
if (removal) {
indexer.remove(pid);
} else {
indexer.update(pid, content);
if (hasContent) {
indexer.update(pid, content);
} else {
LOGGER.error(
"Received update on {} but was unable to retrieve representation!",
pid);
}
}
} catch (Exception innerex) {
logger.warn("Error indexing {}, {}", pid, innerex);
} catch (final Exception e) {
// NOTE(review): `e` fills the second "{}" placeholder, so SLF4J 1.x logs
// only its toString(), not the stack trace.
LOGGER.error("Error indexing {}: {}!", pid, e);
}
}
}
} catch (JMSException e) {
logger.warn("Error processing JMS event", e);
} catch (IOException e) {
logger.warn("Error retrieving object from repository", e);
} catch (final JMSException e) {
LOGGER.error("Error processing JMS event!", e);
} catch (final IOException e) {
LOGGER.error("Error retrieving object from repository!", e);
}
}

}

0 comments on commit 64812b9

Please sign in to comment.