Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Update DefaultFilter to propagate events on Fedora Objects/Datastream and properties attached to them,
Update SimpleObserver to suppress duplicate events
  • Loading branch information
escowles authored and Andrew Woods committed Oct 4, 2013
1 parent 80a6b9c commit 1e55ee3
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 21 deletions.
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.StringWriter;

import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
Expand Down Expand Up @@ -52,7 +53,8 @@ public Message getMessage(final Event jcrEvent,
LOGGER.trace("Received an event to transform.");
final String path = jcrEvent.getPath();
LOGGER.trace("Retrieved path from event.");
final Node resource = jcrSession.getNode(path);
final Item item = jcrSession.getItem(path);
final Node resource = item.isNode() ? (Node)item : item.getParent();
LOGGER.trace("Retrieved node from event.");
final LegacyMethod legacy = new LegacyMethod(jcrEvent, resource);
final StringWriter writer = new StringWriter();
Expand Down
Expand Up @@ -58,7 +58,8 @@ public void testGetMessage() throws Exception {
when(mockType.getName()).thenReturn(FedoraJcrTypes.FEDORA_OBJECT);
NodeType[] mockTypes = new NodeType[] {mockType};
when(mockSource.getMixinNodeTypes()).thenReturn(mockTypes);
when(mockJCR.getNode(testPath)).thenReturn(mockSource);
when(mockSource.isNode()).thenReturn(true);
when(mockJCR.getItem(testPath)).thenReturn(mockSource);
testObj.getMessage(mockEvent, mockJCR, mockJMS);
verify(mockText).setStringProperty("methodName", "ingest");
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
Expand All @@ -32,10 +33,13 @@

/**
* EventFilter that passes only events emitted from nodes with
* a Fedora JCR type.
* a Fedora JCR type, or properties attached to them.
*
* @author eddies
* @date Feb 7, 2013
*
* @author escowles
* @date Oct 3, 2013
*/
public class DefaultFilter implements EventFilter {

Expand All @@ -48,19 +52,17 @@ public class DefaultFilter implements EventFilter {

/**
* Filter observer events to only include events on a FedoraObject or
* Datastream
* Datastream, or properties of a FedoraObject or Datastream.
*
* @param event the original event
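* @return true if the event's node, or the parent node of the event's
* property, is a FedoraObject or Datastream; false otherwise, including
* when the event path cannot be found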
*/
@Override
public boolean apply(final Event event) {

try {
final Node resource = session.getNode(event.getPath());
return isFedoraObject.apply(resource) ||
isFedoraDatastream.apply(resource);

final Item item = session.getItem(event.getPath());
final Node n = item.isNode() ? (Node)item : item.getParent();
return isFedoraObject.apply(n) || isFedoraDatastream.apply(n);
} catch (final PathNotFoundException e) {
// not a node in the fedora workspace
return false;
Expand Down
Expand Up @@ -25,9 +25,15 @@
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
import static org.fcrepo.metrics.RegistryService.getMetrics;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Throwables.propagate;

import java.util.HashSet;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
Expand All @@ -40,6 +46,7 @@
import com.codahale.metrics.Counter;
import com.google.common.eventbus.EventBus;


/**
* Simple JCR EventListener that filters JCR Events through a Fedora
* EventFilter and puts the resulting stream onto the internal
Expand Down Expand Up @@ -71,17 +78,18 @@ public class SimpleObserver implements EventListener {
@Inject
private EventFilter eventFilter;

private Session session;

/**
 * Register this observer with the JCR event listeners.
 *
 * The session opened here is stored in the {@code session} field and is
 * deliberately left open: {@code onEvent} uses it to resolve the item
 * behind each event path, so logging out here would leave that method
 * with an unusable session.
 * NOTE(review): nothing visible in this class logs the session out on
 * shutdown -- confirm whether a {@code @PreDestroy} hook is needed.
 *
 * @throws RepositoryException if the login or the listener registration
 *         fails
 */
@PostConstruct
public void buildListener() throws RepositoryException {
    // assign the field (not a shadowing local) so onEvent can reuse it
    session = repository.login();
    session.getWorkspace().getObservationManager().addEventListener(this,
            EVENT_TYPES, "/", true, null, null, false);
    session.save();
}

/**
Expand All @@ -91,13 +99,34 @@ public void buildListener() throws RepositoryException {
*/
@Override
public void onEvent(final javax.jcr.observation.EventIterator events) {
    // Paths of the nodes that already had an event posted in this batch.
    // Paths are used as the de-duplication key so the check does not
    // depend on how the Node implementation defines equals/hashCode.
    final Set<String> posted = new HashSet<String>();

    // post at most one filter-approved event per node
    for (final Event e : filter(new EventIterator(events), eventFilter)) {
        try {
            // property events are attributed to the node that owns them
            final Item item = session.getItem(e.getPath());
            final Node n = item.isNode() ? (Node) item : item.getParent();

            // Set.add returns false when the path was already recorded
            if (n != null && posted.add(n.getPath())) {
                EVENT_COUNTER.inc();
                LOGGER.debug("Putting event: {} on the bus.", e);
                eventBus.post(new FedoraEvent(e));
            } else {
                LOGGER.debug("Skipping: {}", e);
            }
        } catch (final PathNotFoundException ex) {
            // the item no longer resolves; nothing to post for it
            LOGGER.trace("Not a node in the Fedora workspace: {}", e);
        } catch (final RepositoryException ex) {
            throw propagate(ex);
        }
    }
}

Expand Down
Expand Up @@ -80,7 +80,7 @@ public void shouldApplyToObject() throws Exception {
final Event mockEvent = mock(Event.class);
when(mockEvent.getPath()).thenReturn(testPath);
final Node mockNode = mock(Node.class);
when(mockSession.getNode(testPath)).thenReturn(mockNode);
when(mockSession.getItem(testPath)).thenReturn(mockNode);
assertTrue(testObj.apply(mockEvent));
} finally {
isFedoraDatastream = holdDS;
Expand All @@ -105,7 +105,7 @@ public void shouldApplyToDatastream() throws Exception {
final Event mockEvent = mock(Event.class);
when(mockEvent.getPath()).thenReturn(testPath);
final Node mockNode = mock(Node.class);
when(mockSession.getNode(testPath)).thenReturn(mockNode);
when(mockSession.getItem(testPath)).thenReturn(mockNode);
assertTrue(testObj.apply(mockEvent));
} finally {
isFedoraDatastream = holdDS;
Expand All @@ -120,10 +120,10 @@ public void shouldNotApplyToNonExistentNodes() throws Exception {
final String testPath = "/foo/bar";
final Event mockEvent = mock(Event.class);
when(mockEvent.getPath()).thenReturn(testPath);
when(mockSession.getNode(testPath)).thenThrow(
when(mockSession.getItem(testPath)).thenThrow(
PathNotFoundException.class);
assertEquals(false, testObj.apply(mockEvent));
verify(mockSession).getNode(testPath);
verify(mockSession).getItem(testPath);
}

@Test
Expand All @@ -140,7 +140,7 @@ public void shouldNotApplyToSystemNodes() throws Exception {
final Event mockEvent = mock(Event.class);
when(mockEvent.getPath()).thenReturn(testPath);
final Node mockNode = mock(Node.class);
when(mockSession.getNode(testPath)).thenReturn(mockNode);
when(mockSession.getItem(testPath)).thenReturn(mockNode);
assertEquals(false, testObj.apply(mockEvent));
} finally {
isFedoraDatastream = holdDS;
Expand Down
Expand Up @@ -31,6 +31,7 @@
import java.lang.reflect.Field;
import java.util.List;

import javax.jcr.Node;
import javax.jcr.Session;
import javax.jcr.Workspace;
import javax.jcr.observation.Event;
Expand Down Expand Up @@ -98,7 +99,12 @@ public void testBuildListener() throws Exception {
public void testOnEvent() throws Exception {
setField("eventBus", testObj, mockBus);
setField("eventFilter", testObj, mockFilter);
setField("session", testObj, mockSession);
final Event mockEvent = mock(Event.class);
when(mockEvent.getPath()).thenReturn("/foo/bar");
final Node mockNode = mock(Node.class);
when(mockNode.isNode()).thenReturn(true);
when(mockSession.getItem(any(String.class))).thenReturn(mockNode);
final EventIterator mockEvents = mock(EventIterator.class);
final List<Event> iterable = asList(new Event[] {mockEvent});
mockStatic(Iterables.class);
Expand Down

1 comment on commit 1e55ee3

@awoods
Copy link

@awoods awoods commented on 1e55ee3 Oct 11, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would appear that this commit provokes the new (somewhat consistent) Jenkins failure:
org.fcrepo.integration.jms.observer.AtomJMSIT

Please sign in to comment.