Skip to content

Commit

Permalink
added timeout mechanism for transactions and integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
fasseg committed May 14, 2013
1 parent 2ee3e43 commit 45bc0a3
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 80 deletions.
162 changes: 91 additions & 71 deletions fcrepo-http-api/src/main/java/org/fcrepo/api/FedoraTransactions.java
@@ -1,3 +1,4 @@

package org.fcrepo.api;

import java.util.Iterator;
Expand All @@ -6,6 +7,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.ws.rs.GET;
Expand All @@ -22,6 +24,8 @@
import org.fcrepo.Transaction;
import org.fcrepo.Transaction.State;
import org.fcrepo.services.ObjectService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
Expand All @@ -30,83 +34,99 @@
@Path("/rest/fcr:tx")
public class FedoraTransactions extends AbstractResource {

@Autowired
private ObjectService objectService;
private static final Logger logger = LoggerFactory
.getLogger(FedoraTransactions.class);

@Autowired
private ObjectService objectService;

/* TODO: since transactions have to be available on all nodes, they have to be either persisted or written to a */
/* distributed map or sth, not just this plain hashmap that follows */
private static Map<String, Transaction> transactions = new ConcurrentHashMap<String, Transaction>();
/*
 * TODO: since transactions have to be available on all nodes, they have to
 * be either persisted or written to a distributed map or something similar,
 * not just the plain in-memory map that follows.
 */
private static Map<String, Transaction> transactions =
new ConcurrentHashMap<String, Transaction>();

@Scheduled(fixedRate=1000)
public void removeAndRollbackExpired(){
synchronized(transactions){
Iterator<Entry<String, Transaction>> txs = transactions.entrySet().iterator();
while (txs.hasNext()){
Transaction tx = txs.next().getValue();
if (tx.getExpires().getTime() > System.currentTimeMillis()){
txs.remove();
}
}
}
}
/**
 * Evicts every transaction whose expiry time has passed, marks it as
 * rolled back and releases its JCR session.
 *
 * Scheduled at a fixed rate of 100 ms via {@link Scheduled}.
 */
@Scheduled(fixedRate = 100)
public void removeAndRollbackExpired() {
    // Read the clock once so every entry is judged against the same instant.
    final long now = System.currentTimeMillis();
    final Iterator<Entry<String, Transaction>> txs =
            transactions.entrySet().iterator();
    while (txs.hasNext()) {
        final Transaction tx = txs.next().getValue();
        if (tx.getExpires().getTime() <= now) {
            logger.debug("timeout for transaction {}", tx.getId());
            txs.remove();
            // Nothing references the transaction any more: record the outcome
            // and log the session out so its unsaved changes are discarded
            // and its resources are released instead of leaking.
            tx.setState(State.ROLLED_BACK);
            tx.getSession().logout();
        }
    }
}

@POST
@Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_XML })
public Transaction createTransaction() throws RepositoryException {
Session sess = getAuthenticatedSession();
Transaction tx = new Transaction(sess);
transactions.put(tx.getId(), tx);
return tx;
}
/**
 * Opens a new transaction bound to the caller's authenticated session and
 * registers it under its id.
 *
 * @return the newly created transaction
 * @throws RepositoryException declared by the signature; presumably raised
 *         when the authenticated session cannot be obtained
 */
@POST
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
public Transaction createTransaction() throws RepositoryException {
    final Transaction created = new Transaction(getAuthenticatedSession());
    transactions.put(created.getId(), created);
    return created;
}

@GET
@Path("/{txid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_XML })
public Transaction getTransaction(@PathParam("txid") final String txid) throws RepositoryException {
Transaction tx = transactions.get(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid + " is not available");
}
return tx;
}
/**
 * Looks up a registered transaction by its id.
 *
 * @param txid the id of the transaction to fetch
 * @return the transaction registered under {@code txid}
 * @throws PathNotFoundException if no transaction is registered under
 *         {@code txid}
 * @throws RepositoryException declared for interface compatibility
 */
@GET
@Path("/{txid}")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
public Transaction getTransaction(@PathParam("txid") final String txid)
        throws RepositoryException {
    final Transaction tx = transactions.get(txid);
    if (tx == null) {
        // Same wording as the other endpoints; the previous message ran the
        // id and "is not available" together without a space.
        throw new PathNotFoundException("Transaction with id " + txid +
                " is not available");
    }
    return tx;
}

@POST
@Path("/{txid}/fcr:commit")
@Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_XML })
public Transaction commit(@PathParam("txid") final String txid) throws RepositoryException {
Transaction tx = transactions.remove(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid + " is not available");
}
tx.getSession().save();
tx.setState(State.COMMITED);
return tx;
}
/**
 * Commits a transaction: unregisters it, saves its session and marks it as
 * committed.
 *
 * @param txid the id of the transaction to commit
 * @return the committed transaction
 * @throws RepositoryException if no transaction is registered under
 *         {@code txid}, or if saving the session fails
 */
@POST
@Path("/{txid}/fcr:commit")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
public Transaction commit(@PathParam("txid") final String txid)
        throws RepositoryException {
    // The entry is removed from the registry before the save is attempted.
    final Transaction committed = transactions.remove(txid);
    if (committed != null) {
        committed.getSession().save();
        committed.setState(State.COMMITED);
        return committed;
    }
    throw new RepositoryException("Transaction with id " + txid + " is not available");
}

@POST
@Path("/{txid}/fcr:rollback")
@Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_XML })
public Transaction rollback(@PathParam("txid") final String txid) throws RepositoryException {
Transaction tx = transactions.remove(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid + " is not available");
}
tx.setState(State.ROLLED_BACK);
return tx;
}
/**
 * Rolls back a transaction: unregisters it and marks it as rolled back.
 *
 * @param txid the id of the transaction to roll back
 * @return the rolled-back transaction
 * @throws RepositoryException if no transaction is registered under
 *         {@code txid}
 */
@POST
@Path("/{txid}/fcr:rollback")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
public Transaction rollback(@PathParam("txid") final String txid)
        throws RepositoryException {
    final Transaction discarded = transactions.remove(txid);
    if (discarded != null) {
        // NOTE(review): the session is neither refreshed nor logged out
        // here - confirm whether it should be released explicitly.
        discarded.setState(State.ROLLED_BACK);
        return discarded;
    }
    throw new RepositoryException("Transaction with id " + txid + " is not available");
}

@POST
@Path("/{txid}/{path: .*}/fcr:newhack")
@Produces({MediaType.TEXT_PLAIN})
public Response createObjectInTransaction(@PathParam("txid") final String txid, @PathParam("path") final List<PathSegment> pathlist)throws RepositoryException {
Transaction tx = transactions.get(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid + " is not available");
}
final String path = toPath(pathlist);
final FedoraObject obj = objectService.createObject(tx.getSession(), path);
tx.setState(State.DIRTY);
return Response.ok(path).build();
}
/**
 * Creates an object at the given path using the session of an open
 * transaction and marks that transaction as dirty.
 *
 * @param txid the id of the transaction to work in
 * @param pathlist the path segments of the object to create
 * @return a 200 response whose entity is the path of the created object
 * @throws RepositoryException if no transaction is registered under
 *         {@code txid}, or if the object cannot be created
 */
@POST
@Path("/{txid}/{path: .*}/fcr:newhack")
@Produces({MediaType.TEXT_PLAIN})
public Response createObjectInTransaction(
        @PathParam("txid") final String txid,
        @PathParam("path") final List<PathSegment> pathlist)
        throws RepositoryException {
    final Transaction tx = transactions.get(txid);
    if (tx == null) {
        throw new RepositoryException("Transaction with id " + txid +
                " is not available");
    }
    final String path = toPath(pathlist);
    // The created object itself is not needed here; only the creation is.
    objectService.createObject(tx.getSession(), path);
    tx.setState(State.DIRTY);
    return Response.ok(path).build();
}

}
Expand Up @@ -4,8 +4,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import javax.validation.constraints.AssertTrue;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
Expand All @@ -14,7 +12,10 @@
import org.fcrepo.Transaction.State;
import org.junit.Test;


public class FedoraTransactionsIT extends AbstractResourceIT {


@Test
public void testCreateAndGetTransaction() throws Exception {
/* create a tx */
Expand All @@ -23,21 +24,50 @@ public void testCreateAndGetTransaction() throws Exception {
assertTrue(resp.getStatusLine().getStatusCode() == 200);
ObjectMapper mapper = new ObjectMapper();
Transaction tx = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
createTx.releaseConnection();
assertNotNull(tx);
assertNotNull(tx.getId());
assertNotNull(tx.getState());
assertNotNull(tx.getCreated());
assertTrue(tx.getState() == State.NEW);

/* fetch the create dtx from the endpoint */
/* fetch the created tx from the endpoint */
HttpGet getTx = new HttpGet(serverAddress + "fcr:tx/" + tx.getId());
resp = execute(getTx);
Transaction fetched = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
getTx.releaseConnection();
assertEquals(tx.getId(), fetched.getId());
assertEquals(tx.getState(), fetched.getState());
assertEquals(tx.getCreated(), fetched.getCreated());
}


/**
 * Creates a transaction with a 10 ms timeout, waits for it to expire and
 * verifies that the endpoint no longer serves it.
 */
@Test
public void testCreateAndTimeoutTransaction() throws Exception {
    // NOTE(review): presumably the server under test runs in this JVM and
    // reads the property when the transaction is created - confirm.
    System.setProperty("fcrepo4.tx.timeout", "10");
    try {
        /* create a tx */
        HttpPost createTx = new HttpPost(serverAddress + "fcr:tx");
        HttpResponse resp = execute(createTx);
        assertEquals(200, resp.getStatusLine().getStatusCode());
        ObjectMapper mapper = new ObjectMapper();
        Transaction tx = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
        assertNotNull(tx);
        assertNotNull(tx.getId());
        assertNotNull(tx.getState());
        assertNotNull(tx.getCreated());
        assertEquals(State.NEW, tx.getState());
        createTx.releaseConnection();

        Thread.sleep(200); // wait for the tx to expire

        /* check that the tx is removed */
        HttpGet getTx = new HttpGet(serverAddress + "fcr:tx/" + tx.getId());
        resp = execute(getTx);
        getTx.releaseConnection();
        assertEquals(404, resp.getStatusLine().getStatusCode());
    } finally {
        // Always restore the default timeout, even when an assertion fails,
        // so the short timeout cannot leak into the other tests.
        System.clearProperty("fcrepo4.tx.timeout");
    }
}

@Test
public void testCreateAndCommitTransaction() throws Exception {
/* create a tx */
Expand All @@ -46,33 +76,39 @@ public void testCreateAndCommitTransaction() throws Exception {
assertTrue(resp.getStatusLine().getStatusCode() == 200);
ObjectMapper mapper = new ObjectMapper();
Transaction tx = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
createTx.releaseConnection();
assertNotNull(tx);
assertNotNull(tx.getId());
assertNotNull(tx.getState());
assertNotNull(tx.getCreated());
assertTrue(tx.getState() == State.NEW);

/* create a new object inside the tx */
HttpPost postNew = new HttpPost(serverAddress + "fcr:tx/" + tx.getId() + "/objects/testobj1/fcr:newhack");
resp = execute(postNew);
postNew.releaseConnection();
assertTrue(resp.getStatusLine().getStatusCode() == 200);

/* check if the object is already there, before the commit */
HttpGet getObj = new HttpGet(serverAddress + "/objects/testobj1");
resp = execute(getObj);
getObj.releaseConnection();
assertTrue(resp.getStatusLine().toString(),resp.getStatusLine().getStatusCode() == 404);

/* commit the tx */
HttpPost commitTx = new HttpPost(serverAddress + "fcr:tx/" + tx.getId() + "/fcr:commit");
resp = execute(commitTx);
assertTrue(resp.getStatusLine().getStatusCode() == 200);
Transaction committed = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
commitTx.releaseConnection();
assertEquals(committed.getState(), State.COMMITED);

/* check if the object is there, after the commit */
resp = execute(getObj);
assertTrue(resp.getStatusLine().toString(),resp.getStatusLine().getStatusCode() == 200);
getObj.releaseConnection();
}

@Test
public void testCreateAndRollbackTransaction() throws Exception {
/* create a tx */
Expand All @@ -81,6 +117,7 @@ public void testCreateAndRollbackTransaction() throws Exception {
assertTrue(resp.getStatusLine().getStatusCode() == 200);
ObjectMapper mapper = new ObjectMapper();
Transaction tx = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
createTx.releaseConnection();
assertNotNull(tx);
assertNotNull(tx.getId());
assertNotNull(tx.getState());
Expand All @@ -91,21 +128,25 @@ public void testCreateAndRollbackTransaction() throws Exception {
HttpPost postNew = new HttpPost(serverAddress + "fcr:tx/" + tx.getId() + "/objects/testobj2/fcr:newhack");
resp = execute(postNew);
assertTrue(resp.getStatusLine().getStatusCode() == 200);

postNew.releaseConnection();

/* check if the object is already there, before the commit */
HttpGet getObj = new HttpGet(serverAddress + "/objects/testobj2");
resp = execute(getObj);
getObj.releaseConnection();
assertTrue(resp.getStatusLine().toString(),resp.getStatusLine().getStatusCode() == 404);

/* and rollback */
HttpPost rollbackTx = new HttpPost(serverAddress + "fcr:tx/" + tx.getId() + "/fcr:rollback");
resp = execute(rollbackTx);
Transaction rolledBack = mapper.readValue(resp.getEntity().getContent(), Transaction.class);
rollbackTx.releaseConnection();
assertEquals(rolledBack.getId(),tx.getId());
assertTrue(rolledBack.getState() == State.ROLLED_BACK);

/* check if the object is there, after the rollback */
resp = execute(getObj);
getObj.releaseConnection();
assertTrue(resp.getStatusLine().toString(),resp.getStatusLine().getStatusCode() == 404);
}
}
1 change: 1 addition & 0 deletions fcrepo-http-api/src/test/resources/spring-test/master.xml
Expand Up @@ -9,5 +9,6 @@
<import resource="classpath:/spring-test/repo.xml"/>
<import resource="classpath:/spring-test/rest.xml"/>
<import resource="classpath:/spring-test/eventing.xml"/>
<import resource="classpath:/spring-test/transactions.xml"/>

</beans>
16 changes: 16 additions & 0 deletions fcrepo-http-api/src/test/resources/spring-test/transactions.xml
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

<!-- Enable annotation processing and scan org.fcrepo.api for components. -->
<context:annotation-config />
<context:component-scan base-package="org.fcrepo.api" />

<!-- Scheduler and single-thread executor used for annotation-driven tasks
     such as @Scheduled methods. -->
<task:scheduler id="taskScheduler" />
<task:executor id="taskExecutor" pool-size="1" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler" />

</beans>
8 changes: 7 additions & 1 deletion fcrepo-kernel/src/main/java/org/fcrepo/Transaction.java
Expand Up @@ -43,7 +43,13 @@ public Transaction(Session session) {
super();
this.session = session;
this.created = new Date();
this.expires = new Date(System.currentTimeMillis() + (1000 * 60 * 3));
// Transaction lifetime in milliseconds: the value of the
// "fcrepo4.tx.timeout" system property when set, otherwise three minutes.
// The property is read once so that a concurrent change between the null
// check and the parse cannot hand null to Long.parseLong.
final String configuredTimeout = System.getProperty("fcrepo4.tx.timeout");
final long duration;
if (configuredTimeout != null) {
    duration = Long.parseLong(configuredTimeout);
} else {
    duration = 1000L * 60L * 3L;
}
this.expires = new Date(System.currentTimeMillis() + duration);
this.id = UUID.randomUUID().toString();
}

Expand Down

0 comments on commit 45bc0a3

Please sign in to comment.