Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add a subtype for chaining stores, and have cluster executions of tra…
…nsforms unroll them for chaining stores
  • Loading branch information
barmintor committed Jun 6, 2013
1 parent 9d294df commit 94d079a
Show file tree
Hide file tree
Showing 15 changed files with 591 additions and 297 deletions.
Expand Up @@ -246,7 +246,7 @@ public boolean apply(org.fcrepo.utils.FixityResult input) {
result.getEntry()
.storeValue(anyGoodCacheEntry.getInputStream());
final FixityResult newResult =
result.getEntry().checkFixity(digestUri, size, digest);
result.getEntry().checkFixity(digestUri, size, digest.getAlgorithm());
if (newResult.isSuccess()) {
result.setRepaired();
fixityRepairedCounter.inc();
Expand Down
Expand Up @@ -6,10 +6,12 @@

package org.fcrepo.services;

import static com.google.common.collect.Collections2.transform;
import static com.google.common.collect.ImmutableSet.builder;
import static org.fcrepo.services.ServiceHelpers.getClusterExecutor;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -30,6 +32,8 @@
import org.fcrepo.services.functions.CacheLocalTransform;
import org.fcrepo.services.functions.GetBinaryKey;
import org.fcrepo.utils.LowLevelCacheEntry;
import org.fcrepo.utils.impl.ChainingCacheStoreEntry;
import org.fcrepo.utils.impl.LocalBinaryStoreEntry;
import org.infinispan.distexec.DistributedExecutorService;
import org.modeshape.jcr.GetBinaryStore;
import org.modeshape.jcr.api.JcrConstants;
Expand Down Expand Up @@ -111,7 +115,7 @@ public <T> Collection<T> transformLowLevelCacheEntries(final BinaryKey key,
}
} else {
final ImmutableSet.Builder<T> blobs = builder();
blobs.add(transform.apply(new LowLevelCacheEntry(store, key)));
blobs.add(transform.apply(new LocalBinaryStoreEntry(store, key)));
return blobs.build();
}
}
Expand Down Expand Up @@ -151,7 +155,8 @@ protected <T> Set<T> transformLowLevelCacheEntries(
}

/**
* Transform low-level cache entries from a particular CompositeBinaryStore
* Steer low-level cache entries to transform functions according
* to the subtype of BinaryStore in question
* @param key a Modeshape BinaryValue's key.
* @return a set of transformed objects
*/
Expand All @@ -170,18 +175,18 @@ protected <T> Set<T> transformLowLevelCacheEntries(

} else if (store instanceof InfinispanBinaryStore) {
try {
return getClusterResults(
(InfinispanBinaryStore) store, key, transform);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
return ImmutableSet.of();
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
return ImmutableSet.of();
}
return getClusterResults(
(InfinispanBinaryStore) store, key, transform);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
return ImmutableSet.of();
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
return ImmutableSet.of();
}
} else {
final ImmutableSet.Builder<T> blobs = builder();
blobs.add(transform.apply(new LowLevelCacheEntry(store, key)));
blobs.add(transform.apply(new LocalBinaryStoreEntry(store, key)));
return blobs.build();
}
}
Expand Down Expand Up @@ -242,22 +247,24 @@ public Set<LowLevelCacheEntry> getLowLevelCacheEntries(final BinaryKey key) {
public Set<LowLevelCacheEntry> getLowLevelCacheEntriesFromStore(final BinaryStore store,
final BinaryKey key) {

final ImmutableSet.Builder<LowLevelCacheEntry> blobs = builder();

if (store instanceof CompositeBinaryStore) {
return
transformLowLevelCacheEntries((CompositeBinaryStore) store,
key, new Echo<LowLevelCacheEntry>());
for (LowLevelCacheEntry entries:
transformLowLevelCacheEntries((CompositeBinaryStore) store, key, new Echo())) {
blobs.add(entries);
}

} else if (store instanceof InfinispanBinaryStore) {
return
transformLowLevelCacheEntries((InfinispanBinaryStore) store,
key, new Echo<LowLevelCacheEntry>());
for (LowLevelCacheEntry entries:
transformLowLevelCacheEntries((InfinispanBinaryStore) store, key, new Echo())) {
blobs.add(entries);
}

} else {
final ImmutableSet.Builder<LowLevelCacheEntry> blobs = builder();
blobs.add(new LowLevelCacheEntry(store, key));
return blobs.build();
blobs.add(new LocalBinaryStoreEntry(store, key));
}

return blobs.build();
}

/**
Expand All @@ -267,9 +274,14 @@ public Set<LowLevelCacheEntry> getLowLevelCacheEntriesFromStore(final BinaryStor
*/
protected Set<LowLevelCacheEntry> getLowLevelCacheEntriesFromStore(final CompositeBinaryStore compositeStore,
final BinaryKey key) {
final ImmutableSet.Builder<LowLevelCacheEntry> blobs = builder();

for (LowLevelCacheEntry entries:
transformLowLevelCacheEntries(compositeStore, key, new Echo())) {
blobs.add(entries);
}
return blobs.build();

return transformLowLevelCacheEntries(
compositeStore, key, new Echo<LowLevelCacheEntry>());
}

/**
Expand All @@ -284,7 +296,7 @@ protected Set<LowLevelCacheEntry> getLowLevelCacheEntriesFromStore(final Infinis
"InfinispanBinaryStore {}", key, ispnStore);

try {
return getClusterResults(ispnStore, key, new Echo<LowLevelCacheEntry>());
return getClusterResults(ispnStore, key, new Echo());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
return ImmutableSet.of();
Expand All @@ -300,18 +312,18 @@ public <T> Set<T> getClusterResults(InfinispanBinaryStore cacheStore,
DistributedExecutorService exec =
getClusterExecutor(cacheStore);
@SuppressWarnings( {"synthetic-access", "unchecked", "rawtypes"} )
List<Future<T>> futures = exec.submitEverywhere(
new CacheLocalTransform(cacheStore, key, transform));
List<Future<Collection<T>>> futures = exec.submitEverywhere(
new CacheLocalTransform(key, new Unroll<T>(transform)));
Set<T> results = new HashSet<T>(futures.size());
while(futures.size() > 0) {
Iterator<Future<T>> futureIter =
Iterator<Future<Collection<T>>> futureIter =
futures.iterator();
while(futureIter.hasNext()) {
Future<T> future = futureIter.next();
Future<Collection<T>> future = futureIter.next();
try {
T result = future.get(100, TimeUnit.MILLISECONDS);
Collection<T> result = future.get(100, TimeUnit.MILLISECONDS);
futureIter.remove();
results.add(result);
results.addAll(result);
} catch (TimeoutException e) {
// we're just going to ignore this and try again!
}
Expand Down Expand Up @@ -342,9 +354,13 @@ public void setGetBinaryKey(final GetBinaryKey getBinaryKey) {
}

static class ExternalIdDecorator<T>
implements Function<LowLevelCacheEntry, T> {
implements Function<LowLevelCacheEntry, T>, Serializable {

final String externalId;
/**
* Must be serializable
*/
private static final long serialVersionUID = 7375231595038804409L;
final String externalId;
final Function<LowLevelCacheEntry, T> transform;
ExternalIdDecorator(String externalId, Function<LowLevelCacheEntry, T> transform) {
this.externalId = externalId;
Expand All @@ -359,13 +375,40 @@ public T apply(LowLevelCacheEntry input) {

}

static class Echo<T> implements Function<T, T> {
static class Echo implements Function<LowLevelCacheEntry, LowLevelCacheEntry>, Serializable {

private static final long serialVersionUID = -1L;

@Override
public T apply(T input) {
public LowLevelCacheEntry apply(LowLevelCacheEntry input) {
return input;
}

}

static class Unroll<T> implements Function<LowLevelCacheEntry, Collection<T>>, Serializable {

private final Function<LowLevelCacheEntry, T> transform;

Unroll(Function<LowLevelCacheEntry, T> transform) {
this.transform = transform;
}

private static final long serialVersionUID = -1L;

@Override
public Collection<T> apply(LowLevelCacheEntry input) {
final ImmutableSet.Builder<T> entries = builder();
if (input instanceof ChainingCacheStoreEntry) {
entries.addAll(
transform(((ChainingCacheStoreEntry)input).chainedEntries(), transform)
);
} else {
entries.add(transform.apply(input)).build();
}
return entries.build();
}

}

}
@@ -1,43 +1,56 @@
package org.fcrepo.services.functions;

import java.io.Serializable;
import java.util.Set;

import org.fcrepo.utils.LowLevelCacheEntry;
import org.fcrepo.utils.impl.CacheStoreEntry;
import org.fcrepo.utils.impl.ChainingCacheStoreEntry;
import org.infinispan.Cache;
import org.infinispan.CacheImpl;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.ChainingCacheStore;
import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.binary.BinaryStore;

import com.google.common.base.Function;


public class CacheLocalTransform<K, V, T>
implements DistributedCallable<K, V, T> {
implements DistributedCallable<K, V, T>, Serializable {

/**
* Because this class will be communicated between cache nodes,
* it must be serializable
*/
private static final long serialVersionUID = -7014104738830230123L;

private static GetCacheStore TRANSFORM = new GetCacheStore();

private BinaryStore binStore;
private BinaryKey key;
private Function<LowLevelCacheEntry, T> entryTransform;
private CacheStore store;
private String cacheName = "";

public CacheLocalTransform(final BinaryStore store,
final BinaryKey key, final Function<LowLevelCacheEntry, T> entryTransform) {
public CacheLocalTransform(final BinaryKey key,
final Function<LowLevelCacheEntry, T> entryTransform) {

this.key = key;
this.entryTransform = entryTransform;
this.binStore = store;
}

@Override
public T call() throws Exception {
LowLevelCacheEntry entry = new LowLevelCacheEntry(binStore, store, key);
LowLevelCacheEntry entry =
(store instanceof ChainingCacheStore) ?
new ChainingCacheStoreEntry((ChainingCacheStore)store, cacheName, key) :
new CacheStoreEntry(store, cacheName, key);
return this.entryTransform.apply(entry);
}

@Override
public void setEnvironment(Cache<K, V> cache, Set<K> keys) {
this.store = TRANSFORM.apply(cache);

this.cacheName = ((CacheImpl)cache).getName();
}
}
Expand Up @@ -9,8 +9,10 @@
import static com.google.common.base.Throwables.propagate;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.Serializable;
import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.fcrepo.services.LowLevelStorageService;
import org.fcrepo.utils.FixityResult;
Expand All @@ -28,12 +30,19 @@
* @date Apr 2, 2013
*/
public class CheckCacheEntryFixity implements
Function<LowLevelCacheEntry, FixityResult> {
Function<LowLevelCacheEntry, FixityResult>,
Serializable {

private static final Logger logger =
/**
* So that it can be communicated as state to cluster members
* during distributed fixity checks
*/
private static final long serialVersionUID = 4701589005571818110L;

private static final Logger logger =
getLogger(LowLevelStorageService.class);

private final MessageDigest digest;
private final String digestAlgorithm;

private final URI dsChecksum;

Expand All @@ -44,7 +53,7 @@ public class CheckCacheEntryFixity implements
*/
public CheckCacheEntryFixity(final MessageDigest digest,
final URI dsChecksum, final long dsSize) {
this.digest = digest;
this.digestAlgorithm = digest.getAlgorithm();
this.dsChecksum = dsChecksum;
this.dsSize = dsSize;
}
Expand All @@ -58,7 +67,7 @@ public FixityResult apply(final LowLevelCacheEntry input) {
input.toString());
FixityResult result = null;
try {
result = input.checkFixity(dsChecksum, dsSize, digest);
result = input.checkFixity(dsChecksum, dsSize, digestAlgorithm);
} catch (final BinaryStoreException e) {
logger.error("Exception checking low-level fixity: {}", e);
throw propagate(e);
Expand All @@ -70,7 +79,15 @@ public FixityResult apply(final LowLevelCacheEntry input) {
* Returns the digest to be used in checksum verification.
*/
public MessageDigest getDigest() {
return digest;
try {
return MessageDigest.getInstance(digestAlgorithm);
} catch (NoSuchAlgorithmException e) {
/**
* @todo this really shouldn't ever happen, since it is derived from
* an existing MessageDigest
*/
return null;
}
}

/**
Expand Down

0 comments on commit 94d079a

Please sign in to comment.