Skip to content

Commit

Permalink
Working integration tests\!
Browse files Browse the repository at this point in the history
  • Loading branch information
ajs6f committed Dec 2, 2013
1 parent c1e9267 commit fc49cc3
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 7 deletions.
Expand Up @@ -49,8 +49,8 @@ public class DefaultMessageFactory implements JMSEventMessageFactory {

@Override
public Message getMessage(final Event jcrEvent, final Session jcrSession,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {
final Message message = jmsSession.createMessage();
message.setLongProperty(TIMESTAMP_HEADER_NAME, jcrEvent.getDate());
message.setStringProperty(IDENTIFIER_HEADER_NAME, jcrEvent.getPath());
Expand Down
Expand Up @@ -69,7 +69,7 @@ public class JMSTopicPublisher {
/**
* When an EventBus message is received, map it to our JMS
* message payload and push it onto the queue.
*
*
* @param fedoraEvent
* @throws JMSException
* @throws RepositoryException
Expand All @@ -84,12 +84,12 @@ public void publishJCREvent(final Event fedoraEvent) throws JMSException,
LOGGER.debug("Transformed the event to a JMS message.");
producer.send(tm);

LOGGER.debug("Put event: \n{}\n onto JMS.", tm.getJMSMessageID());
LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID());
}

/**
* Connect to JCR Repository and JMS queue
*
*
* @throws JMSException
* @throws RepositoryException
*/
Expand All @@ -108,7 +108,7 @@ public void acquireConnections() throws JMSException, RepositoryException {

/**
* Close external connections
*
*
* @throws JMSException
*/
@PreDestroy
Expand Down
Expand Up @@ -51,12 +51,14 @@
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"/spring-test/jms.xml", "/spring-test/repo.xml",
@ContextConfiguration({"/spring-test/atom-jms.xml", "/spring-test/repo.xml",
"/spring-test/eventing.xml"})
@DirtiesContext
public class AtomJMSIT implements MessageListener {

@Inject
Expand Down
@@ -0,0 +1,221 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.integration.jms.observer;

import static com.google.common.base.Throwables.propagate;
import static java.lang.System.currentTimeMillis;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.fcrepo.jcr.FedoraJcrTypes.FEDORA_OBJECT;
import static org.fcrepo.jms.headers.DefaultMessageFactory.EVENT_TYPE_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.TIMESTAMP_HEADER_NAME;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.HashSet;
import java.util.Set;

import javax.inject.Inject;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.fcrepo.kernel.utils.EventType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.modeshape.jcr.api.JcrTools;
import org.slf4j.Logger;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test that creates and removes nodes in the injected JCR
 * repository and checks that corresponding JMS messages arrive on the
 * "fedora" topic with the expected identifier and event-type headers.
 *
 * <p>
 * The test instance registers itself as the {@link MessageListener} of its
 * consumer, so {@link #onMessage(Message)} may run on a different thread
 * than the test method. All access to the received-message set therefore
 * happens while holding this instance's monitor.
 * </p>
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"/spring-test/headers-jms.xml", "/spring-test/repo.xml",
        "/spring-test/eventing.xml"})
@DirtiesContext
public class HeadersJMSIT implements MessageListener {

    private static final Logger LOGGER = getLogger(HeadersJMSIT.class);

    /**
     * Maximum time, in milliseconds, to wait for a set of test messages.
     */
    private static final long TIMEOUT = 2000;

    @Inject
    private Repository repository;

    @Inject
    private ActiveMQConnectionFactory connectionFactory;

    private Connection connection;

    private javax.jms.Session session;

    private MessageConsumer consumer;

    /**
     * Messages received so far. Guarded by {@code this}: a plain
     * {@link HashSet} is not safe for concurrent use, so it is only touched
     * inside {@code synchronized (this)} blocks.
     */
    private final Set<Message> messages = new HashSet<>();

    private final JcrTools jcrTools = new JcrTools(true);

    /**
     * Creates a Fedora object node and expects a message for its path
     * carrying the NODE_ADDED event type.
     *
     * @throws RepositoryException if the repository operations fail
     * @throws InterruptedException if interrupted while waiting for messages
     * @throws JMSException if a message header cannot be read
     */
    @Test
    public void testIngestion() throws RepositoryException,
            InterruptedException, JMSException {

        final String pid = "/testIngestion";
        final String expectedEventType =
                REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();
        LOGGER.debug("Expecting a {} event", expectedEventType);

        final Set<Message> received;
        final Session jcrSession = repository.login();
        try {
            final Node node = jcrTools.findOrCreateNode(jcrSession, pid);
            node.addMixin(FEDORA_OBJECT);
            jcrSession.save();
            received = waitForEntry(1);
        } finally {
            // always release the JCR session, even when the test fails early
            jcrSession.logout();
        }

        if (received.isEmpty()) {
            fail("Waited " + TIMEOUT + " ms, got no messages");
        }
        assertTrue(
                "Found no message with correct identifer and correct event type!",
                containsEvent(received, pid, expectedEventType));
    }

    /**
     * Creates and then removes a Fedora object node and expects exactly two
     * messages, one of which carries the NODE_REMOVED event type for the
     * node's path.
     *
     * @throws RepositoryException if the repository operations fail
     * @throws InterruptedException if interrupted while waiting for messages
     * @throws JMSException if a message header cannot be read
     */
    @Test
    public void testRemoval() throws RepositoryException, InterruptedException,
            JMSException {

        final String pid = "/testRemoval";
        final String expectedEventType =
                REPOSITORY_NAMESPACE + EventType.valueOf(NODE_REMOVED).toString();
        LOGGER.debug("Expecting a {} event", expectedEventType);

        final int minEntriesSize = 2;
        final Set<Message> received;
        final Session jcrSession = repository.login();
        try {
            final Node node = jcrTools.findOrCreateNode(jcrSession, pid);
            node.addMixin(FEDORA_OBJECT);
            jcrSession.save();
            node.remove();
            jcrSession.save();
            received = waitForEntry(minEntriesSize);
        } finally {
            // always release the JCR session, even when the test fails early
            jcrSession.logout();
        }

        if (received.isEmpty()) {
            fail("Waited " + TIMEOUT + " ms, got no messages");
        }
        assertEquals("Entries size not " + minEntriesSize, minEntriesSize,
                received.size());
        assertTrue(
                "Found no message with correct identifer and correct event type!",
                containsEvent(received, pid, expectedEventType));
    }

    /**
     * Logs the incoming message, records it, and wakes any thread blocked in
     * {@link #waitForEntry(int)}.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(final Message message) {
        try {
            LOGGER.debug(
                    "Received JMS message: {} with identifier: {}, timestamp: {}, and event type: {}",
                    message.getJMSMessageID(), getIdentifier(message),
                    getTimestamp(message), getEventType(message));
        } catch (final JMSException e) {
            throw propagate(e);
        }
        synchronized (this) {
            // record and notify under the same monitor the waiter holds
            messages.add(message);
            notifyAll();
        }
    }

    /**
     * Opens a JMS connection and subscribes this instance to the "fedora"
     * topic before each test.
     *
     * @throws JMSException if any JMS resource cannot be created
     */
    @Before
    public void acquireConnection() throws JMSException {
        LOGGER.debug("{} acquiring JMS connection.", getClass().getName());
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        consumer = session.createConsumer(session.createTopic("fedora"));
        consumer.setMessageListener(this);
    }

    /**
     * Closes the consumer, session and connection after each test. Each
     * resource is closed even if an earlier close fails or if setup never
     * got as far as creating it.
     *
     * @throws JMSException if closing a JMS resource fails
     */
    @After
    public void releaseConnection() throws JMSException {
        LOGGER.debug("{} releasing JMS connection.", getClass().getName());
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Blocks until at least the expected number of messages has been
     * received or {@link #TIMEOUT} milliseconds have elapsed, whichever
     * comes first.
     *
     * @param expectedNumOfMsgs number of messages to wait for
     * @return a snapshot of the messages received so far, safe to read
     *         without further locking
     * @throws InterruptedException if interrupted while waiting
     */
    private Set<Message> waitForEntry(final int expectedNumOfMsgs)
            throws InterruptedException {
        final long deadline = currentTimeMillis() + TIMEOUT;
        synchronized (this) {
            long remaining = deadline - currentTimeMillis();
            // loop guards against spurious wake-ups; remaining is always > 0
            // here, so wait() never degenerates into an unbounded wait(0)
            while (remaining > 0 && messages.size() < expectedNumOfMsgs) {
                LOGGER.debug("Waiting for next message...");
                wait(remaining);
                remaining = deadline - currentTimeMillis();
            }
            return new HashSet<>(messages);
        }
    }

    /**
     * Tells whether any of the given messages carries both the given
     * identifier and the given event type in its headers. Messages lacking
     * either header simply do not match.
     *
     * @param candidates messages to inspect
     * @param identifier expected identifier header value
     * @param eventType expected event-type header value
     * @return true if at least one message matches both values
     * @throws JMSException if a header cannot be read
     */
    private static boolean containsEvent(final Set<Message> candidates,
            final String identifier, final String eventType)
            throws JMSException {
        for (final Message candidate : candidates) {
            if (identifier.equals(getIdentifier(candidate))
                    && eventType.equals(getEventType(candidate))) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param msg a JMS message
     * @return the value of the identifier header, or null if absent
     * @throws JMSException if the header cannot be read
     */
    private static String getIdentifier(final Message msg) throws JMSException {
        return msg.getStringProperty(IDENTIFIER_HEADER_NAME);
    }

    /**
     * @param msg a JMS message
     * @return the value of the event-type header, or null if absent
     * @throws JMSException if the header cannot be read
     */
    private static String getEventType(final Message msg) throws JMSException {
        return msg.getStringProperty(EVENT_TYPE_HEADER_NAME);
    }

    /**
     * @param msg a JMS message
     * @return the value of the timestamp header
     * @throws JMSException if the header cannot be read
     */
    private static Long getTimestamp(final Message msg) throws JMSException {
        return msg.getLongProperty(TIMESTAMP_HEADER_NAME);
    }

}
3 changes: 3 additions & 0 deletions fcrepo-jms/src/test/resources/logback-test.xml
Expand Up @@ -10,6 +10,9 @@
<logger name="org.fcrepo.jms" additivity="false" level="DEBUG">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.fcrepo.integration.jms" additivity="false" level="DEBUG">
<appender-ref ref="STDOUT"/>
</logger>
<logger name="org.fcrepo" additivity="false" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>
Expand Down
24 changes: 24 additions & 0 deletions fcrepo-jms/src/test/resources/spring-test/headers-jms.xml
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- Context that supports JMS publication-->

<context:annotation-config/>

<!-- publishes events from the internal bus to JMS, using the message factory configured below -->
<bean class="org.fcrepo.jms.observer.JMSTopicPublisher"/>

<amq:connectionFactory id="connectionFactory"
brokerURL="vm://localhost?broker.persistent=false&amp;broker.useJmx=false&amp;broker.enableStatistics=false"/>

<bean class="org.fcrepo.jms.headers.DefaultMessageFactory"/>

</beans>

0 comments on commit fc49cc3

Please sign in to comment.