Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #178 from futures/FixTheHellOutOfJMS
  • Loading branch information
escowles committed Dec 3, 2013
2 parents ceae23e + 139cffb commit fff0aa7
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 5 deletions.
@@ -0,0 +1,66 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.jms.headers;

import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;

import java.io.IOException;

import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
import javax.jms.Message;

import org.fcrepo.jms.observer.JMSEventMessageFactory;
import org.fcrepo.kernel.utils.EventType;

/**
* Generates JMS {@link Message}s composed entirely of headers, based entirely
* on information found in the JCR {@link Event} that triggers publication.
*
* @author ajs6f
* @date Dec 2, 2013
*/
public class DefaultMessageFactory implements JMSEventMessageFactory {

public static final String TIMESTAMP_HEADER_NAME = REPOSITORY_NAMESPACE
+ "timestamp";

public static final String IDENTIFIER_HEADER_NAME = REPOSITORY_NAMESPACE
+ "identifier";

public static final String EVENT_TYPE_HEADER_NAME = REPOSITORY_NAMESPACE
+ "eventType";

@Override
public Message getMessage(final Event jcrEvent, final Session jcrSession,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {
final Message message = jmsSession.createMessage();
message.setLongProperty(TIMESTAMP_HEADER_NAME, jcrEvent.getDate());
message.setStringProperty(IDENTIFIER_HEADER_NAME, jcrEvent.getPath());
message.setStringProperty(EVENT_TYPE_HEADER_NAME, getEventURI(jcrEvent
.getType()));
return message;
}

private static String getEventURI(final int type) {
return REPOSITORY_NAMESPACE + EventType.valueOf(type).toString();
}

}
Expand Up @@ -69,7 +69,7 @@ public class JMSTopicPublisher {
/**
* When an EventBus mesage is received, map it to our JMS
* message payload and push it onto the queue.
*
*
* @param fedoraEvent
* @throws JMSException
* @throws RepositoryException
Expand All @@ -84,12 +84,12 @@ public void publishJCREvent(final Event fedoraEvent) throws JMSException,
LOGGER.debug("Transformed the event to a JMS message.");
producer.send(tm);

LOGGER.debug("Put event: \n{}\n onto JMS.", tm.getJMSMessageID());
LOGGER.debug("Put event: {} onto JMS.", tm.getJMSMessageID());
}

/**
* Connect to JCR Repostory and JMS queue
*
*
* @throws JMSException
* @throws RepositoryException
*/
Expand All @@ -108,7 +108,7 @@ public void acquireConnections() throws JMSException, RepositoryException {

/**
* Close external connections
*
*
* @throws JMSException
*/
@PreDestroy
Expand Down
Expand Up @@ -51,12 +51,14 @@
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"/spring-test/jms.xml", "/spring-test/repo.xml",
@ContextConfiguration({"/spring-test/atom-jms.xml", "/spring-test/repo.xml",
"/spring-test/eventing.xml"})
@DirtiesContext
public class AtomJMSIT implements MessageListener {

@Inject
Expand Down
@@ -0,0 +1,212 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.integration.jms.observer;

import static com.google.common.base.Throwables.propagate;
import static java.lang.System.currentTimeMillis;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.fcrepo.jcr.FedoraJcrTypes.FEDORA_OBJECT;
import static org.fcrepo.jms.headers.DefaultMessageFactory.EVENT_TYPE_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.TIMESTAMP_HEADER_NAME;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.junit.Assert.assertTrue;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.HashSet;
import java.util.Set;

import javax.inject.Inject;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.fcrepo.kernel.utils.EventType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.modeshape.jcr.api.JcrTools;
import org.slf4j.Logger;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"/spring-test/headers-jms.xml", "/spring-test/repo.xml",
"/spring-test/eventing.xml"})
@DirtiesContext
public class HeadersJMSIT implements MessageListener {

@Inject
private Repository repository;

@Inject
private ActiveMQConnectionFactory connectionFactory;

private Connection connection;

private javax.jms.Session session;

private MessageConsumer consumer;

private volatile Set<Message> messages;

private JcrTools jcrTools = new JcrTools(true);

private static final Logger LOGGER = getLogger(HeadersJMSIT.class);

/**
* Time to wait for a set of test messages.
*/
private static final long TIMEOUT = 2000;

@Test
public void testIngestion() throws RepositoryException,
InterruptedException, JMSException {

final String pid = "/testIngestion";
final String expectedEventType =
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();
LOGGER.debug("Expecting a {} event", expectedEventType);
Boolean success = false;

final Session session = repository.login();
try {
final Node node = jcrTools.findOrCreateNode(session, pid);
node.addMixin(FEDORA_OBJECT);
session.save();

final Long start = currentTimeMillis();
synchronized (this) {
while ((currentTimeMillis() - start < TIMEOUT) && (!success)) {
for (final Message message : messages) {
if (getIdentifier(message).equals(pid)) {
if (getEventType(message).equals(expectedEventType)) {
success = true;
}
}
}
LOGGER.debug("Waiting for next message...");
wait(1000);
}
}
} finally {
session.logout();
}
assertTrue(
"Found no message with correct identifer and correct event type!",
success);
}

@Test
public void testRemoval() throws RepositoryException, InterruptedException,
JMSException {

final String pid = "/testRemoval";
final String expectedEventType =
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_REMOVED).toString();
LOGGER.debug("Expecting a {} event", expectedEventType);
Boolean success = false;

final Session session = repository.login();
try {
final Node node = jcrTools.findOrCreateNode(session, pid);
node.addMixin(FEDORA_OBJECT);
session.save();
node.remove();
session.save();

final Long start = currentTimeMillis();
synchronized (this) {
while ((currentTimeMillis() - start < TIMEOUT) && (!success)) {
for (final Message message : messages) {
if (getIdentifier(message).equals(pid)) {
if (getEventType(message).equals(expectedEventType)) {
success = true;
}
}
}
LOGGER.debug("Waiting for next message...");
wait(1000);
}
}
} finally {
session.logout();
}
assertTrue(
"Found no message with correct identifer and correct event type!",
success);
}

@Override
public void onMessage(final Message message) {
try {
LOGGER.debug(
"Received JMS message: {} with identifier: {}, timestamp: {}, and event type: {}",
message.getJMSMessageID(), getIdentifier(message),
getTimestamp(message), getEventType(message));
} catch (final JMSException e) {
propagate(e);
}
messages.add(message);
synchronized (this) {
this.notifyAll();
}
}

@Before
public void acquireConnection() throws JMSException {
LOGGER.debug(this.getClass().getName() + " acquiring JMS connection.");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic("fedora"));
messages = new HashSet<>();
consumer.setMessageListener(this);
}

@After
public void releaseConnection() throws JMSException {
LOGGER.debug(this.getClass().getName() + " releasing JMS connection.");
consumer.close();
session.close();
connection.close();
}

private static String getIdentifier(final Message msg) throws JMSException {
return msg.getStringProperty(IDENTIFIER_HEADER_NAME);
}

private static String getEventType(final Message msg) throws JMSException {
return msg.getStringProperty(EVENT_TYPE_HEADER_NAME);
}

private static Long getTimestamp(final Message msg) throws JMSException {
return msg.getLongProperty(TIMESTAMP_HEADER_NAME);
}

}

0 comments on commit fff0aa7

Please sign in to comment.