Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix for #17
  • Loading branch information
ajs6f committed Feb 14, 2013
1 parent 35cda0b commit b89a146
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 145 deletions.
305 changes: 166 additions & 139 deletions fcrepo-jms/src/main/java/org/fcrepo/observer/JMSTopicAtomPublisher.java
@@ -1,3 +1,4 @@

package org.fcrepo.observer;

import static com.google.common.collect.Iterables.any;
Expand All @@ -7,6 +8,8 @@
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
import static org.apache.abdera.model.Text.Type.TEXT;
import static org.fcrepo.utils.FedoraTypesUtils.isFedoraDatastream;
import static org.fcrepo.utils.FedoraTypesUtils.isFedoraObject;

import java.io.IOException;
import java.io.StringWriter;
Expand All @@ -16,6 +19,7 @@
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.LoginException;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
Expand All @@ -28,7 +32,6 @@

import org.apache.abdera.Abdera;
import org.apache.abdera.model.Entry;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,143 +43,167 @@

public class JMSTopicAtomPublisher {

@Inject
EventBus eventBus;

@Inject
private Repository repo;

@Inject
private ActiveMQConnectionFactory connectionFactory;

private Connection connection;
private Session jmsSession;
private MessageProducer producer;

// Atom engine
final static private Abdera abdera = new Abdera();

// maps JCR mutations to Fedora Classic API method types
private OperationsMappings operationsMappings;

final private Logger logger = LoggerFactory
.getLogger(JMSTopicAtomPublisher.class);

@Subscribe
public void publishJCREvent(Event jcrEvent) throws JMSException,
RepositoryException, IOException {

Entry entry = abdera.newEntry();

entry.setTitle(operationsMappings.getFedoraMethodType(jcrEvent), TEXT)
.setBaseUri("http://localhost:8080/rest");

// assume that the PID is the last section of the node path
String path = jcrEvent.getPath();
String pid = path.substring(path.lastIndexOf('/') + 1, path.length());
entry.addCategory("xsd:string", pid, "fedora-types:pid");

StringWriter writer = new StringWriter();
entry.writeTo(writer);
String atomMessage = writer.toString();
producer.send(jmsSession.createTextMessage(atomMessage));

logger.debug("Put event: \n" + atomMessage + "\n onto JMS.");
}

@PostConstruct
public void acquireConnections() throws JMSException, LoginException,
RepositoryException {
logger.debug("Initializing: " + this.getClass().getCanonicalName());

operationsMappings = new OperationsMappings();

connection = connectionFactory.createConnection();
connection.start();
jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = jmsSession.createProducer(jmsSession.createTopic("fedora"));
eventBus.register(this);
}

@PreDestroy
public void releaseConnections() throws JMSException {
logger.debug("Tearing down: " + this.getClass().getCanonicalName());

operationsMappings.session.logout();

producer.close();
jmsSession.close();
connection.close();
eventBus.unregister(this);
}

// maps JCR mutations to Fedora Classic API method types
final private class OperationsMappings {

// this actor will never mutate the state of the repo,
// so we keep the session live for efficiency
private javax.jcr.Session session;

public String getFedoraMethodType(Event jcrEvent)
throws PathNotFoundException, RepositoryException {

// we need to know if this is an object or a datastream
Set<NodeType> nodeTypes = Sets.newHashSet(session.getNode(
jcrEvent.getPath()).getMixinNodeTypes());

// Now we can select from the combination of JCR Event type
// and resource type to determine a Fedora Classic API method
Integer eventType = jcrEvent.getType();

if (any(nodeTypes, isObjectNodeType)) {
switch (eventType) {
case NODE_ADDED:
return "ingest";
case NODE_REMOVED:
return "purgeObject";
case PROPERTY_ADDED:
return "modifyObject";
case PROPERTY_CHANGED:
return "modifyObject";
case PROPERTY_REMOVED:
return "modifyObject";
}
}
if (any(nodeTypes, isDatastreamNodeType)) {
switch (eventType) {
case NODE_ADDED:
return "addDatastream";
case NODE_REMOVED:
return "purgeDatastream";
case PROPERTY_ADDED:
return "modifyDatastream";
case PROPERTY_CHANGED:
return "modifyDatastream";
case PROPERTY_REMOVED:
return "modifyDatastream";
}
}
return null;
}

private Predicate<NodeType> isObjectNodeType = new Predicate<NodeType>() {
@Override
public boolean apply(NodeType type) {
return type.getName().equals("fedora:object");
}
};

private Predicate<NodeType> isDatastreamNodeType = new Predicate<NodeType>() {
@Override
public boolean apply(NodeType type) {
return type.getName().equals("fedora:datastream");
}
};

OperationsMappings() throws LoginException, RepositoryException {
session = repo.login();
}

}
@Inject
EventBus eventBus;

@Inject
private Repository repo;

@Inject
private ActiveMQConnectionFactory connectionFactory;

private Connection connection;

private Session jmsSession;

private MessageProducer producer;

// Atom engine
final static private Abdera abdera = new Abdera();

// maps JCR mutations to Fedora Classic API method types
private OperationsMappings operationsMappings;

final private Logger logger = LoggerFactory
.getLogger(JMSTopicAtomPublisher.class);

private javax.jcr.Session session;

@Subscribe
public void publishJCREvent(Event jcrEvent) throws JMSException,
RepositoryException, IOException {

String path = jcrEvent.getPath();
Node resource = session.getNode(path);

Entry entry = abdera.newEntry();

entry.setTitle(operationsMappings.getFedoraMethodType(jcrEvent), TEXT)
.setBaseUri("http://localhost:8080/rest");

//do the following only for datastreams
if (isFedoraDatastream.apply(resource)) {
entry.addCategory("xsd:string", resource.getParent().getName(),
"fedora-types:pid");
entry.addCategory("xsd:string", resource.getName(),
"fedora-types:dsID");
}
//do the following only for objects
if (isFedoraObject.apply(resource)) {
entry.addCategory("xsd:string", resource.getName(),
"fedora-types:pid");
}

StringWriter writer = new StringWriter();
entry.writeTo(writer);
String atomMessage = writer.toString();
producer.send(jmsSession.createTextMessage(atomMessage));

logger.debug("Put event: \n" + atomMessage + "\n onto JMS.");
}

@PostConstruct
public void acquireConnections() throws JMSException, LoginException,
RepositoryException {
logger.debug("Initializing: " + this.getClass().getCanonicalName());

operationsMappings = new OperationsMappings();

connection = connectionFactory.createConnection();
connection.start();
jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = jmsSession.createProducer(jmsSession.createTopic("fedora"));
eventBus.register(this);

session = repo.login();
}

@PreDestroy
public void releaseConnections() throws JMSException {
logger.debug("Tearing down: " + this.getClass().getCanonicalName());

operationsMappings.session.logout();

producer.close();
jmsSession.close();
connection.close();
eventBus.unregister(this);
session.logout();
}

// maps JCR mutations to Fedora Classic API method types
final private class OperationsMappings {

// this actor will never mutate the state of the repo,
// so we keep the session live for efficiency
private javax.jcr.Session session;

public String getFedoraMethodType(Event jcrEvent)
throws PathNotFoundException, RepositoryException {

// we need to know if this is an object or a datastream
Set<NodeType> nodeTypes =
Sets.newHashSet(session.getNode(jcrEvent.getPath())
.getMixinNodeTypes());

// Now we can select from the combination of JCR Event type
// and resource type to determine a Fedora Classic API method
Integer eventType = jcrEvent.getType();

if (any(nodeTypes, isObjectNodeType)) {
switch (eventType) {
case NODE_ADDED:
return "ingest";
case NODE_REMOVED:
return "purgeObject";
case PROPERTY_ADDED:
return "modifyObject";
case PROPERTY_CHANGED:
return "modifyObject";
case PROPERTY_REMOVED:
return "modifyObject";
}
}
if (any(nodeTypes, isDatastreamNodeType)) {
switch (eventType) {
case NODE_ADDED:
return "addDatastream";
case NODE_REMOVED:
return "purgeDatastream";
case PROPERTY_ADDED:
return "modifyDatastream";
case PROPERTY_CHANGED:
return "modifyDatastream";
case PROPERTY_REMOVED:
return "modifyDatastream";
}
}
return null;
}

private Predicate<NodeType> isObjectNodeType =
new Predicate<NodeType>() {

@Override
public boolean apply(NodeType type) {
return type.getName().equals("fedora:object");
}
};

private Predicate<NodeType> isDatastreamNodeType =
new Predicate<NodeType>() {

@Override
public boolean apply(NodeType type) {
return type.getName().equals("fedora:datastream");
}
};

OperationsMappings()
throws LoginException, RepositoryException {
session = repo.login();
}

}

}

0 comments on commit b89a146

Please sign in to comment.