Skip to content

Commit

Permalink
Merge pull request #26 from futures/AtomJMSOutput
Browse files Browse the repository at this point in the history
Changed JMS Topic to be in Atom format
  • Loading branch information
ajs6f committed Feb 2, 2013
2 parents ab77635 + ca9f967 commit c1c7116
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 45 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Expand Up @@ -144,6 +144,11 @@
<artifactId>guava</artifactId>
<version>13.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.abdera</groupId>
<artifactId>abdera-parser</artifactId>
<version>1.1.3</version>
</dependency>
</dependencies>

<build>
Expand Down
29 changes: 26 additions & 3 deletions src/main/java/org/fcrepo/modeshape/observer/JMSTopicPublisher.java
@@ -1,18 +1,27 @@
package org.fcrepo.modeshape.observer;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.abdera.Abdera;
import org.apache.abdera.model.Entry;
import org.apache.abdera.model.Text.Type;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

Expand All @@ -28,13 +37,27 @@ public class JMSTopicPublisher {
private Session session;
private MessageProducer producer;

final static private Abdera abdera = new Abdera();

final private static Map<Integer, String> operationsMappings = ImmutableMap
.of(Event.NODE_ADDED, "ingest");

final private Logger logger = LoggerFactory
.getLogger(JMSTopicPublisher.class);

@Subscribe
public void publishJCREvent(Event jcrEvent) throws JMSException {
logger.debug("Putting event: " + jcrEvent.toString() + "onto JMS.");
producer.send(session.createTextMessage(jcrEvent.toString()));
public void publishJCREvent(Event jcrEvent) throws JMSException,
RepositoryException, IOException {
Entry entry = abdera.newEntry();
entry.setTitle(operationsMappings.get(jcrEvent.getType()), Type.TEXT)
.setBaseUri("http://localhost:8080/rest");
entry.addCategory("xsd:string", jcrEvent.getPath(), "fedora-types:pid");
StringWriter writer = new StringWriter();
entry.writeTo(writer);
String atomMessage = writer.toString();
producer.send(session.createTextMessage(atomMessage));

logger.debug("Put event: \n" + atomMessage + "\n onto JMS.");
}

@PostConstruct
Expand Down
118 changes: 118 additions & 0 deletions src/test/java/org/fcrepo/modeshape/observer/AtomJMSTest.java
@@ -0,0 +1,118 @@
package org.fcrepo.modeshape.observer;

import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;

import javax.inject.Inject;
import javax.jcr.LoginException;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.abdera.Abdera;
import org.apache.abdera.model.Category;
import org.apache.abdera.model.Entry;
import org.apache.abdera.parser.ParseException;
import org.apache.abdera.parser.Parser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.modeshape.common.SystemFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({ "/spring/eventing.xml", "/spring/repo.xml" })
public class AtomJMSTest implements MessageListener {

@Inject
private Repository repository;

@Inject
private ActiveMQConnectionFactory connectionFactory;

private Connection connection;
private javax.jms.Session session;
private MessageConsumer consumer;

static Parser parser = new Abdera().getParser();
private Entry entry;

final private Logger logger = LoggerFactory.getLogger(AtomJMSTest.class);

@Test
public void testNodeCreation() throws LoginException, RepositoryException {
Session session = repository.login();
session.getRootNode().addNode("test1").addMixin("fedora:object");
session.save();
session.logout();
while (entry == null)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<Category> categories = entry.getCategories("xsd:string");
String path = null;
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:pid")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
}
}
assertEquals("Got wrong pid!", "/test1", path);
assertEquals("Got wrong method!", "ingest", entry.getTitle());
}

@Override
public void onMessage(Message message) {
logger.debug("Received JMS message: " + message.toString());
TextMessage tMessage = (TextMessage) message;
try {
entry = (Entry) parser.parse(
new ByteArrayInputStream(tMessage.getText().getBytes(
"UTF-8"))).getRoot();
logger.debug("Parsed Entry: " + entry.toString());
} catch (ParseException e) {
throw new SystemFailureException(e);
} catch (JMSException e) {
throw new SystemFailureException(e);
} catch (UnsupportedEncodingException e) {
throw new SystemFailureException(e);
}

}

@Before
public void acquireConnection() throws JMSException {
logger.debug(this.getClass().getName() + " acquiring JMS connection.");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,
javax.jms.Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic("fedora"));
consumer.setMessageListener(this);
}

@After
public void releaseConnection() throws JMSException {
logger.debug(this.getClass().getName() + " releasing JMS connection.");
consumer.close();
session.close();
connection.close();
}

}
57 changes: 15 additions & 42 deletions src/test/java/org/fcrepo/modeshape/observer/SimpleObserverTest.java
Expand Up @@ -3,18 +3,12 @@
import static junit.framework.Assert.assertEquals;

import javax.inject.Inject;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -26,47 +20,21 @@

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({ "/spring/eventing.xml", "/spring/repo.xml" })
public class SimpleObserverTest implements MessageListener {

private Integer eventBusMessageCount = 0;
private Integer jmsMessageCount = 0;
public class SimpleObserverTest {

private Integer eventBusMessageCount;
@Inject
private Repository repository;

@Inject
private SimpleObserver o;

@Inject
private EventBus eventBus;

@Inject
private ActiveMQConnectionFactory connectionFactory;

private Connection connection;
private javax.jms.Session session;

@Before
public void acquireConnections() throws JMSException {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,
javax.jms.Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session
.createTopic("fedora"));
consumer.setMessageListener(this);
}

@Test
public void TestSimpleIntegration() throws RepositoryException {

eventBus.register(this);
public void TestEventBusPublishing() throws RepositoryException {

Session se = repository.login();
Node testnode = se.getRootNode().addNode("/object1");
testnode.addMixin("fedora:object");
Node testnode2 = se.getRootNode().addNode("/object2");
testnode2.addMixin("fedora:object");
se.getRootNode().addNode("/object1").addMixin("fedora:object");
se.getRootNode().addNode("/object2").addMixin("fedora:object");
se.save();
se.logout();

Expand All @@ -81,17 +49,22 @@ public void TestSimpleIntegration() throws RepositoryException {

assertEquals("Where are my messages!?", (Integer) 2,
eventBusMessageCount);
assertEquals("Where are my messages!?", (Integer) 2, jmsMessageCount);

}

@Subscribe
public void countMessages(Event e) {
eventBusMessageCount++;
}

@Override
public void onMessage(Message message) {
jmsMessageCount++;
@Before
public void acquireConnections() {
eventBusMessageCount = 0;
eventBus.register(this);
}

@After
public void releaseConnections() {
eventBus.unregister(this);
}
}
20 changes: 20 additions & 0 deletions src/test/resources/logback.xml
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%p %d{HH:mm:ss.SSS} \(%c{0}\) %m%n</pattern>
</encoder>
</appender>
<logger name="org.fcrepo" additivity="false" level="DEBUG">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.modeshape" additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.apache.abdera" additivity="false" level="DEBUG">
<appender-ref ref="STDOUT"/>
</logger>
<root additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

0 comments on commit c1c7116

Please sign in to comment.