Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
added TransactionService/-Tests in kernel and changed FedoraTransacti…
…ons Endpoint to use the service
  • Loading branch information
fasseg committed Jun 12, 2013
1 parent a35afee commit ed45fe1
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 168 deletions.
Expand Up @@ -4,13 +4,8 @@
import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
import static javax.ws.rs.core.MediaType.TEXT_XML;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.ws.rs.GET;
Expand All @@ -23,88 +18,45 @@

import org.fcrepo.AbstractResource;
import org.fcrepo.Transaction;
import org.springframework.scheduling.annotation.Scheduled;
import org.fcrepo.services.TransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Path("/fcr:tx")
public class FedoraTransactions extends AbstractResource {

/*
* TODO: transactions have to be available on all nodes, so they must be
* either persisted or kept in a distributed map (or something similar)
* rather than in the plain in-memory map that follows
*/
private static Map<String, Transaction> TRANSACTIONS =
new ConcurrentHashMap<String, Transaction>();

public static final long REAP_INTERVAL = 1000;

@Scheduled(fixedRate=REAP_INTERVAL)
public void removeAndRollbackExpired(){
synchronized(TRANSACTIONS){
Iterator<Entry<String, Transaction>> txs = TRANSACTIONS.entrySet().iterator();
while (txs.hasNext()){
Transaction tx = txs.next().getValue();
if (tx.getExpires().getTime() <= System.currentTimeMillis()){
try {
tx.rollback();
} catch (RepositoryException e) {
// TODO Not clear how to respond here
e.printStackTrace();
}
txs.remove();
}
}
}
}
@Autowired
private TransactionService txService;

@POST
@Produces({ APPLICATION_JSON, TEXT_XML })
public Transaction createTransaction() throws RepositoryException {
Session sess = getAuthenticatedSession();
Transaction tx = new Transaction(sess);
TRANSACTIONS.put(tx.getId(), tx);
return tx;
return txService.beginTransaction(sess);
}

@GET
@Path("/{txid}")
public Transaction getTransaction(@PathParam("txid")
final String txid) throws RepositoryException {
final Transaction tx = TRANSACTIONS.get(txid);
if (tx == null) {
throw new PathNotFoundException("Transaction is not available");
}
return tx;
return txService.getTransaction(txid);
}

@POST
@Path("/{txid}/fcr:commit")
@Produces({APPLICATION_JSON, TEXT_XML})
public Transaction commit(@PathParam("txid")
final String txid) throws RepositoryException {
final Transaction tx = TRANSACTIONS.remove(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid +
" is not available");
}
tx.commit();
return tx;
return txService.commit(txid);
}

@POST
@Path("/{txid}/fcr:rollback")
@Produces({APPLICATION_JSON, TEXT_XML})
public Transaction rollback(@PathParam("txid")
final String txid) throws RepositoryException {
final Transaction tx = TRANSACTIONS.remove(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid +
" is not available");
}
tx.rollback();
return tx;
return txService.rollback(txid);
}

@POST
Expand All @@ -113,7 +65,7 @@ public Transaction rollback(@PathParam("txid")
public Response createObjectInTransaction(@PathParam("txid")
final String txid, @PathParam("path")
final List<PathSegment> pathlist) throws RepositoryException {
final Transaction tx = TRANSACTIONS.get(txid);
final Transaction tx = txService.getTransaction(txid);
if (tx == null) {
throw new RepositoryException("Transaction with id " + txid +
" is not available");
Expand Down

This file was deleted.

Expand Up @@ -11,7 +11,7 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.fcrepo.Transaction;
import org.fcrepo.Transaction.State;
import org.fcrepo.api.FedoraTransactions;
import org.fcrepo.services.TransactionService;
import org.junit.Test;

public class FedoraTransactionsIT extends AbstractResourceIT {
Expand Down Expand Up @@ -72,7 +72,7 @@ public void testCreateAndTimeoutTransaction() throws Exception {
long diff = 0;
// the loop should be able to run for at least the tx reaping interval
while (!expired &&
(diff = (System.currentTimeMillis() - start)) < (2* FedoraTransactions.REAP_INTERVAL)) {
(diff = (System.currentTimeMillis() - start)) < (2* TransactionService.REAP_INTERVAL)) {
/* check that the tx is removed */
HttpGet getTx = new HttpGet(serverAddress + "fcr:tx/" + tx.getId());
resp = execute(getTx);
Expand All @@ -81,7 +81,7 @@ public void testCreateAndTimeoutTransaction() throws Exception {
}

try {
assertTrue("Transaction did not expire", expired);
assertTrue("Transaction did not expire after " + (System.currentTimeMillis() - start) + " ms", expired);
assertTrue(diff >= testTimeout);
} finally {
System.setProperty(Transaction.TIMEOUT_SYSTEM_PROPERTY,
Expand Down
Expand Up @@ -7,7 +7,7 @@
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

<context:annotation-config />
<context:component-scan base-package="org.fcrepo.api" />
<context:component-scan base-package="org.fcrepo.services" />

<task:scheduler id="taskScheduler" />
<task:executor id="taskExecutor" pool-size="1" />
Expand Down
@@ -0,0 +1,130 @@
/**
*
*/

package org.fcrepo.services;

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import javax.jcr.PathNotFoundException;
import javax.jcr.RepositoryException;
import javax.jcr.Session;

import org.fcrepo.Transaction;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Part of the strawman implementation for Fedora transactions.
 *
 * This service keeps track of the currently open {@link Transaction} objects
 * and is able to create, look up, commit and roll back transactions by id.
 * A {@link Scheduled} method periodically rolls back and removes transactions
 * whose expiry date has passed.
 *
 * All open transactions are held in a single static, in-memory concurrent map,
 * so they are shared by every instance of this class inside one JVM.
 *
 * @author frank asseg
 */
@Component
public class TransactionService {

    /*
     * TODO: transactions have to be available on all nodes, so they must be
     * either persisted or kept in a distributed map (or something similar)
     * rather than in the plain in-memory map that follows.
     */
    private static final Map<String, Transaction> TRANSACTIONS =
            new ConcurrentHashMap<String, Transaction>();

    /** Interval in milliseconds between two runs of the expiry reaper. */
    public static final long REAP_INTERVAL = 1000;

    /**
     * Roll back and remove every open {@link Transaction} whose expiry date
     * is not later than the current time.
     *
     * Each expired transaction is first claimed by atomically removing it
     * from the map, and only the thread that actually removed it performs the
     * rollback. This prevents a transaction that is concurrently handed to
     * {@link #commit(String)} or {@link #rollback(String)} from being rolled
     * back a second time by the reaper.
     */
    @Scheduled(fixedRate = REAP_INTERVAL)
    public void removeAndRollbackExpired() {
        final long now = System.currentTimeMillis();
        // ConcurrentHashMap iterators are weakly consistent, so the map may
        // safely be modified while it is being traversed.
        final Iterator<Entry<String, Transaction>> txs =
                TRANSACTIONS.entrySet().iterator();
        while (txs.hasNext()) {
            final Entry<String, Transaction> entry = txs.next();
            if (entry.getValue().getExpires().getTime() > now) {
                continue;
            }
            // Claim the transaction; null means another thread (commit,
            // rollback or a parallel reaper run) has already taken it.
            // NOTE(review): assumes transaction ids are never reused - confirm.
            final Transaction claimed = TRANSACTIONS.remove(entry.getKey());
            if (claimed == null) {
                continue;
            }
            try {
                claimed.rollback();
            } catch (RepositoryException e) {
                // A failing rollback must not stop the reaper from handling
                // the remaining transactions.
                // TODO Not clear how to respond here; replace with a proper
                // logger once one is available in this class
                e.printStackTrace();
            }
        }
    }

    /**
     * Create a new {@link Transaction} and add it to the currently open ones.
     *
     * @param sess the session to use for this Transaction
     * @return the newly created {@link Transaction}
     */
    public Transaction beginTransaction(final Session sess) {
        final Transaction tx = new Transaction(sess);
        TRANSACTIONS.put(tx.getId(), tx);
        return tx;
    }

    /**
     * Retrieve an open {@link Transaction}.
     *
     * @param txid the id of the {@link Transaction}
     * @return the {@link Transaction}, never {@code null}
     * @throws PathNotFoundException if no open {@link Transaction} with the
     *         given id has been found
     */
    public Transaction getTransaction(final String txid)
            throws PathNotFoundException {
        final Transaction tx = TRANSACTIONS.get(txid);
        if (tx == null) {
            throw new PathNotFoundException("Transaction is not available");
        }
        return tx;
    }

    /**
     * Check whether an open {@link Transaction} with the given id exists.
     *
     * @param txid the id of the {@link Transaction}
     * @return {@code true} if a transaction with this id is currently open,
     *         {@code false} otherwise
     */
    public boolean exists(final String txid) {
        // The map never holds null values, so this is equivalent to a
        // get() followed by a null check.
        return TRANSACTIONS.containsKey(txid);
    }

    /**
     * Commit the {@link Transaction} with the given id and remove it from the
     * open transactions.
     *
     * @param txid the id of the {@link Transaction}
     * @return the committed {@link Transaction} object
     * @throws RepositoryException if the {@link Transaction} could not be
     *         found or the commit itself fails
     */
    public Transaction commit(final String txid) throws RepositoryException {
        // remove() is atomic, so a transaction can be taken out of the map by
        // only one caller.
        final Transaction tx = TRANSACTIONS.remove(txid);
        if (tx == null) {
            throw new RepositoryException("Transaction with id " + txid +
                    " is not available");
        }
        tx.commit();
        return tx;
    }

    /**
     * Roll back the {@link Transaction} with the given id and remove it from
     * the open transactions.
     *
     * @param txid the id of the {@link Transaction}
     * @return the rolled back {@link Transaction} object
     * @throws RepositoryException if the {@link Transaction} could not be
     *         found or the rollback itself fails
     */
    public Transaction rollback(final String txid) throws RepositoryException {
        final Transaction tx = TRANSACTIONS.remove(txid);
        if (tx == null) {
            throw new RepositoryException("Transaction with id " + txid +
                    " is not available");
        }
        tx.rollback();
        return tx;
    }

}

0 comments on commit ed45fe1

Please sign in to comment.