Skip to content

Commit

Permalink
Propagating Stream API out into event-handling code
Browse files Browse the repository at this point in the history
  • Loading branch information
ajs6f committed Feb 5, 2015
1 parent de328de commit d9ded23
Show file tree
Hide file tree
Showing 22 changed files with 211 additions and 243 deletions.
Expand Up @@ -15,8 +15,8 @@
*/
package org.fcrepo.kernel.impl.observer;

import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.transform;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toCollection;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_BINARY;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_NON_RDF_SOURCE_DESCRIPTION;
import static org.fcrepo.kernel.FedoraJcrTypes.FEDORA_CONTAINER;
Expand All @@ -29,16 +29,15 @@
import javax.jcr.nodetype.NodeType;
import javax.jcr.observation.Event;

import com.google.common.base.Function;
import com.google.common.base.Predicate;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.fcrepo.kernel.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.observer.EventFilter;

import org.slf4j.Logger;

import java.util.Collection;
import java.util.List;
import java.util.HashSet;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* {@link EventFilter} that passes only events emitted from nodes with a Fedora
Expand Down Expand Up @@ -83,12 +82,11 @@ public Predicate<Event> getFilter(final Session session) {
}

@Override
public boolean apply(final Event event) {
public boolean test(final Event event) {
try {
final org.modeshape.jcr.api.observation.Event modeEvent = getJcr21Event(event);

final List<NodeType> nodeTypes = ImmutableList.copyOf(modeEvent.getMixinNodeTypes());
final Collection<String> mixinTypes = ImmutableSet.copyOf(transform(nodeTypes, nodetype2string));
final Collection<String> mixinTypes =
stream(modeEvent.getMixinNodeTypes()).map(nodetype2string).collect(toCollection(HashSet::new));
return mixinTypes.contains(FEDORA_RESOURCE)
|| mixinTypes.contains(FEDORA_BINARY)
|| mixinTypes.contains(FEDORA_NON_RDF_SOURCE_DESCRIPTION)
Expand All @@ -97,7 +95,7 @@ public boolean apply(final Event event) {
LOGGER.trace("Dropping event from outside our assigned workspace:\n", e);
return false;
} catch (final RepositoryException e) {
throw propagate(e);
throw new RepositoryRuntimeException(e);
}
}

Expand Down
Expand Up @@ -15,8 +15,10 @@
*/
package org.fcrepo.kernel.impl.observer;

import com.google.common.base.Function;
import java.util.function.Function;

import org.fcrepo.kernel.observer.FedoraEvent;

import org.slf4j.Logger;

import javax.jcr.NamespaceRegistry;
Expand All @@ -28,13 +30,14 @@

/**
* @author Andrew Woods
* @author ajs6f
* @since 11/22/14
*/
public class GetNamespacedProperties implements Function<FedoraEvent, FedoraEvent> {

private static final Logger LOGGER = getLogger(SimpleObserver.class);

private Session session;
private final Session session;

/**
* Constructor
Expand All @@ -50,24 +53,23 @@ public FedoraEvent apply(final FedoraEvent evt) {
final NamespaceRegistry namespaceRegistry = getNamespaceRegistry(session);

final FedoraEvent event = new FedoraEvent(evt);
for (String property : evt.getProperties()) {
final String[] parts = property.split(":", 2);
if (parts.length == 2) {
final String prefix = parts[0];
try {
event.addProperty(namespaceRegistry.getURI(prefix) + parts[1]);
} catch (RepositoryException ex) {
LOGGER.trace("Prefix could not be dereferenced using the namespace registry: {}", property);
event.addProperty(property);
}
} else {
event.addProperty(property);
}
}

for (Integer type : evt.getTypes()) {
event.addType(type);
}
evt.getProperties().stream().forEach(
property -> {
final String[] parts = property.split(":", 2);
if (parts.length == 2) {
final String prefix = parts[0];
try {
event.addProperty(namespaceRegistry.getURI(prefix) + parts[1]);
} catch (final RepositoryException ex) {
LOGGER.trace("Prefix could not be dereferenced using the namespace registry: {}",
property);
event.addProperty(property);
}
} else {
event.addProperty(property);
}
});
evt.getTypes().stream().forEach(t -> event.addType(t));
return event;
}

Expand Down
Expand Up @@ -15,10 +15,11 @@
*/
package org.fcrepo.kernel.impl.observer;

import java.util.function.Predicate;

import javax.jcr.Session;
import javax.jcr.observation.Event;

import com.google.common.base.Predicate;
import org.fcrepo.kernel.observer.EventFilter;

/**
Expand All @@ -38,7 +39,7 @@ public Predicate<Event> getFilter(final Session session) {
}

@Override
public boolean apply(final Event event) {
public boolean test(final Event event) {
// Pass-through: every event is accepted unconditionally.
return true;
}

Expand Down
Expand Up @@ -16,16 +16,15 @@
package org.fcrepo.kernel.impl.observer;

import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterators.filter;
import static com.google.common.collect.Iterators.transform;
import static javax.jcr.observation.Event.NODE_ADDED;
import static javax.jcr.observation.Event.NODE_MOVED;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static javax.jcr.observation.Event.PROPERTY_ADDED;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
import static org.fcrepo.kernel.impl.utils.Streams.fromIterator;
import static org.slf4j.LoggerFactory.getLogger;

import org.fcrepo.metrics.RegistryService;

import java.util.Iterator;
Expand All @@ -38,9 +37,10 @@
import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;

import org.fcrepo.kernel.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.observer.EventFilter;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.fcrepo.kernel.observer.eventmappings.InternalExternalEventMapper;

import org.modeshape.jcr.api.Repository;
import org.slf4j.Logger;

Expand Down Expand Up @@ -112,7 +112,8 @@ public void stopListening() throws RepositoryException {
}

/**
* Filter JCR events and transform them into our own FedoraEvents.
* Filter JCR events, transform them into our own {@link FedoraEvent}s, and post them to the internal repository
* event bus.
*
* @param events
*/
Expand All @@ -121,19 +122,16 @@ public void onEvent(final javax.jcr.observation.EventIterator events) {
Session lookupSession = null;
try {
lookupSession = repository.login();

@SuppressWarnings("unchecked")
final Iterator<Event> filteredEvents = filter(events, eventFilter.getFilter(lookupSession));
final Iterator<FedoraEvent> publishableEvents = eventMapper.apply(filteredEvents);
final Iterator<FedoraEvent> namespacedEvents =
transform(publishableEvents, new GetNamespacedProperties(lookupSession));

while (namespacedEvents.hasNext()) {
eventBus.post(namespacedEvents.next());
final Iterator<Event> eventsIterator = events;
eventMapper.apply(fromIterator(eventsIterator).filter(eventFilter.getFilter(lookupSession))).map(
new GetNamespacedProperties(lookupSession)).forEach(ne -> {
LOGGER.warn("Posting to repository bus event: {}", ne);
eventBus.post(ne);
EVENT_COUNTER.inc();
}
});
} catch (final RepositoryException ex) {
throw propagate(ex);
throw new RepositoryRuntimeException(ex);
} finally {
if (lookupSession != null) {
lookupSession.logout();
Expand Down
Expand Up @@ -16,6 +16,7 @@
package org.fcrepo.kernel.impl.observer.eventmappings;

import static com.google.common.collect.Multimaps.index;
import static org.fcrepo.kernel.impl.utils.Streams.fromIterator;
import static org.slf4j.LoggerFactory.getLogger;
import static java.util.Arrays.asList;
import static javax.jcr.observation.Event.PROPERTY_ADDED;
Expand All @@ -24,6 +25,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
Expand Down Expand Up @@ -51,30 +53,28 @@ public class AllNodeEventsOneEvent implements InternalExternalEventMapper {
private static final List<Integer> PROPERTY_EVENT_TYPES = asList(PROPERTY_ADDED, PROPERTY_CHANGED,
PROPERTY_REMOVED);

private final static Logger log = getLogger(AllNodeEventsOneEvent.class);

/**
* Extracts the node identifier from a JCR {@link Event}.
* <p>
* The identifier is obtained from {@link Event#getIdentifier()} and is used as the key when
* events are indexed into a Multimap in {@code apply}. A {@link RepositoryException} raised
* while reading the identifier is rethrown wrapped in a {@link RepositoryRuntimeException}.
*/
private static final Function<Event, String> EXTRACT_NODE_ID = new Function<Event, String>() {

@Override
public String apply(final Event ev) {
try {
final String id = ev.getIdentifier();
log.debug("Sorting an event by identifier: {}", id);
return id;
} catch (final RepositoryException e) {
throw new RepositoryRuntimeException(e);
}
private static final Function<Event, String> EXTRACT_NODE_ID = ev -> {
try {
final String id = ev.getIdentifier();
log.trace("Sorting an event by identifier: {}", id);
return id;
} catch (final RepositoryException e) {
// the functional interface declares no checked exceptions, so wrap and rethrow
throw new RepositoryRuntimeException(e);
}
};

@Override
public Iterator<FedoraEvent> apply(final Iterator<Event> events) {
public Stream<FedoraEvent> apply(final Stream<Event> events) {

return new Iterator<FedoraEvent>() {
final Iterator<FedoraEvent> iterator = new Iterator<FedoraEvent>() {

// sort JCR events into a Multimap keyed by the node ID involved
final Multimap<String, Event> sortedEvents = index(events, EXTRACT_NODE_ID);
final Multimap<String, Event> sortedEvents = index(events.iterator(), EXTRACT_NODE_ID);

final Iterator<String> nodeIds = sortedEvents.keySet().iterator();

Expand Down Expand Up @@ -104,12 +104,6 @@ public FedoraEvent next() {
return fedoraEvent;
}

@Override
public void remove() {
// the underlying Multimap is immutable anyway
throw new UnsupportedOperationException();
}

private void addProperty( final FedoraEvent fedoraEvent, final Event ev ) {
try {
if (PROPERTY_EVENT_TYPES.contains(ev.getType())) {
Expand All @@ -123,7 +117,7 @@ private void addProperty( final FedoraEvent fedoraEvent, final Event ev ) {
}
}
};
return fromIterator(iterator);
}

private final static Logger log = getLogger(AllNodeEventsOneEvent.class);
}
}
Expand Up @@ -19,16 +19,15 @@

package org.fcrepo.kernel.impl.observer.eventmappings;

import static com.google.common.collect.Iterators.transform;

import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Stream;

import javax.jcr.observation.Event;

import org.fcrepo.kernel.observer.FedoraEvent;
import com.google.common.base.Function;
import org.fcrepo.kernel.observer.eventmappings.InternalExternalEventMapper;


/**
* Maps each JCR {@link Event} to a single {@link FedoraEvent}
*
Expand All @@ -37,16 +36,10 @@
*/
public class OneToOne implements InternalExternalEventMapper {

/**
* Wraps a single JCR {@link Event} in a new {@link FedoraEvent} using the
* {@code FedoraEvent(Event)} constructor.
*/
private static final Function<Event, FedoraEvent> TO_FEDORA_EVENT = new Function<Event, FedoraEvent>() {

@Override
public FedoraEvent apply(final Event e) {
return new FedoraEvent(e);
}
};
private static final Function<Event, FedoraEvent> TO_FEDORA_EVENT = e -> new FedoraEvent(e);

/**
* Applies {@code TO_FEDORA_EVENT} to each incoming JCR event, producing exactly one
* {@link FedoraEvent} per input event.
*
* @param jcrEvents the JCR events to convert
* @return the converted events, one for each element of {@code jcrEvents}
*/
@Override
public Iterator<FedoraEvent> apply(final Iterator<Event> jcrEvents) {
return transform(jcrEvents, TO_FEDORA_EVENT);
public Stream<FedoraEvent> apply(final Stream<Event> jcrEvents) {
return jcrEvents.map(TO_FEDORA_EVENT);
}
}
Expand Up @@ -15,31 +15,33 @@
*/
package org.fcrepo.kernel.impl.services.functions;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.propagate;
import static java.util.Objects.requireNonNull;

import java.util.function.Function;

import javax.jcr.Property;
import javax.jcr.RepositoryException;

import org.fcrepo.kernel.exception.RepositoryRuntimeException;

import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.BinaryValue;

import com.google.common.base.Function;

/**
* Get the internal Modeshape BinaryKey for a binary property
*
* @author awoods
* @author ajs6f
*/
public class GetBinaryKey implements Function<Property, BinaryKey> {

/**
* Returns the key of the binary value held by the given property.
*
* @param input the binary property to inspect; a null argument is rejected with the
*        message "null cannot have a Binarykey!"
* @return the {@link BinaryKey} of the property's binary value
*/
@Override
public BinaryKey apply(final Property input) {
checkArgument(input != null, "null cannot have a Binarykey!");
requireNonNull(input, "null cannot have a Binarykey!");
try {
// the cast assumes the property's binary is a ModeShape BinaryValue
return ((BinaryValue) input.getBinary()).getKey();
} catch (final RepositoryException e) {
// rethrow the checked JCR exception wrapped, since apply declares no checked exceptions
throw propagate(e);
throw new RepositoryRuntimeException(e);
}
}

Expand Down

0 comments on commit d9ded23

Please sign in to comment.