Skip to content

Commit

Permalink
Add list of properties and a System-property-configurable baseURL to …
Browse files Browse the repository at this point in the history
…JMS events

- FedoraEvent now always returns the node path for getPath(), not sometimes a property's path

Resolves: https://www.pivotaltracker.com/story/show/72262790
  • Loading branch information
escowles authored and Andrew Woods committed Jun 3, 2014
1 parent 2501668 commit 372fa30
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 18 deletions.
Expand Up @@ -52,6 +52,22 @@ public class DefaultMessageFactory implements JMSEventMessageFactory {
public static final String EVENT_TYPE_HEADER_NAME = REPOSITORY_NAMESPACE
+ "eventType";

public static final String BASE_URL_HEADER_NAME = REPOSITORY_NAMESPACE
+ "baseURL";

public static final String PROPERTIES_HEADER_NAME = REPOSITORY_NAMESPACE
+ "properties";

private String baseURL;

/**
* @param baseURL indicating the repository server host/port/etc
*/
public DefaultMessageFactory(final String baseURL) {
this.baseURL = baseURL;
log.debug("MessageFactory baseURL: {}", baseURL);
}

@Override
public Message getMessage(final FedoraEvent jcrEvent,
final javax.jms.Session jmsSession) throws RepositoryException,
Expand All @@ -61,6 +77,10 @@ public Message getMessage(final FedoraEvent jcrEvent,
message.setStringProperty(IDENTIFIER_HEADER_NAME, jcrEvent.getPath());
message.setStringProperty(EVENT_TYPE_HEADER_NAME, getEventURIs( jcrEvent
.getTypes()));
message.setStringProperty(BASE_URL_HEADER_NAME, baseURL);
message.setStringProperty(PROPERTIES_HEADER_NAME, Joiner.on(',').join(jcrEvent.getProperties()));

log.trace("getMessage() returning: {}", message);
return message;
}

Expand Down
Expand Up @@ -21,10 +21,14 @@
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.fcrepo.jcr.FedoraJcrTypes.FEDORA_OBJECT;
import static org.fcrepo.jms.headers.DefaultMessageFactory.BASE_URL_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.EVENT_TYPE_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.PROPERTIES_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.TIMESTAMP_HEADER_NAME;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.slf4j.LoggerFactory.getLogger;

Expand Down Expand Up @@ -113,6 +117,10 @@ public void testIngestion() throws RepositoryException,
success = true;
}
}

final String baseUrl = messages.iterator().next().getStringProperty(BASE_URL_HEADER_NAME);
assertNotNull("BaseUrl should not be null!", baseUrl);
assertEquals("Defined in spring-test/headers-jms.xml", "http://localhost:8080/rest", baseUrl);
}
LOGGER.debug("Waiting for next message...");
wait(1000);
Expand Down Expand Up @@ -169,10 +177,9 @@ public void testRemoval() throws RepositoryException, InterruptedException,
@Override
public void onMessage(final Message message) {
try {
LOGGER.debug(
"Received JMS message: {} with identifier: {}, timestamp: {}, and event type: {}",
message.getJMSMessageID(), getIdentifier(message),
getTimestamp(message), getEventTypes(message));
LOGGER.debug( "Received JMS message: {} with identifier: {}, timestamp: {}, event type: {}, properties: {},"
+ " and baseURL: {}", message.getJMSMessageID(), getIdentifier(message), getTimestamp(message),
getEventTypes(message), getProperties(message), getBaseURL(message));
} catch (final JMSException e) {
propagate(e);
}
Expand Down Expand Up @@ -220,4 +227,12 @@ private static Long getTimestamp(final Message msg) throws JMSException {
return msg.getLongProperty(TIMESTAMP_HEADER_NAME);
}

private static String getBaseURL(final Message msg) throws JMSException {
return msg.getStringProperty(BASE_URL_HEADER_NAME);
}

private static String getProperties(final Message msg) throws JMSException {
return msg.getStringProperty(PROPERTIES_HEADER_NAME);
}

}
Expand Up @@ -17,8 +17,10 @@

import static java.util.Collections.singleton;
import static javax.jcr.observation.Event.NODE_ADDED;
import static org.fcrepo.jms.headers.DefaultMessageFactory.BASE_URL_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.EVENT_TYPE_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.PROPERTIES_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.TIMESTAMP_HEADER_NAME;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -60,7 +62,7 @@ public void setUp() throws JMSException {
initMocks(this);
when(mockSession.createMessage()).thenReturn(
new ActiveMQObjectMessage());
testDefaultMessageFactory = new DefaultMessageFactory();
testDefaultMessageFactory = new DefaultMessageFactory("base-url");
}

@Test
Expand All @@ -74,6 +76,8 @@ public void testBuildMessage() throws RepositoryException, IOException,
final String testReturnType =
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();
when(mockEvent.getTypes()).thenReturn(testTypes);
final String prop = "test-property";
when(mockEvent.getProperties()).thenReturn(singleton(prop));
final Message testMessage =
testDefaultMessageFactory.getMessage(mockEvent, mockSession);
assertEquals("Got wrong date in message!", testDate, (Long) testMessage
Expand All @@ -82,6 +86,8 @@ public void testBuildMessage() throws RepositoryException, IOException,
.getStringProperty(IDENTIFIER_HEADER_NAME));
assertEquals("Got wrong type in message!", testReturnType, testMessage
.getStringProperty(EVENT_TYPE_HEADER_NAME));
assertEquals("Got wrong base-url in message", "base-url", testMessage.getStringProperty(BASE_URL_HEADER_NAME));
assertEquals("Got wrong property in message", prop, testMessage.getStringProperty(PROPERTIES_HEADER_NAME));
}

}
4 changes: 3 additions & 1 deletion fcrepo-jms/src/test/resources/spring-test/headers-jms.xml
Expand Up @@ -19,6 +19,8 @@
<amq:connectionFactory id="connectionFactory"
brokerURL="vm://localhost?broker.persistent=false&amp;broker.useJmx=false&amp;broker.enableStatistics=false"/>

<bean class="org.fcrepo.jms.headers.DefaultMessageFactory"/>
<bean class="org.fcrepo.jms.headers.DefaultMessageFactory">
<constructor-arg value="http://localhost:8080/rest"/>
</bean>

</beans>
Expand Up @@ -18,6 +18,9 @@
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Multimaps.index;
import static org.slf4j.LoggerFactory.getLogger;
import static javax.jcr.observation.Event.PROPERTY_ADDED;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;

import java.util.Iterator;

Expand Down Expand Up @@ -80,13 +83,21 @@ public FedoraEvent next() {
// because if
// there was no event at all, there would appear no entry in our
// Multimap under this key
final FedoraEvent fedoraEvent = new FedoraEvent(nodeSpecificEvents.next());
while (nodeSpecificEvents.hasNext()) {
// add the type of the event in hand to the event we are
// building up to emit
// we could aggregate other information here if that seems
// useful
fedoraEvent.addType(nodeSpecificEvents.next().getType());
final Event firstEvent = nodeSpecificEvents.next();
final FedoraEvent fedoraEvent = new FedoraEvent(firstEvent);

try {
addProperty(fedoraEvent, firstEvent);
while (nodeSpecificEvents.hasNext()) {
// add the event type and property name to the event we are building up to emit
// we could aggregate other information here if that seems useful
final Event otherEvent = nodeSpecificEvents.next();
fedoraEvent.addType(otherEvent.getType());
addProperty(fedoraEvent, otherEvent);
}

} catch (Exception ex) {
log.warn("Danger: swallowing exception", ex);
}
return fedoraEvent;
}
Expand All @@ -96,6 +107,21 @@ public void remove() {
// the underlying Multimap is immutable anyway
throw new UnsupportedOperationException();
}

private void addProperty( final FedoraEvent fedoraEvent, final Event e ) {
try {
if ( e.getType() == PROPERTY_ADDED ||
e.getType() == PROPERTY_CHANGED ||
e.getType() == PROPERTY_REMOVED ) {
fedoraEvent.addProperty( e.getPath().substring(e.getPath().lastIndexOf("/") + 1) );

} else {
log.trace("Not adding non-event property: {}, {}", fedoraEvent, e);
}
} catch (final RepositoryException ex) {
throw propagate(ex);
}
}
};
}

Expand Down
Expand Up @@ -16,20 +16,27 @@
package org.fcrepo.kernel.impl.observer.eventmappings;

import static com.google.common.collect.Iterators.size;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static org.jgroups.util.UUID.randomUUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;

import com.google.common.collect.Iterators;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.fcrepo.kernel.utils.iterators.EventIterator;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.util.Iterator;

/**
* <p>AllNodeEventsOneEventTest class.</p>
*
Expand Down Expand Up @@ -74,6 +81,7 @@ public void setUp() throws RepositoryException {
when(mockEvent2.getPath()).thenReturn(TEST_PATH2);
when(mockEvent3.getIdentifier()).thenReturn(TEST_IDENTIFIER3);
when(mockEvent3.getPath()).thenReturn(TEST_PATH3);
when(mockEvent3.getType()).thenReturn(PROPERTY_CHANGED);
when(mockIterator.next()).thenReturn(mockEvent1, mockEvent2, mockEvent3);
when(mockIterator.hasNext()).thenReturn(true, true, true, false);
testInput = new EventIterator(mockIterator);
Expand All @@ -96,4 +104,35 @@ public void testBadEvent() throws RepositoryException {
when(mockEvent1.getIdentifier()).thenThrow(new RepositoryException("Expected."));
testMapping.apply(testInput);
}

@Test
public void testPropertyEvents() throws RepositoryException {
final Iterator<FedoraEvent> iterator = testMapping.apply(testInput);
assertNotNull(iterator);
assertTrue("Iterator is empty!", iterator.hasNext());

boolean found = false;
while (iterator.hasNext()) {
final FedoraEvent event = iterator.next();
if (TEST_IDENTIFIER3.equals(event.getIdentifier())) {
assertEquals("Expected one event property", 1, event.getProperties().size());
found = true;
}

}
assertTrue("Third mock event was not found!", found);
}

@Test
public void testError() throws RepositoryException {
when(mockEvent3.getPath()).thenThrow(new RepositoryException("expected"));

final Iterator<FedoraEvent> iterator = testMapping.apply(testInput);

// It is expected that no exception is propagated, but instead the event is skipped
// ...leaving 2 events
assertNotNull(iterator);
assertEquals("There should be 2 events!", 2, Iterators.size(iterator));
}

}
Expand Up @@ -20,6 +20,9 @@
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Sets.union;
import static java.util.Collections.singleton;
import static javax.jcr.observation.Event.PROPERTY_ADDED;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -49,6 +52,7 @@ public class FedoraEvent {
private Event e;

private Set<Integer> eventTypes = new HashSet<>();
private Set<String> eventProperties = new HashSet<>();

/**
* Wrap a JCR Event with our FedoraEvent decorators
Expand Down Expand Up @@ -76,11 +80,34 @@ public FedoraEvent addType(final Integer type) {
return this;
}

/**
* @return the property names of the underlying JCR property {@linkEvent}s
**/
public Set<String> getProperties() {
return eventProperties;
}

/**
* Add a property name to this event
* @param property property name
* @return this object for continued use
**/
public FedoraEvent addProperty( final String property ) {
eventProperties.add(property);
return this;
}

/**
* @return the path of the underlying JCR {@link Event}s
*/
public String getPath() throws RepositoryException {
return e.getPath();
if (e.getType() == PROPERTY_ADDED ||
e.getType() == PROPERTY_CHANGED ||
e.getType() == PROPERTY_REMOVED) {
return e.getPath().substring(0, e.getPath().lastIndexOf("/"));
} else {
return e.getPath();
}
}

/**
Expand Down Expand Up @@ -129,7 +156,9 @@ public String toString() {
public String apply(final Integer type) {
return EventType.valueOf(type).getName();
}
}))).add("Path:", getPath()).add("Date: ", getDate()).add("Info:", getInfo()).toString();
}))).add("Event properties:",
Joiner.on(',').join(eventProperties)).add("Path:", getPath()).add("Date: ",
getDate()).add("Info:", getInfo()).toString();
} catch (final RepositoryException e) {
throw propagate(e);
}
Expand Down

0 comments on commit 372fa30

Please sign in to comment.