Skip to content

Commit

Permalink
Manually porting many of @Cabeer's modeshape40 b/c kernel/kernel-impl…
Browse files Browse the repository at this point in the history
… reorg made rebasing modeshape40 branch very messy
  • Loading branch information
escowles committed Aug 6, 2014
1 parent ae12d03 commit 9915c50
Show file tree
Hide file tree
Showing 18 changed files with 397 additions and 802 deletions.
Expand Up @@ -41,9 +41,9 @@
import org.modeshape.connector.filesystem.FileSystemConnector;
import org.modeshape.jcr.api.value.DateTime;
import org.modeshape.jcr.api.nodetype.NodeTypeManager;
import org.modeshape.jcr.federation.spi.DocumentChanges;
import org.modeshape.jcr.federation.spi.DocumentReader;
import org.modeshape.jcr.federation.spi.DocumentWriter;
import org.modeshape.jcr.spi.federation.DocumentChanges;
import org.modeshape.jcr.spi.federation.DocumentReader;
import org.modeshape.jcr.spi.federation.DocumentWriter;
import org.modeshape.jcr.value.BinaryValue;
import org.modeshape.jcr.value.Name;
import org.modeshape.jcr.value.Property;
Expand Down
Expand Up @@ -24,8 +24,8 @@
import org.modeshape.jcr.ExecutionContext;
import org.modeshape.jcr.api.nodetype.NodeTypeManager;
import org.modeshape.jcr.cache.document.DocumentTranslator;
import org.modeshape.jcr.federation.spi.DocumentReader;
import org.modeshape.jcr.federation.spi.ExtraPropertiesStore;
import org.modeshape.jcr.spi.federation.DocumentReader;
import org.modeshape.jcr.spi.federation.ExtraPropertiesStore;
import org.modeshape.jcr.value.BinaryValue;
import org.modeshape.jcr.value.NameFactory;
import org.modeshape.jcr.value.Property;
Expand Down
5 changes: 5 additions & 0 deletions fcrepo-http-commons/pom.xml
Expand Up @@ -76,6 +76,11 @@
<groupId>org.modeshape</groupId>
<artifactId>modeshape-jcr-api</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions fcrepo-kernel-impl/pom.xml
Expand Up @@ -88,6 +88,14 @@
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-commons</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-cachestore-leveldb</artifactId>
Expand Down

This file was deleted.

Expand Up @@ -17,17 +17,17 @@

import com.google.common.collect.ImmutableSet;
import org.apache.commons.io.IOUtils;
import org.fcrepo.kernel.utils.ContentDigest;
import org.fcrepo.kernel.impl.utils.FixityInputStream;
import org.fcrepo.kernel.utils.FixityResult;
import org.fcrepo.kernel.impl.utils.FixityResultImpl;
import org.fcrepo.kernel.impl.utils.infinispan.StoreChunkInputStream;
import org.fcrepo.kernel.impl.utils.infinispan.CacheLoaderChunkInputStream;
import org.fcrepo.kernel.utils.ContentDigest;
import org.fcrepo.kernel.utils.FixityResult;
import org.infinispan.Cache;
import org.infinispan.CacheImpl;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.ChainingCacheStore;

import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.CacheLoader;

import java.io.InputStream;
import java.io.Serializable;
Expand All @@ -46,28 +46,34 @@
public class DistributedFixityCheck implements DistributedCallable<String, byte[], Collection<FixityResult>>,
Serializable {
private final String dataKey;
private final int chunkSize;
private final long length;
private Cache<String, byte[]> cache;

/**
* Create a fixity-check task for a single binary's data key.
*
* The three values are only stored here; call() later hands them, together with
* each cache loader, to the chunk input stream that is digested.
*
* @param dataKey key of the binary data to check, passed on to the chunk input stream
* @param chunkSize chunk size passed on to the chunk input stream
*        (NOTE(review): presumably the store's chunk size in bytes - confirm against the caller)
* @param length length passed on to the chunk input stream
*        (NOTE(review): presumably the total binary length in bytes - confirm against the caller)
*/
public DistributedFixityCheck(final String dataKey) {
public DistributedFixityCheck(final String dataKey, final int chunkSize, final long length) {
this.dataKey = dataKey;
this.chunkSize = chunkSize;
this.length = length;
}

@Override
public Collection<FixityResult> call() throws Exception {
final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>();

for (final CacheStore store : stores()) {
for (final CacheLoader store : stores()) {

final String digest = ContentDigest.getAlgorithm(new URI("urn:sha1"));

FixityInputStream fixityInputStream;
final InputStream cacheLoaderChunkInputStream = new StoreChunkInputStream(store, dataKey);
final InputStream cacheLoaderChunkInputStream = new CacheLoaderChunkInputStream(
store, dataKey, chunkSize, length);

final FixityInputStream fixityInputStream = new FixityInputStream(
cacheLoaderChunkInputStream, MessageDigest.getInstance(digest));

fixityInputStream = new FixityInputStream(cacheLoaderChunkInputStream, MessageDigest.getInstance(digest));
IOUtils.copy(fixityInputStream, NULL_OUTPUT_STREAM);

final URI calculatedChecksum = ContentDigest.asURI(digest, fixityInputStream.getMessageDigest().digest());
Expand All @@ -79,7 +85,7 @@ public Collection<FixityResult> call() throws Exception {
return fixityResults.build();
}

private String getExternalIdentifier(final CacheStore store) {
private String getExternalIdentifier(final CacheLoader store) {
final String address;

if (cache.getCacheManager().getAddress() != null) {
Expand All @@ -96,13 +102,8 @@ public void setEnvironment(final Cache<String, byte[]> cache, final Set<String>
this.cache = cache;
}

private Set<CacheStore> stores() {
final CacheLoaderManager cacheLoaderManager
= ((CacheImpl) cache).getComponentRegistry().getLocalComponent(CacheLoaderManager.class);
if (cacheLoaderManager.getCacheLoader() instanceof ChainingCacheStore) {
return ((ChainingCacheStore)cacheLoaderManager.getCacheLoader()).getStores().keySet();
} else {
return ImmutableSet.of(cacheLoaderManager.getCacheStore());
}
/**
* Look up the cache loaders that call() iterates over.
*
* Reaches the cache's component registry, fetches the local PersistenceManager
* component from it, and asks that manager for its stores of type CacheLoader.
*
* NOTE(review): the unchecked cast assumes the cache handed to setEnvironment
* is a CacheImpl - confirm this holds for every cache this task is submitted to.
*
* @return the CacheLoader stores reported by the PersistenceManager
*/
private Set<CacheLoader> stores() {
return ((CacheImpl)cache).getComponentRegistry().getLocalComponent(PersistenceManager.class)
.getStores(CacheLoader.class);
}
}
Expand Up @@ -28,7 +28,9 @@
import org.fcrepo.kernel.utils.FixityResult;
import org.infinispan.distexec.DistributedExecutorService;
import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.binary.infinispan.ChunkBinaryMetadata;
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;
import org.modeshape.jcr.value.binary.infinispan.InfinispanUtils;
import org.slf4j.Logger;

import javax.jcr.Property;
Expand Down Expand Up @@ -65,9 +67,12 @@ public Collection<FixityResult> checkFixity(final URI checksum, final long size)
final ImmutableSet.Builder<FixityResult> fixityResults = new ImmutableSet.Builder<>();

if (store().hasBinary(key)) {
final String dataKey = dataKeyFor(key);
final String dataKey = InfinispanUtils.dataKeyFrom((InfinispanBinaryStore)store(), key);
final ChunkBinaryMetadata metadata = InfinispanUtils.getMetadata((InfinispanBinaryStore)store(), key);

final DistributedFixityCheck task = new DistributedFixityCheck(dataKey, metadata.getChunkSize(),
metadata.getLength());

final DistributedFixityCheck task = new DistributedFixityCheck(dataKey);
final List<Future<Collection<FixityResult>>> futures
= clusterExecutor().submitEverywhere(task, dataKey + "-0");

Expand Down

0 comments on commit 9915c50

Please sign in to comment.