Skip to content

Commit

Permalink
Getting baseURL for JMS messages from event user data field, which is…
Browse files Browse the repository at this point in the history
… set by the HTTP API from the UriInfo baseURL

Resolves: https://www.pivotaltracker.com/story/show/72351918
  • Loading branch information
escowles authored and Andrew Woods committed Jul 29, 2014
1 parent 384c059 commit ae12d03
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 18 deletions.
31 changes: 31 additions & 0 deletions fcrepo-http-api/src/main/java/org/fcrepo/http/api/FedoraNodes.java
Expand Up @@ -69,6 +69,7 @@
import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.ObservationManager;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -140,6 +141,29 @@ public class FedoraNodes extends AbstractResource {
protected Session session;

private static final Logger LOGGER = getLogger(FedoraNodes.class);
private boolean baseURLSet = false;

This comment has been minimized.

Copy link
@escowles

escowles Aug 23, 2014

Author Contributor

I think baseURLSet needs to be static -- otherwise it's going to be initialized for each request and the observation manager will be loaded each time. I can reproduce @jcoyne's problem with about 2000 updates, and making baseURLSet static fixes it.

@awoods: Reproducing this bug takes around 30 seconds on my machine. Should I go ahead and create an IT to test this anyway?

This comment has been minimized.

Copy link
@awoods

awoods Aug 24, 2014

@escowles, thanks for resolving this. We do not want to add an additional 30sec to each build. Let's add a SystemProperty that is required for the long-running test to be enabled, like:
https://github.com/fcrepo4/fcrepo4/blob/master/fcrepo-http-api/src/test/java/org/fcrepo/integration/http/api/FedoraCrudConcurrentIT.java#L45

I will then set up a CI configuration that runs the long-version of the build.


/**
* Set the baseURL for JMS events.
*
* Reads the base URI from the supplied UriInfo and stores it, wrapped in a
* small JSON object under the key "baseURL", as the user data of the
* ObservationManager obtained from the current session's workspace.
* The work is attempted only while baseURLSet is false, and the flag is
* flipped before the attempt, so a failure is logged at WARN and not retried
* through this flag.
*
* NOTE(review): baseURLSet is declared as a non-static instance field, so the
* guard only holds for the lifetime of one instance of this resource --
* confirm how often the resource is instantiated.
*
* @param uriInfo request URI information supplying the base URI
**/
private void init( final UriInfo uriInfo ) {
if ( !baseURLSet ) {
// Set to true the first time this is run. If there is an exception the first time, there
// will likely be an exception every time. Since this is run on each repository update,
// we should fail fast rather than retrying over and over.
baseURLSet = true;
try {
final URI baseURL = uriInfo.getBaseUri();
LOGGER.debug("FedoraNodes.init(): baseURL = " + baseURL.toString());
final ObservationManager obs = session.getWorkspace().getObservationManager();

This comment has been minimized.

Copy link
@cbeer

cbeer Aug 22, 2014

Contributor

@escowles, if I'm reading a stack trace right, I think this is leaking threads. After making 1000 API calls to createObject, I'm seeing 1000 modeshape event listener threads hanging around, e.g.:

java.lang.Thread.<init>(ThreadGroup, Runnable, String, long) Thread.java
org.modeshape.common.util.NamedThreadFactory.newThread(Runnable) NamedThreadFactory.java:42
java.util.concurrent.ThreadPoolExecutor.execute(Runnable) ThreadPoolExecutor.java:1371
org.modeshape.common.collection.ring.RingBuffer.addConsumer(Object, int) RingBuffer.java:248
org.modeshape.common.collection.ring.RingBuffer.addConsumer(Object) RingBuffer.java:220
org.modeshape.jcr.bus.RepositoryChangeBus.register(ChangeSetListener) RepositoryChangeBus.java:79
org.modeshape.jcr.JcrObservationManager.<init>(JcrSession, Observable, RepositoryStatistics) JcrObservationManager.java:174
org.modeshape.jcr.JcrWorkspace.observationManager() JcrWorkspace.java:620
org.modeshape.jcr.JcrWorkspace.getObservationManager() JcrWorkspace.java:612
org.fcrepo.http.api.FedoraNodes.init(UriInfo) FedoraNodes.java:158
org.fcrepo.http.api.FedoraNodes.createObject(List, String, String, String, MediaType, String, HttpServletResponse, UriInfo, InputStream) FedoraNodes.java:550
com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(Object, HttpContext) ResourceJavaMethodDispatcher.java:75
com.codahale.metrics.jersey.InstrumentedResourceMethodDispatchProvider$TimedRequestDispatcher.dispatch(Object, HttpContext) InstrumentedResourceMethodDispatchProvider.java:30
java.lang.Thread.run() Thread.java:744

Is there cleanup we should be doing?

// The base URL is concatenated into the JSON text as-is; no escaping is applied here.
final String json = "{\"baseURL\":\"" + baseURL.toString() + "\"}";
obs.setUserData(json);
LOGGER.trace("FedoraNodes.init(): done");
} catch ( Exception ex ) {
// Best effort: the failure is only logged, and the exception is not rethrown.
LOGGER.warn("Error setting baseURL", ex);
}
}
}

/**
* Retrieve the node headers
Expand Down Expand Up @@ -357,6 +381,7 @@ public Response updateSparql(@PathParam("path")
throws RepositoryException, IOException {
throwIfPathIncludesJcr(pathList, "PATCH");

init(uriInfo);
final String path = toPath(pathList);
LOGGER.debug("Attempting to update path: {}", path);

Expand Down Expand Up @@ -436,6 +461,7 @@ public Response createOrReplaceObjectRdf(
@Context final HttpServletResponse servletResponse) throws RepositoryException, ParseException,
IOException, InvalidChecksumException, URISyntaxException {
throwIfPathIncludesJcr(pathList, "PUT");
init(uriInfo);

final String path = toPath(pathList);
LOGGER.debug("Attempting to replace path: {}", path);
Expand Down Expand Up @@ -521,6 +547,7 @@ public Response createObject(@PathParam("path")
throws RepositoryException, ParseException, IOException,
InvalidChecksumException, URISyntaxException {
throwIfPathIncludesJcr(pathList, "POST");
init(uriInfo);

String pid;
final String newObjectPath;
Expand Down Expand Up @@ -700,6 +727,7 @@ public Response createObjectFromFormPost(
@FormDataParam("file") final InputStream file
) throws RepositoryException, URISyntaxException, InvalidChecksumException, ParseException, IOException {
throwIfPathIncludesJcr(pathList, "POST with multipart attachment");
init(uriInfo);

final MediaType effectiveContentType = file == null ? null : MediaType.APPLICATION_OCTET_STREAM_TYPE;
return createObject(pathList, mixin, null, null, effectiveContentType, slug, servletResponse, uriInfo, file);
Expand All @@ -719,6 +747,7 @@ public Response deleteObject(@PathParam("path")
final List<PathSegment> pathList,
@Context final Request request) throws RepositoryException {
throwIfPathIncludesJcr(pathList, "DELETE");
init(uriInfo);

try {

Expand Down Expand Up @@ -747,6 +776,7 @@ public Response copyObject(@PathParam("path") final List<PathSegment> path,
@HeaderParam("Destination") final String destinationUri)
throws RepositoryException, URISyntaxException {
throwIfPathIncludesJcr(path, "COPY");
init(uriInfo);

try {

Expand Down Expand Up @@ -794,6 +824,7 @@ public Response moveObject(@PathParam("path") final List<PathSegment> pathList,
@Context final Request request)
throws RepositoryException, URISyntaxException {
throwIfPathIncludesJcr(pathList, "MOVE");
init(uriInfo);

try {

Expand Down
7 changes: 6 additions & 1 deletion fcrepo-jms/pom.xml
Expand Up @@ -81,6 +81,11 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
<!-- axiom-api is a dependency of abdera-parser. It is only explicitly declared
here because abdera-parser depends on an older version of jaxen (1.1.1) than axiom-api.
If/when abdera-parser catches up, we can remove this explicit dependency on axiom-api -->
Expand Down Expand Up @@ -125,4 +130,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Expand Up @@ -33,6 +33,8 @@
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

/**
* Generates JMS {@link Message}s composed entirely of headers, based entirely
Expand Down Expand Up @@ -61,17 +63,28 @@ public class DefaultMessageFactory implements JMSEventMessageFactory {
private String baseURL;

/**
* @param baseURL indicating the repository server host/port/etc
* Set baseURL.
* @param event Fedora event object containing user data with baseURL specified.
*/
public DefaultMessageFactory(final String baseURL) {
this.baseURL = baseURL;
log.debug("MessageFactory baseURL: {}", baseURL);
private void setBaseURL(final FedoraEvent event) {
    // The event's user data is parsed as a JSON object, and the string stored
    // under its "baseURL" key becomes this factory's baseURL.
    try {
        final JsonParser parser = new JsonParser();
        final JsonObject userData = parser.parse(event.getUserData()).getAsJsonObject();
        baseURL = userData.get("baseURL").getAsString();
        log.debug("MessageFactory baseURL: {}", baseURL);
    } catch ( final Exception ex ) {
        // Any failure (missing user data, malformed JSON, absent key) is only
        // logged; baseURL keeps whatever value it had before this call.
        log.warn("Error setting baseURL", ex);
    }
}

@Override
public Message getMessage(final FedoraEvent jcrEvent,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {

if ( baseURL == null ) {
setBaseURL(jcrEvent);
}

final Message message = jmsSession.createMessage();
message.setLongProperty(TIMESTAMP_HEADER_NAME, jcrEvent.getDate());
message.setStringProperty(IDENTIFIER_HEADER_NAME, jcrEvent.getPath());
Expand Down
Expand Up @@ -27,8 +27,6 @@
import static org.fcrepo.jms.headers.DefaultMessageFactory.PROPERTIES_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.TIMESTAMP_HEADER_NAME;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.slf4j.LoggerFactory.getLogger;

Expand Down Expand Up @@ -117,10 +115,6 @@ public void testIngestion() throws RepositoryException,
success = true;
}
}

final String baseUrl = messages.iterator().next().getStringProperty(BASE_URL_HEADER_NAME);
assertNotNull("BaseUrl should not be null!", baseUrl);
assertEquals("Defined in spring-test/headers-jms.xml", "http://localhost:8080/rest", baseUrl);
}
LOGGER.debug("Waiting for next message...");
wait(1000);
Expand Down
Expand Up @@ -62,14 +62,15 @@ public void setUp() throws JMSException {
initMocks(this);
when(mockSession.createMessage()).thenReturn(
new ActiveMQObjectMessage());
testDefaultMessageFactory = new DefaultMessageFactory("base-url");
testDefaultMessageFactory = new DefaultMessageFactory();
}

@Test
public void testBuildMessage() throws RepositoryException, IOException,
JMSException {
final Long testDate = 46647758568747L;
when(mockEvent.getDate()).thenReturn(testDate);
when(mockEvent.getUserData()).thenReturn("{\"baseURL\":\"base-url\"}");
final String testPath = "super/calli/fragi/listic";
when(mockEvent.getPath()).thenReturn(testPath);
final Set<Integer> testTypes = singleton(NODE_ADDED);
Expand Down
4 changes: 1 addition & 3 deletions fcrepo-jms/src/test/resources/spring-test/headers-jms.xml
Expand Up @@ -19,8 +19,6 @@
<amq:connectionFactory id="connectionFactory"
brokerURL="vm://localhost?broker.persistent=false&amp;broker.useJmx=false&amp;broker.enableStatistics=false"/>

<bean class="org.fcrepo.jms.headers.DefaultMessageFactory">
<constructor-arg value="http://localhost:8080/rest"/>
</bean>
<bean class="org.fcrepo.jms.headers.DefaultMessageFactory"/>

</beans>
4 changes: 1 addition & 3 deletions fcrepo-webapp/src/main/resources/spring/jms.xml
Expand Up @@ -19,8 +19,6 @@
p:config="classpath:/config/activemq.xml" p:start="true"/>

<!-- translates events into JMS header-only format-->
<bean class="org.fcrepo.jms.headers.DefaultMessageFactory">
<constructor-arg value="${jms.base-url:http://localhost:8080/rest}"/>
</bean>
<bean class="org.fcrepo.jms.headers.DefaultMessageFactory"/>

</beans>

0 comments on commit ae12d03

Please sign in to comment.