Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Factoring new strategies into observer types
  • Loading branch information
ajs6f committed Mar 6, 2014
1 parent bfeea04 commit d047bcd
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 183 deletions.
Expand Up @@ -18,21 +18,21 @@
import javax.jcr.Session;
import javax.jcr.observation.Event;

import com.google.common.base.Function;
import com.google.common.base.Predicate;

/**
* Filter JCR events to remove extraneous events
* @author eddies
* @author ajs6f
* @date Feb 7, 2013
*/
public interface EventFilter extends Function<Event, Event> {
public interface EventFilter extends Predicate<Event> {

/**
* Return a function to wrap Events, or transform to null if they should be
* skipped
* Return a {@link Predicate} with which to filter JCR {@link Event}s.
*
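* @param session the JCR {@link Session} that the returned filter may use
*        to examine the events it is given
* @return a {@link Predicate} that accepts only those events that should be
*         passed on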
*/
Function<Event, Event> getFilter(final Session session);
Predicate<Event> getFilter(final Session session);
}
Expand Up @@ -31,24 +31,29 @@
*/
public class EventIterator extends ForwardingIterator<Event> implements Iterable<Event> {

private javax.jcr.observation.EventIterator i;
private Iterator<Event> i;

/**
* Wrap the given JCR EventIterator as a generic {@code Iterator<Event>}.
*
* @param i the JCR event iterator to wrap
*/
@SuppressWarnings("unchecked")
public EventIterator(final javax.jcr.observation.EventIterator i) {
super();
this.i = i;
}

public EventIterator(final Iterator<Event> i) {
super();
this.i = i;
}

@Override
public Iterator<Event> iterator() {
return this;
}

@SuppressWarnings("unchecked")
@Override
protected Iterator<Event> delegate() {
return i;
Expand Down
Expand Up @@ -13,17 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.observer;

import static com.google.common.base.Throwables.propagate;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_MOVED;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jcr.observation.Event.PROPERTY_ADDED;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
import static org.fcrepo.jcr.FedoraJcrTypes.FEDORA_DATASTREAM;
import static org.fcrepo.jcr.FedoraJcrTypes.FEDORA_OBJECT;
import static org.fcrepo.kernel.utils.EventType.valueOf;
import static org.fcrepo.kernel.utils.FedoraTypesUtils.isFedoraObject;
import static org.fcrepo.kernel.utils.FedoraTypesUtils.isFedoraDatastream;
import static org.slf4j.LoggerFactory.getLogger;
Expand All @@ -34,20 +28,21 @@
import javax.jcr.Session;
import javax.jcr.observation.Event;

import com.google.common.base.Function;
import com.google.common.base.Predicate;

import org.slf4j.Logger;

/**
* EventFilter that passes only events emitted from nodes with
* a Fedora JCR type, or properties attached to them.
* {@link EventFilter} that passes only events emitted from nodes with a Fedora
* JCR type, or properties attached to them, except in the case of a node
* removal. In that case, since we cannot test the node for its types, we assume
* that any non-JCR namespaced node is fair game.
*
* @author ajs6f
* @author barmintor
* @date Dec 2013
*
* @author eddies
* @date Feb 7, 2013
*
* @author escowles
* @date Oct 3, 2013
*/
Expand All @@ -71,49 +66,37 @@ private DefaultFilter(final Session session) {
}

@Override
public Function<Event, Event> getFilter(final Session session) {
public Predicate<Event> getFilter(final Session session) {
return new DefaultFilter(session);
}

@Override
public Event apply(final Event event) {
public boolean apply(final Event event) {
try {
String nPath = event.getPath();
final int nType = event.getType();
switch (nType) {
case NODE_ADDED:
break;
switch (valueOf(event.getType())) {
case NODE_REMOVED:
return event;
case PROPERTY_ADDED:
nPath = nPath.substring(0, nPath.lastIndexOf('/'));
break;
case PROPERTY_REMOVED:
nPath = nPath.substring(0, nPath.lastIndexOf('/'));
break;
case PROPERTY_CHANGED:
nPath = nPath.substring(0, nPath.lastIndexOf('/'));
break;
case NODE_MOVED:
final String path = event.getPath();
// Only propagate removals of nodes whose name is not in the "jcr:"
// namespace. This is a crude test, but the removed node no longer
// exists, so the type predicates used below cannot be applied to it.
if (!path.startsWith("jcr:", path.lastIndexOf('/') + 1)) {
return true;
}
break;
default:
return null;
}

final Node n = session.getNode(nPath);
if (isFedoraObject.apply(n)) {
return new FedoraEvent(event, FEDORA_OBJECT);
}
if (isFedoraDatastream.apply(n)) {
return new FedoraEvent(event, FEDORA_DATASTREAM);
final String nodeId = event.getIdentifier();
final Node n = session.getNodeByIdentifier(nodeId);
if (isFedoraObject.apply(n) || isFedoraDatastream.apply(n)) {
return true;
}
}
} catch (final PathNotFoundException e) {
LOGGER.trace("Dropping event from outside our assigned workspace", e);
return null;
LOGGER.trace("Dropping event from outside our assigned workspace:\n", e);
return false;
} catch (final RepositoryException e) {
throw propagate(e);
}
return null;
return false;
}

}
Expand Up @@ -13,34 +13,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.observer;

import javax.jcr.Session;
import javax.jcr.observation.Event;

import com.google.common.base.Function;
import com.google.common.base.Predicate;

/**
* Simple EventFilter that does no filtering.
* Simple {@link EventFilter} that does no filtering.
*
* @author eddies
* @date Feb 7, 2013
*
* @author ajs6f
* @author barmintor
* @date Dec 2013
*
*/
public class NOOPFilter implements EventFilter {

@Override
public Function<Event, Event> getFilter(final Session session) {
public Predicate<Event> getFilter(final Session session) {
return this;
}

@Override
public Event apply(final Event event) {
return event;
public boolean apply(final Event event) {
return true;
}

}
Expand Up @@ -13,11 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.observer;

import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Iterators.filter;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_MOVED;
import static javax.jcr.observation.Event.NODE_REMOVED;
Expand All @@ -27,8 +28,7 @@
import static org.fcrepo.metrics.RegistryService.getMetrics;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -38,18 +38,18 @@
import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;

import org.fcrepo.kernel.observer.eventmappings.InternalExternalEventMapper;
import org.fcrepo.kernel.utils.iterators.EventIterator;
import org.modeshape.jcr.api.Repository;
import org.slf4j.Logger;

import com.codahale.metrics.Counter;
import com.google.common.eventbus.EventBus;


/**
* Simple JCR EventListener that filters JCR Events through a Fedora
* EventFilter and puts the resulting stream onto the internal
* Fedora EventBus as a stream of FedoraEvents.
* Simple JCR EventListener that filters JCR Events through a Fedora EventFilter
* and puts the resulting stream onto the internal Fedora EventBus as a stream
* of FedoraEvents.
*
* @author eddies
* @author ajs6f
Expand All @@ -64,39 +64,47 @@ public class SimpleObserver implements EventListener {
*/
static final Counter EVENT_COUNTER = getMetrics().counter(name(SimpleObserver.class, "onEvent"));

static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED +
PROPERTY_ADDED + PROPERTY_CHANGED + PROPERTY_REMOVED;
static final Integer EVENT_TYPES = NODE_ADDED + NODE_REMOVED + NODE_MOVED + PROPERTY_ADDED + PROPERTY_CHANGED
+ PROPERTY_REMOVED;

@Inject
private Repository repository;

@Inject
private EventBus eventBus;

@Inject
private InternalExternalEventMapper eventMapper;

@Inject
private EventFilter eventFilter;

// THIS SESSION SHOULD NOT BE USED TO LOOK UP NODES
// It is used only to register this observer with, and deregister it from, the JCR.
private Session session;

/**
* Register this observer with the JCR event listeners
*
* @throws RepositoryException
*/
@PostConstruct
public void buildListener() throws RepositoryException {
LOGGER.debug("Constructing an observer for JCR events...");
session = repository.login();
session.getWorkspace().getObservationManager().addEventListener(this,
EVENT_TYPES, "/", true, null, null, false);
session.getWorkspace().getObservationManager()
.addEventListener(this, EVENT_TYPES, "/", true, null, null, false);
session.save();
}

/**
* Deregister this observer from the JCR event listeners and log out of the session
*
* @throws RepositoryException
*/
@PreDestroy
public void stopListening() throws RepositoryException {
LOGGER.debug("Destroying an observer for JCR events...");
session.getWorkspace().getObservationManager().removeEventListener(this);
session.logout();
}
Expand All @@ -108,52 +116,22 @@ public void stopListening() throws RepositoryException {
*/
@Override
public void onEvent(final javax.jcr.observation.EventIterator events) {
// keep track of nodes that trigger events to prevent duplicates
// size to minimize resizing.
final Set<String> posted = new HashSet<>((int)events.getSize() * 2 / 3);

Session lookupSession = null;
try {
// post non-duplicate events approved by the filter
lookupSession = repository.login();
for (final Event e : transform(new EventIterator(events), eventFilter.getFilter(lookupSession))) {
if (e != null) {
final String nodePath;
final String eventPath = e.getPath();
final int nType = e.getType();
// is jump table faster than two bitwise comparisons?
switch(nType) {
case NODE_ADDED:
case NODE_REMOVED:
case NODE_MOVED:
nodePath = eventPath;
break;
case PROPERTY_ADDED:
case PROPERTY_REMOVED:
case PROPERTY_CHANGED:
nodePath = eventPath.substring(0, eventPath.lastIndexOf('/'));
break;
default:
nodePath = null;
break;
}
if ( nodePath != null && !posted.contains(nodePath) ) {
EVENT_COUNTER.inc();
LOGGER.debug("Putting event: {} ({}) on the bus", nodePath, nType);
eventBus.post(e);
posted.add(nodePath);
} else {
LOGGER.debug("Skipping event: {} ({}) on the bus", nodePath, nType);
}
}
final Iterator<Event> filteredEvents =
filter(new EventIterator(events), eventFilter.getFilter(lookupSession));
final Iterator<FedoraEvent> publishableEvents = eventMapper.apply(new EventIterator(filteredEvents));
while (publishableEvents.hasNext()) {
eventBus.post(publishableEvents.next());
EVENT_COUNTER.inc();
}
} catch ( final RepositoryException ex ) {
} catch (final RepositoryException ex) {
throw propagate(ex);
} finally {
if (lookupSession != null) {
lookupSession.logout();
}
}
}

}

0 comments on commit d047bcd

Please sign in to comment.