Skip to content

Commit

Permalink
Merge pull request #25 from futures/ActiveMQTopicPublishing
Browse files Browse the repository at this point in the history
Publish to JMS topic
  • Loading branch information
cbeer committed Jan 31, 2013
2 parents a3373cd + b32ef8f commit 42973f2
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 22 deletions.
57 changes: 57 additions & 0 deletions src/main/java/org/fcrepo/modeshape/observer/JMSTopicPublisher.java
@@ -0,0 +1,57 @@
package org.fcrepo.modeshape.observer;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.observation.Event;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

/**
 * Forwards JCR observation events from the internal Guava {@link EventBus}
 * to the JMS topic {@value #TOPIC_NAME}.
 *
 * <p>The publisher registers itself on the bus once its JMS connection,
 * session and producer have been created, and unregisters itself before
 * those resources are closed again.
 */
public class JMSTopicPublisher {

    /** Name of the JMS topic that events are published to. */
    private static final String TOPIC_NAME = "fedora";

    private static final Logger LOGGER = LoggerFactory
            .getLogger(JMSTopicPublisher.class);

    @Inject
    EventBus eventBus;

    @Inject
    private ActiveMQConnectionFactory connectionFactory;

    private Connection connection;
    private Session session;
    private MessageProducer producer;

    /**
     * Publishes the string form of a JCR event as a JMS text message.
     *
     * <p>NOTE(review): a JMS {@link Session} is documented as a
     * single-threaded context; if events can be posted to the bus from
     * several threads at once this method may need external serialisation
     * -- confirm against how the bus is used.
     *
     * @param jcrEvent the event delivered by the event bus
     * @throws JMSException if the message cannot be created or sent
     */
    @Subscribe
    public void publishJCREvent(Event jcrEvent) throws JMSException {
        final String payload = jcrEvent.toString();
        // Parameterized logging; also restores the space that was missing
        // between the event text and "onto JMS." in the original message.
        LOGGER.debug("Putting event: {} onto JMS.", payload);
        producer.send(session.createTextMessage(payload));
    }

    /**
     * Opens the JMS connection, session and topic producer, then registers
     * this object on the event bus.
     *
     * <p>If any step after the connection is created fails, whatever was
     * already opened is closed again before the exception is rethrown, so
     * a failed start-up does not leave a connection behind.
     *
     * @throws JMSException if any JMS resource cannot be created
     */
    @PostConstruct
    public void acquireConnections() throws JMSException {
        LOGGER.debug("Initializing {}", this.getClass().getCanonicalName());
        connection = connectionFactory.createConnection();
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(session.createTopic(TOPIC_NAME));
        } catch (JMSException e) {
            try {
                closeJmsResources();
            } catch (JMSException cleanupFailure) {
                // Keep the original failure as the one reported to the caller.
                LOGGER.warn("Could not clean up JMS resources after failed start",
                        cleanupFailure);
            }
            throw e;
        }
        // Register last so no event is delivered before the producer exists.
        eventBus.register(this);
    }

    /**
     * Unregisters this object from the event bus and closes the producer,
     * session and connection.
     *
     * <p>Unregistering happens first so that no event is delivered to an
     * already-closed producer; the JMS resources are closed even if
     * unregistering fails.
     *
     * @throws JMSException if closing a JMS resource fails
     */
    @PreDestroy
    public void releaseConnections() throws JMSException {
        try {
            eventBus.unregister(this);
        } finally {
            closeJmsResources();
        }
    }

    /**
     * Closes producer, session and connection in that order, attempting
     * each close even when an earlier one throws.
     */
    private void closeJmsResources() throws JMSException {
        try {
            if (producer != null) {
                producer.close();
            }
        } finally {
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
Expand Up @@ -10,6 +10,8 @@
import javax.jcr.observation.EventListener;

import org.modeshape.jcr.api.Repository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList.Builder;
import com.google.common.eventbus.EventBus;
Expand All @@ -29,6 +31,8 @@ public class SimpleObserver implements EventListener {
@Inject
private EventFilter eventFilter;

final private Logger logger = LoggerFactory.getLogger(SimpleObserver.class);

@PostConstruct
public void buildListener() throws RepositoryException {
repository
Expand All @@ -42,8 +46,10 @@ public void buildListener() throws RepositoryException {
@Override
public void onEvent(EventIterator events) {
for (Event e : filter(new Builder<Event>().addAll(events).build(),
eventFilter))
eventFilter)) {
logger.debug("Putting event: " + e.toString() + " on the bus.");
eventBus.post(e);
}
}

}
24 changes: 14 additions & 10 deletions src/main/resources/spring/eventing.xml
Expand Up @@ -2,24 +2,28 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<context:annotation-config/>

<bean class="org.fcrepo.modeshape.observer.SimpleObserver"/>

<bean name="fedoraEventFilter" class="org.fcrepo.modeshape.observer.DefaultFilter" />


<bean class="org.fcrepo.modeshape.observer.JMSTopicPublisher"/>

<bean name="fedoraEventFilter" class="org.fcrepo.modeshape.observer.DefaultFilter"/>

<bean name="fedoraInternalEventBus" class="com.google.common.eventbus.EventBus"/>

<!--
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
depends-on="jmsBroker">
<property name="brokerURL" value="vm://localhost"/>
</bean>

<bean name="jmsBroker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:/spring/activemq.xml" />
<property name="start" value="true" />
<property name="config" value="classpath:/spring/activemq.xml"/>
<property name="start" value="true"/>
</bean>
-->

</beans>
57 changes: 46 additions & 11 deletions src/test/java/org/fcrepo/modeshape/observer/SimpleObserverTest.java
@@ -1,5 +1,21 @@
package org.fcrepo.modeshape.observer;

import static junit.framework.Assert.assertEquals;

import javax.inject.Inject;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand All @@ -8,17 +24,12 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

import javax.inject.Inject;
import javax.jcr.*;
import javax.jcr.observation.Event;

import static junit.framework.Assert.assertEquals;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring/eventing.xml")
public class SimpleObserverTest {
@ContextConfiguration({ "/spring/eventing.xml", "/spring/repo.xml" })
public class SimpleObserverTest implements MessageListener {

private Integer messageCount = 0;
private Integer eventBusMessageCount = 0;
private Integer jmsMessageCount = 0;

@Inject
private Repository repository;
Expand All @@ -29,6 +40,23 @@ public class SimpleObserverTest {
@Inject
private EventBus eventBus;

@Inject
private ActiveMQConnectionFactory connectionFactory;

private Connection connection;
private javax.jms.Session session;

/**
 * Opens a JMS connection and session before each test and subscribes
 * this test instance as the listener on the "fedora" topic.
 *
 * @throws JMSException if any JMS resource cannot be created
 */
@Before
public void acquireConnections() throws JMSException {
    connection = connectionFactory.createConnection();
    connection.start();

    final boolean transacted = false;
    session = connection.createSession(transacted,
            javax.jms.Session.AUTO_ACKNOWLEDGE);

    final javax.jms.Destination fedoraTopic = session.createTopic("fedora");
    final MessageConsumer topicConsumer = session.createConsumer(fedoraTopic);
    topicConsumer.setMessageListener(this);
}

@Test
public void TestSimpleIntegration() throws RepositoryException {

Expand All @@ -51,12 +79,19 @@ public void TestSimpleIntegration() throws RepositoryException {
// Should be two messages, for each time
// each node becomes a Fedora object

assertEquals("Where are my messages!?", (Integer) 2, messageCount);
assertEquals("Where are my messages!?", (Integer) 2,
eventBusMessageCount);
assertEquals("Where are my messages!?", (Integer) 2, jmsMessageCount);
}

@Subscribe
public void countMessages(Event e) {
messageCount++;
eventBusMessageCount++;
}

/**
 * Counts every JMS message delivered to this listener.
 *
 * @param message the received message; its content is not inspected
 */
@Override
public void onMessage(Message message) {
    jmsMessageCount = jmsMessageCount + 1;
}

}

0 comments on commit 42973f2

Please sign in to comment.