Skip to content

Commit

Permalink
Add Awaitility library/logic to JMS testing.
Browse files Browse the repository at this point in the history
- Clean up HeadersJMSIT
- Add verify to JMSTopicPublisherTest

Resolves: https://jira.duraspace.org/browse/FCREPO-1260
  • Loading branch information
ajs6f authored and Andrew Woods committed Jan 12, 2015
1 parent 5c889d1 commit 41e3fb8
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 96 deletions.
6 changes: 6 additions & 0 deletions fcrepo-jms/pom.xml
Expand Up @@ -103,6 +103,12 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>1.6.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Expand Up @@ -13,28 +13,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.integration.jms.observer;

import static com.google.common.base.Throwables.propagate;
import static java.lang.System.currentTimeMillis;
import static com.google.common.collect.Iterables.any;
import static com.jayway.awaitility.Awaitility.await;
import static com.jayway.awaitility.Duration.ONE_SECOND;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_CONTAINER;
import static org.fcrepo.jms.headers.DefaultMessageFactory.BASE_URL_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.EVENT_TYPE_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.PROPERTIES_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.TIMESTAMP_HEADER_NAME;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.junit.Assert.assertTrue;
import static org.jgroups.util.UUID.randomUUID;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;

import javax.inject.Inject;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
Expand All @@ -45,31 +47,55 @@
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.fcrepo.kernel.models.Container;
import org.fcrepo.kernel.services.ContainerService;
import org.fcrepo.kernel.utils.EventType;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.modeshape.jcr.api.JcrTools;
import org.slf4j.Logger;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.google.common.base.Predicate;


/**
* <p>HeadersJMSIT class.</p>
* <p>
* HeadersJMSIT class.
* </p>
*
* @author ajs6f
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"/spring-test/headers-jms.xml", "/spring-test/repo.xml",
"/spring-test/eventing.xml"})
@ContextConfiguration({ "/spring-test/headers-jms.xml", "/spring-test/repo.xml",
"/spring-test/eventing.xml" })
@DirtiesContext
public class HeadersJMSIT implements MessageListener {

/**
* Time to wait for a set of test messages, in milliseconds.
*/
private static final long TIMEOUT = 20000;

private static final String testIngested = "/testMessageFromIngestion-" + randomUUID();

private static final String testRemoved = "/testMessageFromRemoval-" + randomUUID();

private static final String INGESTION_EVENT_TYPE = REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();

private static final String REMOVAL_EVENT_TYPE = REPOSITORY_NAMESPACE + EventType.valueOf(NODE_REMOVED).toString();

@Inject
private Repository repository;

@Inject
private ContainerService containerService;

@Inject
private ActiveMQConnectionFactory connectionFactory;

Expand All @@ -79,108 +105,74 @@ public class HeadersJMSIT implements MessageListener {

private MessageConsumer consumer;

private volatile Set<Message> messages;

private JcrTools jcrTools = new JcrTools(true);
private volatile Set<Message> messages = new HashSet<>();

private static final Logger LOGGER = getLogger(HeadersJMSIT.class);

/**
* Time to wait for a set of test messages.
*/
private static final long TIMEOUT = 2000;
@Test(timeout = TIMEOUT)
public void testIngestion() throws RepositoryException {

@Test
public void testIngestion() throws RepositoryException,
InterruptedException, JMSException {

final String pid = "/testIngestion";
final String expectedEventType =
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();
LOGGER.debug("Expecting a {} event", expectedEventType);
Boolean success = false;
LOGGER.debug("Expecting a {} event", INGESTION_EVENT_TYPE);

final Session session = repository.login();
try {
final Node node = jcrTools.findOrCreateNode(session, pid);
node.addMixin(FEDORA_CONTAINER);
containerService.findOrCreate(session, testIngested);
session.save();

final Long start = currentTimeMillis();
synchronized (this) {
while ((currentTimeMillis() - start < TIMEOUT) && (!success)) {
for (final Message message : messages) {
if (getIdentifier(message).equals(pid)) {
if (getEventTypes(message).contains(expectedEventType)) {
success = true;
}
}
}
LOGGER.debug("Waiting for next message...");
wait(1000);
}
}
awaitMessageOrFail(testIngested, INGESTION_EVENT_TYPE);
} finally {
session.logout();
}
assertTrue(
"Found no message with correct identifer and correct event type!",
success);
}

@Test
public void testRemoval() throws RepositoryException, InterruptedException,
JMSException {
private void awaitMessageOrFail(final String id, final String eventType) {
await().pollInterval(ONE_SECOND).until(new Callable<Boolean>() {

@Override
public Boolean call() {
return any(messages, new Predicate<Message>() {

@Override
public boolean apply(final Message message) {
try {
return getIdentifier(message).equals(id) &&
getEventTypes(message).contains(eventType);
} catch (final JMSException e) {
throw propagate(e);
}
}
});
}
});
}

final String pid = "/testRemoval";
final String expectedEventType =
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_REMOVED).toString();
LOGGER.debug("Expecting a {} event", expectedEventType);
Boolean success = false;
@Test(timeout = TIMEOUT)
public void testRemoval() throws RepositoryException {

LOGGER.debug("Expecting a {} event", REMOVAL_EVENT_TYPE);
final Session session = repository.login();
try {
final Node node = jcrTools.findOrCreateNode(session, pid);
node.addMixin(FEDORA_CONTAINER);
final Container resource = containerService.findOrCreate(session, testRemoved);
session.save();
node.remove();
resource.delete();
session.save();

final Long start = currentTimeMillis();
synchronized (this) {
while ((currentTimeMillis() - start < TIMEOUT) && (!success)) {
for (final Message message : messages) {
if (getIdentifier(message).equals(pid)) {
if (getEventTypes(message).contains(expectedEventType)) {
success = true;
}
}
}
LOGGER.debug("Waiting for next message...");
wait(1000);
}
}
awaitMessageOrFail(testRemoved, REMOVAL_EVENT_TYPE);
} finally {
session.logout();
}
assertTrue(
"Found no message with correct identifer and correct event type!",
success);
}

@Override
public void onMessage(final Message message) {
try {
LOGGER.debug( "Received JMS message: {} with identifier: {}, timestamp: {}, event type: {}, properties: {},"
+ " and baseURL: {}", message.getJMSMessageID(), getIdentifier(message), getTimestamp(message),
getEventTypes(message), getProperties(message), getBaseURL(message));
LOGGER.debug(
"Received JMS message: {} with identifier: {}, timestamp: {}, event type: {}, properties: {},"
+ " and baseURL: {}", message.getJMSMessageID(), getIdentifier(message),
getTimestamp(message),
getEventTypes(message), getProperties(message), getBaseURL(message));
} catch (final JMSException e) {
propagate(e);
}
messages.add(message);
synchronized (this) {
this.notifyAll();
}
}

@Before
Expand All @@ -190,7 +182,7 @@ public void acquireConnection() throws JMSException {
connection.start();
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic("fedora"));
messages = new HashSet<>();
messages.clear();
consumer.setMessageListener(this);
}

Expand Down
Expand Up @@ -47,10 +47,10 @@
*/
public class JMSTopicPublisherTest {

private JMSTopicPublisher testObj;
private JMSTopicPublisher testJMSTopicPublisher;

@Mock
private JMSEventMessageFactory mockEvents;
private JMSEventMessageFactory mockEventFactory;

@Mock
private MessageProducer mockProducer;
Expand All @@ -69,39 +69,40 @@ public class JMSTopicPublisherTest {

@Before
public void setUp() {
testObj = new JMSTopicPublisher();
testJMSTopicPublisher = new JMSTopicPublisher();
initMocks(this);
setField(testObj, "eventFactory", mockEvents);
setField(testObj, "producer", mockProducer);
setField(testObj, "connectionFactory", mockConnections);
setField(testObj, "eventBus", mockBus);
setField(testJMSTopicPublisher, "eventFactory", mockEventFactory);
setField(testJMSTopicPublisher, "producer", mockProducer);
setField(testJMSTopicPublisher, "connectionFactory", mockConnections);
setField(testJMSTopicPublisher, "eventBus", mockBus);
}

@Test
public void testAcquireConnections() throws JMSException {
when(mockConnections.createConnection()).thenReturn(mockConn);
when(mockConn.createSession(false, AUTO_ACKNOWLEDGE))
.thenReturn(mockJmsSession);
testObj.acquireConnections();
testJMSTopicPublisher.acquireConnections();
verify(mockBus).register(any());
}

@Test
public void testPublishJCREvent() throws RepositoryException, IOException, JMSException {
final Message mockMsg = mock(Message.class);
final FedoraEvent mockEvent = mock(FedoraEvent.class);
when(mockEvents.getMessage(eq(mockEvent), any(javax.jms.Session.class))).thenReturn(mockMsg);
testObj.publishJCREvent(mockEvent);
when(mockEventFactory.getMessage(eq(mockEvent), any(javax.jms.Session.class))).thenReturn(mockMsg);
testJMSTopicPublisher.publishJCREvent(mockEvent);
verify(mockProducer).send(mockMsg);
}

@Test
public void testReleaseConnections() throws JMSException {
setField(testObj, "connection", mockConn);
setField(testObj, "jmsSession", mockJmsSession);
testObj.releaseConnections();
setField(testJMSTopicPublisher, "connection", mockConn);
setField(testJMSTopicPublisher, "jmsSession", mockJmsSession);
testJMSTopicPublisher.releaseConnections();
verify(mockProducer).close();
verify(mockJmsSession).close();
verify(mockConn).close();
verify(mockBus).unregister(testObj);
verify(mockBus).unregister(testJMSTopicPublisher);
}
}
4 changes: 2 additions & 2 deletions fcrepo-jms/src/test/resources/spring-test/repo.xml
Expand Up @@ -9,8 +9,8 @@

<!-- Context that supports the actual ModeShape JCR itself -->

<context:annotation-config/>

<context:component-scan base-package="org.fcrepo"/>
<bean name="modeshapeRepofactory"
class="org.fcrepo.kernel.impl.spring.ModeShapeRepositoryFactoryBean"
p:repositoryConfiguration="${fcrepo.modeshape.configuration:config/testing/repository.json}"/>
Expand Down

0 comments on commit 41e3fb8

Please sign in to comment.