Navigation Menu

Skip to content

Commit

Permalink
Merge pull request #24 from futures/EventingSpring
Browse files Browse the repository at this point in the history
Eventing spring
  • Loading branch information
cbeer committed Jan 31, 2013
2 parents 55641d4 + 3e7f536 commit 18326dc
Show file tree
Hide file tree
Showing 12 changed files with 390 additions and 10 deletions.
21 changes: 14 additions & 7 deletions pom.xml
Expand Up @@ -16,6 +16,7 @@
<netbeans.hint.j2eeVersion>1.6</netbeans.hint.j2eeVersion>
<modeshape.version>3.1.0.Final</modeshape.version>
<cxf.version>2.7.2</cxf.version>
<activemq.version>5.7.0</activemq.version>
</properties>


Expand Down Expand Up @@ -73,11 +74,6 @@
<artifactId>freemarker</artifactId>
<version>2.3.19</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-collections</artifactId>
<version>r03</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
Expand Down Expand Up @@ -117,8 +113,19 @@
<version>${cxf.version}</version>
<scope>test</scope>
</dependency>



<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>${activemq.version}</version>
</dependency>


<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>13.0.1</version>
</dependency>
</dependencies>

<build>
Expand Down
68 changes: 68 additions & 0 deletions src/main/java/org/fcrepo/modeshape/observer/DefaultFilter.java
@@ -0,0 +1,68 @@
package org.fcrepo.modeshape.observer;

import java.util.Set;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.LoginException;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.nodetype.NodeType;
import javax.jcr.observation.Event;

import org.modeshape.common.SystemFailureException;
import org.modeshape.jcr.api.Repository;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

/**
 * {@link EventFilter} that accepts only events concerning Fedora nodes.
 *
 * <p>An event is accepted when its path resolves to a node that carries at
 * least one mixin node type whose name starts with {@code "fedora:"}. Events
 * whose path does not resolve to a node are rejected.
 *
 * <p>A single read-only {@link Session} is opened after dependency injection
 * and released again when the bean is destroyed.
 */
public class DefaultFilter implements EventFilter {

    /** Prefix shared by the names of all Fedora mixin node types. */
    private static final String FEDORA_TYPE_PREFIX = "fedora:";

    /**
     * Matches node types in the Fedora namespace. Stateless, so one shared
     * instance is enough instead of a new one per filtered event.
     */
    private static final Predicate<NodeType> IS_FEDORA_NODE_TYPE =
            new Predicate<NodeType>() {
                @Override
                public boolean apply(NodeType type) {
                    return type.getName().startsWith(FEDORA_TYPE_PREFIX);
                }
            };

    @Inject
    private Repository repository;

    // it's safe to keep the session around, because this code does not mutate
    // the state of the repository
    private Session session;

    /**
     * Decides whether the given event should be passed on.
     *
     * @param event the observation event to test
     * @return {@code true} if the node at the event's path has a mixin type
     *         in the Fedora namespace; {@code false} if it has none or if no
     *         node exists at that path
     * @throws SystemFailureException if the repository cannot be queried
     */
    @Override
    public boolean apply(Event event) {
        try {
            final Node node;
            try {
                node = session.getNode(event.getPath());
            } catch (PathNotFoundException e) {
                return false; // not a node in the fedora workspace
            }
            Set<NodeType> types = Sets.newHashSet(node.getMixinNodeTypes());
            return Iterables.any(types, IS_FEDORA_NODE_TYPE);
        } catch (RepositoryException e) {
            // LoginException is a RepositoryException, so this single handler
            // covers every checked JCR failure raised above.
            throw new SystemFailureException(e);
        }
    }

    /**
     * Opens the session used for node lookups.
     *
     * <p>NOTE(review): this logs into the repository's default workspace,
     * whereas SimpleObserver registers its listener on the "fedora"
     * workspace. Confirm that both refer to the same workspace.
     *
     * @throws LoginException if the repository rejects the login
     * @throws RepositoryException if another repository error occurs
     */
    @PostConstruct
    public void acquireSession() throws LoginException, RepositoryException {
        session = repository.login();
    }

    /**
     * Logs out of the lookup session, if one was ever acquired. Safe to call
     * when {@link #acquireSession()} failed or was never run.
     */
    @PreDestroy
    public void releaseSession() {
        if (session != null) {
            session.logout();
            session = null;
        }
    }
}
9 changes: 9 additions & 0 deletions src/main/java/org/fcrepo/modeshape/observer/EventFilter.java
@@ -0,0 +1,9 @@
package org.fcrepo.modeshape.observer;

import javax.jcr.observation.Event;

import com.google.common.base.Predicate;

/**
 * A Guava {@link Predicate} over JCR observation {@link Event}s.
 *
 * <p>Declares no members of its own; it only gives the event-specific
 * predicate a dedicated type name.
 */
public interface EventFilter extends Predicate<Event> {

}
10 changes: 10 additions & 0 deletions src/main/java/org/fcrepo/modeshape/observer/NOOPFilter.java
@@ -0,0 +1,10 @@
package org.fcrepo.modeshape.observer;

import javax.jcr.observation.Event;

/**
 * An {@link EventFilter} that does no filtering at all: every event passes.
 */
public class NOOPFilter implements EventFilter {

    /**
     * Accepts the event unconditionally.
     *
     * @param event the observation event; it is not inspected
     * @return always {@code true}
     */
    @Override
    public boolean apply(final Event event) {
        return true;
    }
}
49 changes: 49 additions & 0 deletions src/main/java/org/fcrepo/modeshape/observer/SimpleObserver.java
@@ -0,0 +1,49 @@
package org.fcrepo.modeshape.observer;

import static com.google.common.collect.Collections2.filter;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;

import org.modeshape.jcr.api.Repository;

import com.google.common.collect.ImmutableList.Builder;
import com.google.common.eventbus.EventBus;

/**
 * JCR {@link EventListener} that forwards repository events to an
 * {@link EventBus}.
 *
 * <p>After dependency injection the observer registers itself on the
 * "fedora" workspace. Each batch of delivered events is run through the
 * injected {@link EventFilter}, and every event that passes is posted to the
 * bus individually.
 */
public class SimpleObserver implements EventListener {

    /**
     * Bitmask of the event types this observer subscribes to. The JCR event
     * type constants are bit flags, so they are combined with bitwise OR.
     */
    private static final int EVENT_TYPES = Event.NODE_ADDED
            | Event.NODE_REMOVED | Event.NODE_MOVED | Event.PROPERTY_ADDED
            | Event.PROPERTY_CHANGED | Event.PROPERTY_REMOVED;

    @Inject
    private Repository repository;

    @Inject
    private EventBus eventBus;

    @Inject
    private EventFilter eventFilter;

    /**
     * Registers this observer for {@link #EVENT_TYPES} below the root path
     * {@code "/"} of the "fedora" workspace.
     *
     * <p>NOTE(review): the session opened here is never logged out. Consider
     * retaining it in a field and releasing it in a {@code @PreDestroy}
     * method.
     *
     * @throws RepositoryException if the login or the listener registration
     *         fails
     */
    @PostConstruct
    public void buildListener() throws RepositoryException {
        repository
                .login("fedora")
                .getWorkspace()
                .getObservationManager()
                .addEventListener(this, EVENT_TYPES, "/", true, null, null,
                        false);
    }

    /**
     * Posts every event accepted by the filter to the event bus.
     *
     * @param events the batch of events delivered by the repository
     */
    @Override
    public void onEvent(EventIterator events) {
        // The iterator is copied into a list first so that the batch can be
        // filtered as a collection.
        for (Event e : filter(new Builder<Event>().addAll(events).build(),
                eventFilter)) {
            eventBus.post(e);
        }
    }

}
147 changes: 147 additions & 0 deletions src/main/resources/spring/activemq.xml
@@ -0,0 +1,147 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>

<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

<!--
For better performance, use the VM cursor and a small memory limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers from blocking producers and affecting other
consumers, by limiting the number of messages that are retained.
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>


<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>

<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>


<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
If using ActiveMQ embedded - the following limits could safely be used:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>

<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
    <!--
        The AMQP connector below is disabled. This project builds against
        ActiveMQ 5.7.0 (see activemq.version in pom.xml), but the amqp
        transport is only available from ActiveMQ 5.8.0 onward, in the
        separate activemq-amqp module; with 5.7.0 the broker cannot start
        this connector. Re-enable it after upgrading ActiveMQ and adding
        that dependency.
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
    -->
</transportConnectors>

<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>

</broker>

<!--
Enable web consoles, REST and Ajax APIs and demos
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
<import resource="jetty.xml"/>
-->
</beans>
27 changes: 27 additions & 0 deletions src/main/resources/spring/eventing.xml
@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<import resource="classpath:/spring/repo.xml"/>

<context:annotation-config/>

<bean class="org.fcrepo.modeshape.observer.SimpleObserver"/>

<bean name="fedoraEventFilter" class="org.fcrepo.modeshape.observer.DefaultFilter" />

<bean name="fedoraInternalEventBus" class="com.google.common.eventbus.EventBus"/>

<!--
<bean name="jmsBroker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:/spring/activemq.xml" />
<property name="start" value="true" />
</bean>
-->
</beans>
File renamed without changes.
Expand Up @@ -11,7 +11,7 @@
<import resource="classpath:META-INF/cxf/cxf.xml"/>
<import resource="classpath:META-INF/cxf/osgi/cxf-extension-osgi.xml"/>

<import resource="classpath:/repo.xml"/>
<import resource="classpath:/spring/repo.xml"/>

<context:annotation-config/>

Expand Down
2 changes: 1 addition & 1 deletion src/main/webapp/WEB-INF/web.xml
Expand Up @@ -9,7 +9,7 @@

<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>WEB-INF/classes/rest.xml</param-value>
<param-value>WEB-INF/classes/spring/*.xml</param-value>
</context-param>

<listener>
Expand Down

0 comments on commit 18326dc

Please sign in to comment.