Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
FCREPO-1599
  • Loading branch information
ajs6f committed Jun 25, 2015
1 parent f13ca21 commit d09ce5a
Show file tree
Hide file tree
Showing 18 changed files with 131 additions and 207 deletions.
Expand Up @@ -15,10 +15,8 @@
*/
package org.fcrepo.kernel.impl.observer;

import static com.google.common.base.Functions.toStringFunction;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Collections2.transform;
import static com.google.common.collect.Sets.newHashSet;
import static java.util.Arrays.stream;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_BINARY;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_NON_RDF_SOURCE_DESCRIPTION;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_CONTAINER;
Expand All @@ -27,18 +25,16 @@

import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.nodetype.NodeType;
import javax.jcr.observation.Event;

import com.google.common.base.Predicate;

import org.fcrepo.kernel.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.observer.EventFilter;

import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.stream.Stream;

/**
* {@link EventFilter} that passes only events emitted from nodes with a Fedora
Expand All @@ -61,35 +57,24 @@ public class DefaultFilter implements EventFilter {
private static final HashSet<String> fedoraMixins =
newHashSet(FEDORA_BINARY, FEDORA_CONTAINER, FEDORA_NON_RDF_SOURCE_DESCRIPTION, FEDORA_RESOURCE);

/**
* Default constructor.
*/
public DefaultFilter() {
}

@Override
public Predicate<Event> getFilter(final Session session) {
return new DefaultFilter();
}

@Override
public boolean apply(final Event event) {
public boolean test(final Event event) {
try {
return !Collections.disjoint(getMixinTypes(event), fedoraMixins);
return getMixinTypes(event).anyMatch(fedoraMixins::contains);
} catch (final PathNotFoundException e) {
LOGGER.trace("Dropping event from outside our assigned workspace:\n", e);
return false;
} catch (final RepositoryException e) {
throw propagate(e);
throw new RepositoryRuntimeException(e);
}
}

protected static Collection<String> getMixinTypes(final Event event)
protected static Stream<String> getMixinTypes(final Event event)
throws PathNotFoundException, RepositoryException {
try {
final org.modeshape.jcr.api.observation.Event modeEvent =
(org.modeshape.jcr.api.observation.Event) event;
return transform(Arrays.asList(modeEvent.getMixinNodeTypes()), toStringFunction());
return stream(modeEvent.getMixinNodeTypes()).map(NodeType::toString);
} catch (final ClassCastException e) {
throw new ClassCastException(event + " is not a Modeshape Event");
}
Expand Down
Expand Up @@ -15,8 +15,10 @@
*/
package org.fcrepo.kernel.impl.observer;

import com.google.common.base.Function;
import java.util.function.Function;

import org.fcrepo.kernel.observer.FedoraEvent;

import org.slf4j.Logger;

import javax.jcr.NamespaceRegistry;
Expand All @@ -31,13 +33,14 @@

/**
* @author Andrew Woods
* @author ajs6f
* @since 11/22/14
*/
public class GetNamespacedProperties implements Function<FedoraEvent, FedoraEvent> {

private static final Logger LOGGER = getLogger(SimpleObserver.class);

private Session session;
private final Session session;

/**
* Constructor
Expand All @@ -53,7 +56,7 @@ public FedoraEvent apply(final FedoraEvent evt) {
final NamespaceRegistry namespaceRegistry = getNamespaceRegistry(session);

final FedoraEvent event = new FedoraEvent(evt);
for (String property : evt.getProperties()) {
for (final String property : evt.getProperties()) {
final String[] parts = property.split(":", 2);
if (parts.length == 2) {
final String prefix = parts[0];
Expand All @@ -66,7 +69,7 @@ public FedoraEvent apply(final FedoraEvent evt) {
} else {
try {
event.addProperty(namespaceRegistry.getURI(prefix) + parts[1]);
} catch (RepositoryException ex) {
} catch (final RepositoryException ex) {
LOGGER.debug("Prefix could not be dereferenced using the namespace registry: {}", property);
event.addProperty(property);
}
Expand All @@ -76,7 +79,7 @@ public FedoraEvent apply(final FedoraEvent evt) {
}
}

for (Integer type : evt.getTypes()) {
for (final Integer type : evt.getTypes()) {
event.addType(type);
}
return event;
Expand Down
Expand Up @@ -15,10 +15,8 @@
*/
package org.fcrepo.kernel.impl.observer;

import javax.jcr.Session;
import javax.jcr.observation.Event;

import com.google.common.base.Predicate;
import org.fcrepo.kernel.observer.EventFilter;

/**
Expand All @@ -33,12 +31,7 @@
public class NOOPFilter implements EventFilter {

@Override
public Predicate<Event> getFilter(final Session session) {
return this;
}

@Override
public boolean apply(final Event event) {
public boolean test(final Event event) {
return true;
}

Expand Down
Expand Up @@ -16,7 +16,6 @@
package org.fcrepo.kernel.impl.observer;

import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterators.filter;
import static com.google.common.collect.Iterators.transform;
import static javax.jcr.observation.Event.NODE_ADDED;
Expand All @@ -26,10 +25,10 @@
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;

import org.fcrepo.metrics.RegistryService;

import java.util.Iterator;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
Expand All @@ -38,19 +37,20 @@
import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;

import org.fcrepo.kernel.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.observer.EventFilter;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.fcrepo.kernel.observer.eventmappings.InternalExternalEventMapper;

import org.modeshape.jcr.api.Repository;
import org.slf4j.Logger;

import com.codahale.metrics.Counter;
import com.google.common.eventbus.EventBus;

/**
* Simple JCR EventListener that filters JCR Events through a Fedora EventFilter
* and puts the resulting stream onto the internal Fedora EventBus as a stream
* of FedoraEvents.
* Simple JCR EventListener that filters JCR Events through a Fedora EventFilter, maps the results through a mapper,
* and puts the resulting stream onto the internal Fedora EventBus as a stream of FedoraEvents.
*
* @author eddies
* @author ajs6f
Expand Down Expand Up @@ -106,9 +106,12 @@ public void buildListener() throws RepositoryException {
*/
@PreDestroy
public void stopListening() throws RepositoryException {
LOGGER.debug("Destroying an observer for JCR events...");
session.getWorkspace().getObservationManager().removeEventListener(this);
session.logout();
try {
LOGGER.debug("Destroying an observer for JCR events...");
session.getWorkspace().getObservationManager().removeEventListener(this);
} finally {
session.logout();
}
}

/**
Expand All @@ -121,23 +124,22 @@ public void onEvent(final javax.jcr.observation.EventIterator events) {
Session lookupSession = null;
try {
lookupSession = repository.login();

@SuppressWarnings("unchecked")
final Iterator<Event> filteredEvents = filter(events, eventFilter.getFilter(lookupSession));
final Iterator<Event> filteredEvents = filter(events, eventFilter::test);
final Iterator<FedoraEvent> publishableEvents = eventMapper.apply(filteredEvents);
final Iterator<FedoraEvent> namespacedEvents =
transform(publishableEvents, new GetNamespacedProperties(lookupSession));

while (namespacedEvents.hasNext()) {
eventBus.post(namespacedEvents.next());
EVENT_COUNTER.inc();
}
transform(publishableEvents, new GetNamespacedProperties(lookupSession)::apply)
.forEachRemaining(this::post);
} catch (final RepositoryException ex) {
throw propagate(ex);
throw new RepositoryRuntimeException(ex);
} finally {
if (lookupSession != null) {
lookupSession.logout();
}
}
}

private void post(final FedoraEvent evt) {
eventBus.post(evt);
EVENT_COUNTER.inc();
}
}
Expand Up @@ -19,49 +19,40 @@

import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;

import com.google.common.base.Predicate;

import org.fcrepo.kernel.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.observer.EventFilter;

import org.slf4j.Logger;

import java.util.Collections;
import java.util.Set;

/**
* {@link EventFilter} that extends {@link DefaultFilter} to also suppress events
* emitted by nodes with a provided set of mixins.
* Suppresses events emitted by nodes with a provided set of mixins.
*
* @author escowles
* @author ajs6f
* @since 2015-04-15
*/
public class SuppressByMixinFilter extends DefaultFilter implements EventFilter {
public class SuppressByMixinFilter extends DefaultFilter {

private static final Logger LOGGER = getLogger(SuppressByMixinFilter.class);
private Set<String> suppressedMixins;
private final Set<String> suppressedMixins;

/**
* @param suppressedMixins Resources with these mixins will be filtered out
*/
public SuppressByMixinFilter(final Set<String> suppressedMixins) {
this.suppressedMixins = suppressedMixins;
for (String mixin : suppressedMixins) {
for (final String mixin : suppressedMixins) {
LOGGER.info("Suppressing events for nodes with mixin: {}", mixin);
}
}

@Override
public Predicate<Event> getFilter(final Session session) {
return this;
}

@Override
public boolean apply(final Event event) {
public boolean test(final Event event) {
try {
return super.apply(event) && Collections.disjoint(getMixinTypes(event), suppressedMixins);
return super.test(event) && !getMixinTypes(event).anyMatch(suppressedMixins::contains);
} catch (final PathNotFoundException e) {
LOGGER.trace("Dropping event from outside our assigned workspace:\n", e);
return false;
Expand Down

0 comments on commit d09ce5a

Please sign in to comment.