Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Improve event reporting facilities for node property changes
  • Loading branch information
ajs6f authored and Andrew Woods committed Mar 8, 2014
1 parent 3bf9967 commit 649a3dd
Show file tree
Hide file tree
Showing 36 changed files with 695 additions and 347 deletions.
3 changes: 3 additions & 0 deletions fcrepo-http-api/src/test/resources/spring-test/eventing.xml
Expand Up @@ -17,6 +17,9 @@
<!-- used by bean above to filter which events get put on the bus -->
<bean name="fedoraEventFilter" class="org.fcrepo.kernel.observer.DefaultFilter"/>

<!-- used by observer bean to map JCR events into Fedora events -->
<bean name="fedoraEventMapper" class="org.fcrepo.kernel.observer.eventmappings.AllNodeEventsOneEvent"/>

<!-- Fedora's lightweight internal event bus. Currently memory-resident.-->
<bean name="fedoraInternalEventBus" class="com.google.common.eventbus.EventBus"/>

Expand Down
7 changes: 7 additions & 0 deletions fcrepo-jms/pom.xml
Expand Up @@ -21,6 +21,13 @@
<artifactId>fcrepo-kernel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.fcrepo</groupId>
<artifactId>fcrepo-kernel</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.fcrepo</groupId>
<artifactId>fcrepo-http-commons</artifactId>
Expand Down
Expand Up @@ -17,20 +17,27 @@
package org.fcrepo.jms.headers;

import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.util.Set;

import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
import javax.jms.Message;

import org.fcrepo.jms.observer.JMSEventMessageFactory;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.fcrepo.kernel.utils.EventType;
import org.slf4j.Logger;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;

/**
* Generates JMS {@link Message}s composed entirely of headers, based entirely
* on information found in the JCR {@link Event} that triggers publication.
* on information found in the {@link FedoraEvent} that triggers publication.
*
* @author ajs6f
* @date Dec 2, 2013
Expand All @@ -47,19 +54,29 @@ public class DefaultMessageFactory implements JMSEventMessageFactory {
+ "eventType";

@Override
public Message getMessage(final Event jcrEvent,
public Message getMessage(final FedoraEvent jcrEvent,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {
final Message message = jmsSession.createMessage();
message.setLongProperty(TIMESTAMP_HEADER_NAME, jcrEvent.getDate());
message.setStringProperty(IDENTIFIER_HEADER_NAME, jcrEvent.getPath());
message.setStringProperty(EVENT_TYPE_HEADER_NAME, getEventURI(jcrEvent
.getType()));
message.setStringProperty(EVENT_TYPE_HEADER_NAME, getEventURIs( jcrEvent
.getTypes()));
return message;
}

private static String getEventURI(final int type) {
return REPOSITORY_NAMESPACE + EventType.valueOf(type);
private static String getEventURIs(final Set<Integer> types) {
final String uris = Joiner.on(',').join(Iterables.transform(types, new Function<Integer, String>() {

@Override
public String apply(final Integer type) {
return REPOSITORY_NAMESPACE + EventType.valueOf(type);
}
}));
log.debug("Constructed event type URIs: {}", uris);
return uris;
}

private static final Logger log = getLogger(DefaultMessageFactory.class);

}
Expand Up @@ -19,10 +19,11 @@
import java.io.IOException;

import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
import javax.jms.Message;

import org.fcrepo.kernel.observer.FedoraEvent;

/**
* Produce a JMS Message from a JCR Event
*/
Expand All @@ -31,7 +32,7 @@ public interface JMSEventMessageFactory {
/**
* Produce a JMS message from a JCR event with the
* given session
*
*
* @param jcrEvent
* @param jcrSession
* @param jmsSession
Expand All @@ -40,7 +41,7 @@ public interface JMSEventMessageFactory {
* @throws IOException
* @throws JMSException
*/
Message getMessage(final Event jcrEvent,
Message getMessage(final FedoraEvent jcrEvent,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException;
}
Expand Up @@ -24,14 +24,14 @@
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.slf4j.Logger;

import com.google.common.eventbus.EventBus;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class JMSTopicPublisher {
* @throws IOException
*/
@Subscribe
public void publishJCREvent(final Event fedoraEvent) throws JMSException,
public void publishJCREvent(final FedoraEvent fedoraEvent) throws JMSException,
RepositoryException, IOException {
LOGGER.debug("Received an event from the internal bus.");
final Message tm =
Expand Down
Expand Up @@ -105,7 +105,7 @@ public void testIngestion() throws RepositoryException,
while ((currentTimeMillis() - start < TIMEOUT) && (!success)) {
for (final Message message : messages) {
if (getIdentifier(message).equals(pid)) {
if (getEventType(message).equals(expectedEventType)) {
if (getEventTypes(message).contains(expectedEventType)) {
success = true;
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testRemoval() throws RepositoryException, InterruptedException,
while ((currentTimeMillis() - start < TIMEOUT) && (!success)) {
for (final Message message : messages) {
if (getIdentifier(message).equals(pid)) {
if (getEventType(message).equals(expectedEventType)) {
if (getEventTypes(message).contains(expectedEventType)) {
success = true;
}
}
Expand All @@ -168,7 +168,7 @@ public void onMessage(final Message message) {
LOGGER.debug(
"Received JMS message: {} with identifier: {}, timestamp: {}, and event type: {}",
message.getJMSMessageID(), getIdentifier(message),
getTimestamp(message), getEventType(message));
getTimestamp(message), getEventTypes(message));
} catch (final JMSException e) {
propagate(e);
}
Expand Down Expand Up @@ -201,11 +201,15 @@ public void releaseConnection() throws JMSException {
}

private static String getIdentifier(final Message msg) throws JMSException {
return msg.getStringProperty(IDENTIFIER_HEADER_NAME);
final String id = msg.getStringProperty(IDENTIFIER_HEADER_NAME);
LOGGER.debug("Processing an event with identifier: {}", id);
return id;
}

private static String getEventType(final Message msg) throws JMSException {
return msg.getStringProperty(EVENT_TYPE_HEADER_NAME);
private static String getEventTypes(final Message msg) throws JMSException {
final String type = msg.getStringProperty(EVENT_TYPE_HEADER_NAME);
LOGGER.debug("Processing an event with type: {}", type);
return type;
}

private static Long getTimestamp(final Message msg) throws JMSException {
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.fcrepo.jms.headers;

import static java.util.Collections.singleton;
import static javax.jcr.observation.Event.NODE_ADDED;
import static org.fcrepo.jms.headers.DefaultMessageFactory.EVENT_TYPE_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
Expand All @@ -26,14 +27,15 @@
import static org.mockito.MockitoAnnotations.initMocks;

import java.io.IOException;
import java.util.Set;

import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.fcrepo.kernel.utils.EventType;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,7 +47,7 @@ public class DefaultMessageFactoryTest {
private Session mockSession;

@Mock
private Event mockEvent;
private FedoraEvent mockEvent;

private DefaultMessageFactory testDefaultMessageFactory;

Expand All @@ -64,10 +66,10 @@ public void testBuildMessage() throws RepositoryException, IOException,
when(mockEvent.getDate()).thenReturn(testDate);
final String testPath = "super/calli/fragi/listic";
when(mockEvent.getPath()).thenReturn(testPath);
final Integer testType = NODE_ADDED;
final Set<Integer> testTypes = singleton(NODE_ADDED);
final String testReturnType =
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();
when(mockEvent.getType()).thenReturn(testType);
when(mockEvent.getTypes()).thenReturn(testTypes);
final Message testMessage =
testDefaultMessageFactory.getMessage(mockEvent, mockSession);
assertEquals("Got wrong date in message!", testDate, (Long) testMessage
Expand Down
Expand Up @@ -16,20 +16,25 @@

package org.fcrepo.jms.observer;

import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.fcrepo.kernel.utils.TestHelpers.setField;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import java.lang.reflect.Field;
import java.io.IOException;

import javax.jcr.observation.Event;
import javax.jcr.RepositoryException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand All @@ -38,76 +43,61 @@

public class JMSTopicPublisherTest {

private JMSTopicPublisher testObj;

@Mock
private JMSEventMessageFactory mockEvents;

@Mock
JMSTopicPublisher testObj;
private MessageProducer mockProducer;

@Mock
JMSEventMessageFactory mockEvents;
private ActiveMQConnectionFactory mockConnections;

@Mock
MessageProducer mockProducer;
private EventBus mockBus;

@Mock
ActiveMQConnectionFactory mockConnections;
private javax.jms.Session mockJmsSession;

@Mock
EventBus mockBus;
private Connection mockConn;

@Before
public void setUp() throws Exception {
public void setUp() {
testObj = new JMSTopicPublisher();
mockEvents = mock(JMSEventMessageFactory.class);
mockProducer = mock(MessageProducer.class);
mockConnections = mock(ActiveMQConnectionFactory.class);
mockBus = mock(EventBus.class);
Field setField =
JMSTopicPublisher.class.getDeclaredField("eventFactory");
setField.setAccessible(true);
setField.set(testObj, mockEvents);
setField = JMSTopicPublisher.class.getDeclaredField("producer");
setField.setAccessible(true);
setField.set(testObj, mockProducer);
setField =
JMSTopicPublisher.class.getDeclaredField("connectionFactory");
setField.setAccessible(true);
setField.set(testObj, mockConnections);
setField = JMSTopicPublisher.class.getDeclaredField("eventBus");
setField.setAccessible(true);
setField.set(testObj, mockBus);

initMocks(this);
setField(testObj, "eventFactory", mockEvents);
setField(testObj, "producer", mockProducer);
setField(testObj, "connectionFactory", mockConnections);
setField(testObj, "eventBus", mockBus);
}

@Test
public void testAcquireConnections() throws Exception {
Connection mockConn = mock(Connection.class);
javax.jms.Session mockSession = mock(javax.jms.Session.class);
public void testAcquireConnections() throws JMSException {
when(mockConnections.createConnection()).thenReturn(mockConn);
when(mockConn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE))
.thenReturn(mockSession);
when(mockConn.createSession(false, AUTO_ACKNOWLEDGE))
.thenReturn(mockJmsSession);
testObj.acquireConnections();
verify(mockBus).register(any());
}

@Test
public void testPublishJCREvent() throws Exception {
Message mockMsg = mock(Message.class);
Event mockEvent = mock(Event.class);
when(
mockEvents.getMessage(eq(mockEvent),
any(javax.jms.Session.class))).thenReturn(mockMsg);
public void testPublishJCREvent() throws RepositoryException, IOException, JMSException {
final Message mockMsg = mock(Message.class);
final FedoraEvent mockEvent = mock(FedoraEvent.class);
when(mockEvents.getMessage(eq(mockEvent), any(javax.jms.Session.class))).thenReturn(mockMsg);
testObj.publishJCREvent(mockEvent);
}

@Test
public void testReleaseConnections() throws Exception {
Connection mockConn = mock(Connection.class);
javax.jms.Session mockJmsSession = mock(javax.jms.Session.class);
Field setField = JMSTopicPublisher.class.getDeclaredField("connection");
setField.setAccessible(true);
setField.set(testObj, mockConn);
setField = JMSTopicPublisher.class.getDeclaredField("jmsSession");
setField.setAccessible(true);
setField.set(testObj, mockJmsSession);
public void testReleaseConnections() throws JMSException {
setField(testObj, "connection", mockConn);
setField(testObj, "jmsSession", mockJmsSession);
testObj.releaseConnections();
verify(mockProducer).close();
verify(mockJmsSession).close();
verify(mockConn).close();
verify(mockBus).unregister(testObj);
}
}
5 changes: 4 additions & 1 deletion fcrepo-jms/src/test/resources/spring-test/eventing.xml
Expand Up @@ -16,7 +16,10 @@

<!-- used by bean above to filter which events get put on the bus -->
<bean name="fedoraEventFilter" class="org.fcrepo.kernel.observer.DefaultFilter"/>


<!-- used by observer bean to map JCR events into Fedora events -->
<bean name="fedoraEventMapper" class="org.fcrepo.kernel.observer.eventmappings.AllNodeEventsOneEvent"/>

<!-- Fedora's lightweight internal event bus. Currently memory-resident.-->
<bean name="fedoraInternalEventBus" class="com.google.common.eventbus.EventBus"/>

Expand Down

0 comments on commit 649a3dd

Please sign in to comment.