Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add streaming support to the fedora client
  • Loading branch information
acoburn authored and Andrew Woods committed Dec 11, 2014
1 parent 377509b commit 3a9adee
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 24 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Expand Up @@ -23,6 +23,7 @@
<fcrepo.version>4.0.1-SNAPSHOT</fcrepo.version>
<camel.version>2.14.0</camel.version>
<commons.lang3.version>3.3.2</commons.lang3.version>
<commons.io.version>2.4</commons.io.version>
<grizzly.version>2.3.16</grizzly.version>
<httpclient.version>4.3.5</httpclient.version>
<jena.fuseki.version>1.1.0</jena.fuseki.version>
Expand Down Expand Up @@ -192,6 +193,13 @@
<version>2.0.1</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>


<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/org/fcrepo/camel/FedoraClient.java
Expand Up @@ -15,6 +15,8 @@
*/
package org.fcrepo.camel;

import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand Down Expand Up @@ -45,6 +47,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;

/**
* Represents a client to interact with Fedora's HTTP API.
Expand All @@ -64,6 +67,8 @@ public class FedoraClient {

private Boolean throwExceptionOnFailure = true;

private static final Logger LOGGER = getLogger(FedoraClient.class);

/**
* Create a FedoraClient with a set of authentication values.
* @param username the username for the repository
Expand Down Expand Up @@ -152,7 +157,7 @@ public FedoraResponse put(final URI url, final InputStream body, final String co
if ((status >= HttpStatus.SC_OK && status < HttpStatus.SC_BAD_REQUEST) || !this.throwExceptionOnFailure) {
final HttpEntity entity = response.getEntity();
return new FedoraResponse(url, status, contentTypeHeader, null,
entity != null ? EntityUtils.toString(entity) : null);
entity != null ? entity.getContent() : null);
} else {
throw buildHttpOperationFailedException(url, response);
}
Expand All @@ -179,8 +184,7 @@ public FedoraResponse patch(final URI url, final InputStream body)

if ((status >= HttpStatus.SC_OK && status < HttpStatus.SC_BAD_REQUEST) || !this.throwExceptionOnFailure) {
final HttpEntity entity = response.getEntity();
return new FedoraResponse(url, status, contentType, null,
entity != null ? EntityUtils.toString(entity) : null);
return new FedoraResponse(url, status, contentType, null, entity != null ? entity.getContent() : null);
} else {
throw buildHttpOperationFailedException(url, response);
}
Expand Down Expand Up @@ -208,7 +212,7 @@ public FedoraResponse post(final URI url, final InputStream body, final String c
if ((status >= HttpStatus.SC_OK && status < HttpStatus.SC_BAD_REQUEST) || !this.throwExceptionOnFailure) {
final HttpEntity entity = response.getEntity();
return new FedoraResponse(url, status, contentTypeHeader, null,
entity != null ? EntityUtils.toString(entity) : null);
entity != null ? entity.getContent() : null);
} else {
throw buildHttpOperationFailedException(url, response);
}
Expand All @@ -228,8 +232,7 @@ public FedoraResponse delete(final URI url)

if ((status >= HttpStatus.SC_OK && status < HttpStatus.SC_BAD_REQUEST) || !this.throwExceptionOnFailure) {
final HttpEntity entity = response.getEntity();
return new FedoraResponse(url, status, contentType, null,
entity != null ? EntityUtils.toString(entity) : null);
return new FedoraResponse(url, status, contentType, null, entity != null ? entity.getContent() : null);
} else {
throw buildHttpOperationFailedException(url, response);
}
Expand Down Expand Up @@ -261,7 +264,7 @@ public FedoraResponse get(final URI url, final String accept)
describedBy = links.get(0);
}
return new FedoraResponse(url, status, contentType, describedBy,
entity != null ? EntityUtils.toString(entity) : null);
entity != null ? entity.getContent() : null);
} else {
throw buildHttpOperationFailedException(url, response);
}
Expand Down Expand Up @@ -334,4 +337,6 @@ protected static List<URI> getLinkHeaders(final HttpResponse response, final Str
}
return uris;
}


}
31 changes: 26 additions & 5 deletions src/main/java/org/fcrepo/camel/FedoraProducer.java
Expand Up @@ -37,8 +37,10 @@
import org.apache.camel.Message;
import org.apache.camel.component.http4.HttpMethods;
import org.apache.camel.component.http4.HttpOperationFailedException;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;

/**
Expand Down Expand Up @@ -91,19 +93,19 @@ public void process(final Exchange exchange) throws HttpOperationFailedException
switch (method) {
case PATCH:
response = client.patch(getMetadataUri(client, url), in.getBody(InputStream.class));
exchange.getIn().setBody(response.getBody());
exchange.getIn().setBody(extractResponseBodyAsStream(response.getBody(), exchange));
break;
case PUT:
response = client.put(create(url), in.getBody(InputStream.class), contentType);
exchange.getIn().setBody(response.getBody());
exchange.getIn().setBody(extractResponseBodyAsStream(response.getBody(), exchange));
break;
case POST:
response = client.post(create(url), in.getBody(InputStream.class), contentType);
exchange.getIn().setBody(response.getBody());
exchange.getIn().setBody(extractResponseBodyAsStream(response.getBody(), exchange));
break;
case DELETE:
response = client.delete(create(url));
exchange.getIn().setBody(response.getBody());
exchange.getIn().setBody(extractResponseBodyAsStream(response.getBody(), exchange));
break;
case HEAD:
response = client.head(create(url));
Expand All @@ -112,7 +114,7 @@ public void process(final Exchange exchange) throws HttpOperationFailedException
case GET:
default:
response = client.get(endpoint.getMetadata() ? getMetadataUri(client, url) : create(url), accept);
exchange.getIn().setBody(response.getBody());
exchange.getIn().setBody(extractResponseBodyAsStream(response.getBody(), exchange));
}
exchange.getIn().setHeader(CONTENT_TYPE, response.getContentType());
exchange.getIn().setHeader(HTTP_RESPONSE_CODE, response.getStatusCode());
Expand Down Expand Up @@ -216,4 +218,23 @@ protected String getUrl(final Exchange exchange) {
}
return url.toString();
}

private static InputStream extractResponseBodyAsStream(final InputStream is, final Exchange exchange)
throws IOException {
// As httpclient is using a AutoCloseInputStream, it will be closed when the connection is closed
// we need to cache the stream for it.
if (is == null) {
return null;
} else {
try (final CachedOutputStream cos = new CachedOutputStream(exchange, false)) {
// This CachedOutputStream will not be closed when the exchange is onCompletion
IOHelper.copy(is, cos);
// When the InputStream is closed, the CachedOutputStream will be closed
return cos.getWrappedInputStream();

} finally {
IOHelper.close(is, "Extracting response body", LOGGER);
}
}
}
}
9 changes: 5 additions & 4 deletions src/main/java/org/fcrepo/camel/FedoraResponse.java
Expand Up @@ -16,6 +16,7 @@
package org.fcrepo.camel;

import java.net.URI;
import java.io.InputStream;

/**
* Represents a response from a fedora repository using a {@link FedoraClient}.
Expand All @@ -33,15 +34,15 @@ public class FedoraResponse {

private URI location;

private String body;
private InputStream body;

private String contentType;

/**
* Create a FedoraResponse object from the http response
*/
public FedoraResponse(final URI url, final int statusCode,
final String contentType, final URI location, final String body) {
final String contentType, final URI location, final InputStream body) {
this.setUrl(url);
this.setStatusCode(statusCode);
this.setLocation(location);
Expand Down Expand Up @@ -82,15 +83,15 @@ public void setStatusCode(final int statusCode) {
/**
* body getter
*/
public String getBody() {
public InputStream getBody() {
return body;
}

/**
* body setter
* @param body the contents of the response body
*/
public void setBody(final String body) {
public void setBody(final InputStream body) {
this.body = body;
}

Expand Down
10 changes: 8 additions & 2 deletions src/test/java/org/fcrepo/camel/FedoraAuthTest.java
Expand Up @@ -107,6 +107,7 @@ public void configure() {

from("direct:auth1")
.to(fcrepo_uri + "?authUsername=foo")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -115,6 +116,7 @@ public void configure() {

from("direct:auth2")
.to(fcrepo_uri + "?authPassword=foo")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -123,6 +125,7 @@ public void configure() {

from("direct:auth3")
.to(fcrepo_uri + "?authPassword=foo&authUsername=")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -131,6 +134,7 @@ public void configure() {

from("direct:auth4")
.to(fcrepo_uri + "?authPassword=&authUsername=")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -139,6 +143,7 @@ public void configure() {

from("direct:auth5")
.to(fcrepo_uri + "?authPassword=")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -147,6 +152,7 @@ public void configure() {

from("direct:auth6")
.to(fcrepo_uri + "?authUsername=")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -155,6 +161,7 @@ public void configure() {

from("direct:auth7")
.to(fcrepo_uri + "?authUsername=foo&authPassword=bar&authHost=localhost")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand All @@ -163,14 +170,13 @@ public void configure() {

from("direct:auth8")
.to(fcrepo_uri + "?authUsername=foo&authPassword=bar")
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
.split(titleXpath)
.to("mock:result");



from("direct:teardown")
.to(fcrepo_uri)
.to(fcrepo_uri + "?tombstone=true");
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/fcrepo/camel/FedoraPostTest.java
Expand Up @@ -100,6 +100,7 @@ public void configure() {

from("direct:start")
.to(fcrepo_uri)
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand Down
19 changes: 13 additions & 6 deletions src/test/java/org/fcrepo/camel/FedoraResponseTest.java
Expand Up @@ -18,11 +18,16 @@

import static org.junit.Assert.assertEquals;
import static java.net.URI.create;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.URI;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.commons.io.IOUtils;
import org.mockito.runners.MockitoJUnitRunner;

/**
Expand All @@ -32,20 +37,20 @@
public class FedoraResponseTest {

@Test
public void testResponse() {
public void testResponse() throws IOException {
final URI uri = create("http://localhost/path/a/b");
final int status = 200;
final String contentType = "text/plain";
final URI location = create("http://localhost/path/a/b/c");
final String body = "Text response";

final FedoraResponse response = new FedoraResponse(uri, status, contentType, location, body);
final InputStream bodyStream = new ByteArrayInputStream(body.getBytes(UTF_8));
final FedoraResponse response = new FedoraResponse(uri, status, contentType, location, bodyStream);

assertEquals(response.getUrl(), uri);
assertEquals(response.getStatusCode(), status);
assertEquals(response.getContentType(), contentType);
assertEquals(response.getLocation(), location);
assertEquals(response.getBody(), body);
assertEquals(IOUtils.toString(response.getBody(), UTF_8), body);

response.setUrl(create("http://example.org/path/a/b"));
assertEquals(response.getUrl(), create("http://example.org/path/a/b"));
Expand All @@ -59,8 +64,10 @@ public void testResponse() {
response.setLocation(create("http://example.org/path/a/b/c"));
assertEquals(response.getLocation(), create("http://example.org/path/a/b/c"));

response.setBody("<http://example.org/book/3> <dc:title> \"Title\" .");
assertEquals(response.getBody(), "<http://example.org/book/3> <dc:title> \"Title\" .");
response.setBody(new ByteArrayInputStream(
"<http://example.org/book/3> <dc:title> \"Title\" .".getBytes(UTF_8)));
assertEquals(IOUtils.toString(response.getBody(), UTF_8),
"<http://example.org/book/3> <dc:title> \"Title\" .");
}

}
Expand Up @@ -100,6 +100,7 @@ public void configure() {

from("direct:start")
.to(fcrepo_uri)
.convertBodyTo(org.w3c.dom.Document.class)
.filter().xpath(
"/rdf:RDF/rdf:Description/rdf:type" +
"[@rdf:resource='http://fedora.info/definitions/v4/repository#Resource']", ns)
Expand Down

0 comments on commit 3a9adee

Please sign in to comment.