Skip to content

Commit

Permalink
Add transaction support following the Camel Transactional Client
Browse files Browse the repository at this point in the history
- Pattern described here: http://camel.apache.org/transactional-client.html

Resolves: #59
  • Loading branch information
acoburn authored and Andrew Woods committed Mar 8, 2015
1 parent 70c2545 commit 8183293
Show file tree
Hide file tree
Showing 18 changed files with 1,253 additions and 53 deletions.
36 changes: 36 additions & 0 deletions README.md
Expand Up @@ -199,13 +199,49 @@ You can get the HTTP response code from the `fcrepo` component by getting
the value from the Out message header with `Exchange.HTTP_RESPONSE_CODE`.


Transactions
------------

The `fcrepo-camel` component follows the [Transactional Client](http://camel.apache.org/transactional-client.html)
pattern when using transactions with a Fedora Repository. A route can begin using transactions by simply
identifying the route as `transacted()` like so:

from("direct:foo")
.transacted()
.to("fcrepo:localhost:8080/rest")
.process(new MyProcessor())
.to("fcrepo:localhost:8080/rest")
.process(new MyOtherProcessor())
.to("fcrepo:localhost:8080/rest");

A single transaction can span multiple routes so long as the transaction is run within a single thread.
That is, if the `direct` endpoint is used, a transacted workflow may be divided among multiple routes
(do not use `seda` or `vm`).

In order to enable a transactional client, a `TransactionManager` must be added to the Spring configuration.
For this to work, the built-in `FcrepoTransactionManager` needs to know the `baseUrl` of the underlying
repository. Authentication information, if necessary, can also be added in the bean configuration.

<bean id="fcrepoTxManager" class="org.fcrepo.camel.FcrepoTransactionManager">
<property name="baseUrl" value="http://localhost:8080/rest"/>
</bean>

<bean id="fcrepo" class="org.fcrepo.camel.FcrepoComponent">
<property name="transactionManager" ref="fcrepoTxManager"/>
</bean>

As with other transactional clients, if an error is encountered anywhere in the route, all transacted
operations will be rolled back.


Building the component
----------------------

The `fcrepo-camel` component can be built with Maven:

mvn clean install


Fcrepo messaging
----------------

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -207,6 +207,12 @@
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>

<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
23 changes: 19 additions & 4 deletions src/main/java/org/fcrepo/camel/FcrepoClient.java
Expand Up @@ -57,6 +57,8 @@ public class FcrepoClient {

private static final String CONTENT_TYPE = "Content-Type";

private static final String LOCATION = "Location";

private CloseableHttpClient httpclient;

private Boolean throwExceptionOnFailure = true;
Expand Down Expand Up @@ -288,10 +290,11 @@ private HttpResponse executeRequest(final HttpRequestBase request) throws Fcrepo
private FcrepoResponse fcrepoGenericResponse(final URI url, final HttpResponse response,
final Boolean throwExceptionOnFailure) throws FcrepoOperationFailedException {
final int status = response.getStatusLine().getStatusCode();
final URI locationHeader = getLocationHeader(response);
final String contentTypeHeader = getContentTypeHeader(response);

if ((status >= HttpStatus.SC_OK && status < HttpStatus.SC_BAD_REQUEST) || !throwExceptionOnFailure) {
return new FcrepoResponse(url, status, contentTypeHeader, null, getEntityContent(response));
return new FcrepoResponse(url, status, contentTypeHeader, locationHeader, getEntityContent(response));
} else {
throw new FcrepoOperationFailedException(url, status,
response.getStatusLine().getReasonPhrase());
Expand All @@ -316,13 +319,25 @@ private static InputStream getEntityContent(final HttpResponse response) {
}
}

/**
 * Read the first {@code Location} header of a response as a URI.
 *
 * @param response the HTTP response to inspect
 * @return the header value parsed with {@link URI#create(String)}, or null
 *         when the response carries no such header
 */
private static URI getLocationHeader(final HttpResponse response) {
    final Header header = response.getFirstHeader(LOCATION);
    return header == null ? null : URI.create(header.getValue());
}

/**
* Extract the content-type header value
*/
private static String getContentTypeHeader(final HttpResponse response) {
final Header[] contentTypes = response.getHeaders(CONTENT_TYPE);
if (contentTypes.length > 0) {
return contentTypes[0].getValue();
final Header contentType = response.getFirstHeader(CONTENT_TYPE);
if (contentType != null) {
return contentType.getValue();
} else {
return null;
}
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/fcrepo/camel/FcrepoComponent.java
Expand Up @@ -23,6 +23,7 @@
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;

/**
* Represents the component that manages {@link FcrepoEndpoint}.
Expand All @@ -33,6 +34,8 @@ public class FcrepoComponent extends UriEndpointComponent {

private FcrepoConfiguration configuration;

private PlatformTransactionManager transactionManager;

private static final Logger LOGGER = getLogger(FcrepoComponent.class);

/**
Expand Down Expand Up @@ -78,6 +81,24 @@ public void setConfiguration(final FcrepoConfiguration config) {
this.configuration = config;
}

/**
* Set the transaction manager for the component.
*
* The reference is stored as given, without validation or copying, and is
* what {@link #getTransactionManager()} returns afterwards.
*
* @param transactionManager the transaction manager for this component
*/
public void setTransactionManager(final PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

/**
* Get the transaction manager for the component.
*
* @return the transaction manager currently held by this component; may be
* null if none has been assigned
*/
public PlatformTransactionManager getTransactionManager() {
return transactionManager;
}

/**
* set the authUsername value component-wide.
* @param username the authentication username.
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/org/fcrepo/camel/FcrepoConfiguration.java
Expand Up @@ -18,6 +18,7 @@
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.RuntimeCamelException;
import org.springframework.transaction.PlatformTransactionManager;

/**
* An FcrepoConfiguration class.
Expand Down Expand Up @@ -63,6 +64,9 @@ public class FcrepoConfiguration implements Cloneable {
@UriParam
private String preferOmit = null;

@UriParam
private PlatformTransactionManager transactionManager = null;

/**
* Create a new FcrepoConfiguration object
*/
Expand Down Expand Up @@ -299,4 +303,23 @@ public void setPreferOmit(final String omit) {
public String getPreferOmit() {
return preferOmit;
}


/**
* transactionManager setter
*
* Stores the given reference as-is; passing null clears the value.
*
* @param transactionManager the transaction manager for handling transactions
*/
public void setTransactionManager(final PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

/**
* transactionManager getter
*
* @return the transaction manager for handling transactions; the field is
* initialized to null, so this may be null if no manager has been set
*/
public PlatformTransactionManager getTransactionManager() {
return transactionManager;
}
}
85 changes: 84 additions & 1 deletion src/main/java/org/fcrepo/camel/FcrepoEndpoint.java
Expand Up @@ -15,6 +15,8 @@
*/
package org.fcrepo.camel;

import java.net.URI;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
Expand All @@ -23,6 +25,9 @@
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;

/**
* Represents a Fcrepo endpoint.
Expand All @@ -35,6 +40,10 @@ public class FcrepoEndpoint extends DefaultEndpoint {

private FcrepoConfiguration configuration;

private PlatformTransactionManager transactionManager;

public static final int DEFAULT_HTTPS_PORT = 443;

/**
* Create a FcrepoEndpoint with a uri, path and component
* @param uri the endpoint uri (without path values)
Expand All @@ -46,7 +55,8 @@ public FcrepoEndpoint(final String uri, final String remaining, final FcrepoComp
final FcrepoConfiguration configuration) {
super(uri, component);
this.configuration = configuration;
this.setBaseUrl(remaining);
this.transactionManager = component.getTransactionManager();
setBaseUrl(remaining);
}

/**
Expand Down Expand Up @@ -77,6 +87,59 @@ public boolean isSingleton() {
return true;
}

/**
 * Build a {@link TransactionTemplate} for this endpoint.
 *
 * The endpoint's own transaction manager is used when one is present.
 * Otherwise a new {@link FcrepoTransactionManager} is created and given
 * this endpoint's scheme-qualified base URL and authentication settings.
 * The returned template always uses PROPAGATION_REQUIRED.
 *
 * @return a transaction template
 */
public TransactionTemplate createTransactionTemplate() {
    final PlatformTransactionManager manager;

    if (getTransactionManager() == null) {
        // No manager configured: fall back to one derived from this endpoint.
        final FcrepoTransactionManager fallback = new FcrepoTransactionManager();
        fallback.setBaseUrl(getBaseUrlWithScheme());
        fallback.setAuthUsername(getAuthUsername());
        fallback.setAuthPassword(getAuthPassword());
        fallback.setAuthHost(getAuthHost());
        manager = fallback;
    } else {
        manager = getTransactionManager();
    }

    final TransactionTemplate template = new TransactionTemplate(manager);
    template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
    template.afterPropertiesSet();
    return template;
}

/**
 * Return the repository base URL, guaranteed to begin with a scheme.
 *
 * Accepted forms of the configured base URL include:
 *   localhost:8080/rest
 *   fedora.institution.org:8983/rest
 *   http://localhost:8080/fcrepo/rest
 *   https://fedora.institution.org/rest
 *   fedora.institution.org:443/rest
 *
 * A value that already starts with "http:" or "https:" is returned
 * unchanged. Otherwise "https://" is prepended when the port parsed from
 * the value equals {@code DEFAULT_HTTPS_PORT}, and "http://" in every
 * other case, so the result is suitable for sending to the httpclient.
 *
 * @return the base URL prefixed with either the http or https scheme
 */
public String getBaseUrlWithScheme() {
    final String base = getBaseUrl();

    if (base.startsWith("http:") || base.startsWith("https:")) {
        return base;
    }

    // Parse with a provisional scheme purely to extract the port number.
    final boolean onHttpsPort = URI.create("http://" + base).getPort() == DEFAULT_HTTPS_PORT;
    return (onHttpsPort ? "https://" : "http://") + base;
}

/**
* configuration getter
*
Expand Down Expand Up @@ -113,6 +176,26 @@ public String getBaseUrl() {
return getConfiguration().getBaseUrl();
}

/**
* transactionManager setter
*
* Replaces the manager held by this endpoint; the reference is stored
* as given, without validation.
*
* @param transactionManager the transaction manager for this endpoint
*/
@ManagedAttribute(description = "Transaction Manager")
public void setTransactionManager(final PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

/**
* transactionManager getter
*
* @return the transaction manager for this endpoint; may be null if none
* has been assigned
*/
@ManagedAttribute(description = "Transaction Manager")
public PlatformTransactionManager getTransactionManager() {
return transactionManager;
}

/**
* accept setter
*
Expand Down

0 comments on commit 8183293

Please sign in to comment.