Skip to content

Commit

Permalink
Move CacheLocalTransform into a class, testing
Browse files Browse the repository at this point in the history
  • Loading branch information
barmintor committed Jun 5, 2013
1 parent ae56df3 commit 9d294df
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 75 deletions.
Expand Up @@ -6,15 +6,13 @@

package org.fcrepo.services;

import static com.google.common.collect.Collections2.transform;
import static com.google.common.collect.ImmutableSet.builder;
import static org.fcrepo.services.ServiceHelpers.getClusterExecutor;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -29,20 +27,10 @@
import javax.jcr.Repository;
import javax.jcr.RepositoryException;

import org.fcrepo.services.functions.CheckCacheEntryFixity;
import org.fcrepo.services.functions.CacheLocalTransform;
import org.fcrepo.services.functions.GetBinaryKey;
import org.fcrepo.services.functions.GetCacheStore;
import org.fcrepo.utils.FixityResult;
import org.fcrepo.utils.LowLevelCacheEntry;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheStoreConfiguration;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.ChainingCacheStore;
import org.modeshape.common.collection.Collections;
import org.modeshape.jcr.GetBinaryStore;
import org.modeshape.jcr.api.JcrConstants;
import org.modeshape.jcr.value.BinaryKey;
Expand Down Expand Up @@ -71,9 +59,6 @@ public class LowLevelStorageService {

private GetBinaryKey getBinaryKey = new GetBinaryKey();

private GetCacheStore getCacheStore = new GetCacheStore();


/**
* Apply some Function to the low-level cache entries for the Node
* @param resource a JCR Node containing a jcr:data binary property
Expand Down Expand Up @@ -106,7 +91,7 @@ public <T> Collection<T> transformLowLevelCacheEntries(final BinaryKey key,
final BinaryStore store = getBinaryStore.apply(repo);

if (store == null) {
return new HashSet<>();
return ImmutableSet.of();
}
if (store instanceof CompositeBinaryStore) {
return
Expand All @@ -119,10 +104,10 @@ public <T> Collection<T> transformLowLevelCacheEntries(final BinaryKey key,
(InfinispanBinaryStore) store, key, transform);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
return new HashSet<T>(0);
return ImmutableSet.of();
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
return new HashSet<T>(0);
return ImmutableSet.of();
}
} else {
final ImmutableSet.Builder<T> blobs = builder();
Expand Down Expand Up @@ -176,7 +161,7 @@ protected <T> Set<T> transformLowLevelCacheEntries(
final Function <LowLevelCacheEntry, T> transform) {

if (store == null) {
return new HashSet<>();
return ImmutableSet.of();
}
if (store instanceof CompositeBinaryStore) {
return
Expand All @@ -189,10 +174,10 @@ protected <T> Set<T> transformLowLevelCacheEntries(
(InfinispanBinaryStore) store, key, transform);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
return new HashSet<T>(0);
return ImmutableSet.of();
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
return new HashSet<T>(0);
return ImmutableSet.of();
}
} else {
final ImmutableSet.Builder<T> blobs = builder();
Expand Down Expand Up @@ -242,7 +227,7 @@ public Set<LowLevelCacheEntry> getLowLevelCacheEntries(final BinaryKey key) {
final BinaryStore store = getBinaryStore.apply(repo);

if (store == null) {
return new HashSet<>();
return ImmutableSet.of();
}

return getLowLevelCacheEntriesFromStore(store, key);
Expand Down Expand Up @@ -302,18 +287,18 @@ protected Set<LowLevelCacheEntry> getLowLevelCacheEntriesFromStore(final Infinis
return getClusterResults(ispnStore, key, new Echo<LowLevelCacheEntry>());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
return new HashSet<LowLevelCacheEntry>(0);
return ImmutableSet.of();
} catch (ExecutionException e) {
logger.error(e.getMessage(), e);
return new HashSet<LowLevelCacheEntry>(0);
return ImmutableSet.of();
}
}

public <T> Set<T> getClusterResults(InfinispanBinaryStore cacheStore,
BinaryKey key, Function<LowLevelCacheEntry, T> transform)
throws InterruptedException, ExecutionException {
DistributedExecutorService exec =
new DefaultExecutorService(cacheStore.getCaches().get(0));
getClusterExecutor(cacheStore);
@SuppressWarnings( {"synthetic-access", "unchecked", "rawtypes"} )
List<Future<T>> futures = exec.submitEverywhere(
new CacheLocalTransform(cacheStore, key, transform));
Expand Down Expand Up @@ -356,14 +341,7 @@ public void setGetBinaryKey(final GetBinaryKey getBinaryKey) {
this.getBinaryKey = getBinaryKey;
}

/**
* @todo Add Documentation.
*/
public void setGetCacheStore(final GetCacheStore getCacheStore) {
this.getCacheStore = getCacheStore;
}

private static class ExternalIdDecorator<T>
static class ExternalIdDecorator<T>
implements Function<LowLevelCacheEntry, T> {

final String externalId;
Expand All @@ -381,7 +359,7 @@ public T apply(LowLevelCacheEntry input) {

}

private static class Echo<T> implements Function<T, T> {
static class Echo<T> implements Function<T, T> {

@Override
public T apply(T input) {
Expand All @@ -390,35 +368,4 @@ public T apply(T input) {

}

private static final class CacheLocalTransform<K, V, T>
implements DistributedCallable<K, V, T> {

private static GetCacheStore TRANSFORM = new GetCacheStore();

private BinaryStore binStore;
private BinaryKey key;
private Function<LowLevelCacheEntry, T> entryTransform;
private CacheStore store;

CacheLocalTransform(final BinaryStore store,
final BinaryKey key, final Function<LowLevelCacheEntry, T> entryTransform) {
this.key = key;
this.entryTransform = entryTransform;
this.binStore = store;
}

@Override
public T call() throws Exception {
LowLevelCacheEntry entry = new LowLevelCacheEntry(binStore, store, key);
return this.entryTransform.apply(entry);
}

@Override
public void setEnvironment(Cache<K, V> cache, Set<K> keys) {
this.store = TRANSFORM.apply(cache);

}

}

}
Expand Up @@ -23,6 +23,10 @@
import org.fcrepo.services.functions.CheckCacheEntryFixity;
import org.fcrepo.utils.FixityResult;
import org.fcrepo.utils.LowLevelCacheEntry;
import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedExecutorService;
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;

import com.google.common.base.Function;

Expand Down Expand Up @@ -105,6 +109,16 @@ public static Long getContentSize(final Node ds)

return size;
}

/**
* A static factory function to insulate services from the details of building
* a DistributedExecutorService
* @param cache
* @return
*/
public static DistributedExecutorService getClusterExecutor(InfinispanBinaryStore cacheStore) {
return new DefaultExecutorService(cacheStore.getCaches().get(0));
}

/**
* @todo Add Documentation.
Expand Down
@@ -0,0 +1,43 @@
package org.fcrepo.services.functions;

import java.util.Set;

import org.fcrepo.utils.LowLevelCacheEntry;
import org.infinispan.Cache;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.loaders.CacheStore;
import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.binary.BinaryStore;

import com.google.common.base.Function;


public class CacheLocalTransform<K, V, T>
implements DistributedCallable<K, V, T> {

private static GetCacheStore TRANSFORM = new GetCacheStore();

private BinaryStore binStore;
private BinaryKey key;
private Function<LowLevelCacheEntry, T> entryTransform;
private CacheStore store;

public CacheLocalTransform(final BinaryStore store,
final BinaryKey key, final Function<LowLevelCacheEntry, T> entryTransform) {
this.key = key;
this.entryTransform = entryTransform;
this.binStore = store;
}

@Override
public T call() throws Exception {
LowLevelCacheEntry entry = new LowLevelCacheEntry(binStore, store, key);
return this.entryTransform.apply(entry);
}

@Override
public void setEnvironment(Cache<K, V> cache, Set<K> keys) {
this.store = TRANSFORM.apply(cache);

}
}
Expand Up @@ -7,19 +7,27 @@
package org.fcrepo.services;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.spy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.jcr.Node;
import javax.jcr.Property;
Expand All @@ -30,6 +38,7 @@
import org.fcrepo.services.functions.GetCacheStore;
import org.fcrepo.utils.LowLevelCacheEntry;
import org.infinispan.Cache;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.junit.Test;
Expand Down Expand Up @@ -156,13 +165,17 @@ public void shouldRetrieveLowLevelCacheStoresForBinaryKey()
}

/**
* @throws Exception
* @todo Add Documentation.
*/
@Test
@SuppressWarnings("unchecked")
@Test
public void shouldRetrieveLowLevelCacheStoresForCompositeStore()
throws RepositoryException, CacheLoaderException {
throws Exception {

final Cache<?, ?> ispnCache1 = mock(Cache.class);
mockStatic(ServiceHelpers.class);

final Cache<?, ?> ispnCache1 = mock(Cache.class);
final Cache<?, ?> ispnCache2 = mock(Cache.class);
final CacheStore ispnCacheStore1 = mock(CacheStore.class);
final CacheStore ispnCacheStore2 = mock(CacheStore.class);
Expand Down Expand Up @@ -190,23 +203,47 @@ public void shouldRetrieveLowLevelCacheStoresForCompositeStore()
when(mockStore.getNamedStoreIterator()).thenReturn(
map.entrySet().iterator());

final LowLevelStorageService testObj = new LowLevelStorageService();
testObj.setGetCacheStore(mockCacheStoreFunc);
final DistributedExecutorService mockCluster =
mock(DistributedExecutorService.class);
when(ServiceHelpers.getClusterExecutor(infinispanBinaryStore))
.thenReturn(mockCluster);

final BinaryKey key = new BinaryKey("key-123");

LowLevelCacheEntry cacheEntry1 =
new LowLevelCacheEntry(infinispanBinaryStore, ispnCacheStore1,
key);
LowLevelCacheEntry cacheEntry2 =
new LowLevelCacheEntry(infinispanBinaryStore, ispnCacheStore2,
key);
Future<LowLevelCacheEntry> future1 = mock(Future.class);
Future<LowLevelCacheEntry> future2 = mock(Future.class);
when(future1.get(any(Long.class), eq(TimeUnit.MILLISECONDS)))
.thenReturn(cacheEntry1);
when(future2.get(any(Long.class), eq(TimeUnit.MILLISECONDS)))
.thenReturn(cacheEntry2);

List<Future<?>> mockClusterResults = new ArrayList<Future<?>>(2);
mockClusterResults.add(future1);
mockClusterResults.add(future2);

when(mockCluster.submitEverywhere(any(org.fcrepo.services.functions.CacheLocalTransform.class)))
.thenReturn(mockClusterResults);


final LowLevelStorageService testObj = new LowLevelStorageService();

when(plainBinaryStore.hasBinary(key)).thenReturn(true);
when(plainBinaryStore2.hasBinary(key)).thenReturn(false);
when(infinispanBinaryStore.hasBinary(key)).thenReturn(true);
when(ispnCacheStore1.containsKey("key-123-data-0")).thenReturn(true);
when(ispnCacheStore2.containsKey("key-123-data-0")).thenReturn(true);
final Set<LowLevelCacheEntry> entries =
testObj.getLowLevelCacheEntriesFromStore(mockStore, key);

assertEquals(3, entries.size());

assertTrue(entries.contains(new LowLevelCacheEntry(plainBinaryStore,
key)));
assertTrue(!entries.contains(new LowLevelCacheEntry(plainBinaryStore2,
assertFalse(entries.contains(new LowLevelCacheEntry(plainBinaryStore2,
key)));
assertTrue(entries.contains(new LowLevelCacheEntry(
infinispanBinaryStore, ispnCacheStore1, key)));
Expand Down

0 comments on commit 9d294df

Please sign in to comment.