Skip to content

Commit

Permalink
Merge pull request #35 from futures/ispn-lowlevel-fixity-check
Browse files Browse the repository at this point in the history
low-level retrieval of jcr content from the low-level binary stores
  • Loading branch information
barmintor committed Mar 12, 2013
2 parents 9c3568b + 3aa1602 commit 316432a
Show file tree
Hide file tree
Showing 6 changed files with 445 additions and 0 deletions.
186 changes: 186 additions & 0 deletions fcrepo-kernel/src/main/java/org/fcrepo/services/RepositoryService.java
@@ -0,0 +1,186 @@
package org.fcrepo.services;

import org.fcrepo.utils.BinaryCacheStore;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheStoreConfiguration;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.ChainingCacheStore;
import org.modeshape.jcr.JcrRepository;
import org.modeshape.jcr.RepositoryConfiguration;
import org.modeshape.jcr.cache.NodeKey;
import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.BinaryValue;
import org.modeshape.jcr.value.binary.AbstractBinaryStore;
import org.modeshape.jcr.value.binary.BinaryStore;
import org.modeshape.jcr.value.binary.BinaryStoreException;
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;

import static org.modeshape.jcr.api.JcrConstants.JCR_CONTENT;
import static org.modeshape.jcr.api.JcrConstants.JCR_DATA;

import org.fcrepo.utils.infinispan.StoreChunkInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;

/**
 * Gives access to the low-level binary content behind JCR nodes, one input
 * stream per underlying store, so callers can inspect each stored copy.
 *
 * The read-only session and the discovered store list are held in static
 * fields; the session is opened and closed by the container lifecycle
 * callbacks on an injected instance.
 */
public class RepositoryService {

    private static final Logger logger = LoggerFactory
            .getLogger(RepositoryService.class);

    @Inject
    private Repository repo;

    /**
     * For use with non-mutating methods.
     */
    private static Session readOnlySession;

    /**
     * Lazily populated by {@link #getLowLevelCacheStores()} and reused afterwards.
     */
    private static List<BinaryCacheStore> cacheStores;

    /**
     * @return the repository behind the read-only session, cast to the
     *         ModeShape implementation type
     */
    private static JcrRepository getRepositoryInstance() {
        return (JcrRepository) readOnlySession.getRepository();
    }

    /**
     * Look up the binary value at jcr:content/jcr:data below the given node
     * and open it in every low-level store.
     *
     * @param resource a JCR node that has a jcr:content/jcr:data child.
     * @return a map of binary stores and input streams
     * @throws RepositoryException if the content node or data property cannot be read
     */
    public static HashMap<BinaryCacheStore, InputStream> getContentBlobs(Node resource) throws RepositoryException {
        final BinaryValue v = (BinaryValue) resource.getNode(JCR_CONTENT).getProperty(JCR_DATA).getBinary();
        return getBlobs(v.getKey());
    }

    /**
     * Open the binary identified by the key in every low-level store.
     *
     * @param key a Modeshape BinaryValue's key.
     * @return a map of binary stores and input streams, in store discovery
     *         order; a store that failed to open the binary maps to null
     */
    public static HashMap<BinaryCacheStore, InputStream> getBlobs(BinaryKey key) {

        final HashMap<BinaryCacheStore, InputStream> blobs = new LinkedHashMap<>();

        for (BinaryCacheStore c : getLowLevelCacheStores()) {
            try {
                blobs.put(c, c.getInputStream(key));
            } catch (BinaryStoreException e) {
                // Keep the store in the result with a null stream so the caller
                // can see that this store had nothing usable for the key.
                logger.warn("Unable to open binary " + key + " in store " + c, e);
                blobs.put(c, null);
            }
        }

        return blobs;
    }

    /**
     * Extract the BinaryStore out of Modeshape (infinispan, jdbc, file, transient, etc)
     *
     * @return the configured binary store, or null if it could not be obtained
     */
    private static BinaryStore getBinaryStore() {
        try {
            final JcrRepository jcrRepository = getRepositoryInstance();
            return jcrRepository.getConfiguration().getBinaryStorage().getBinaryStore();
        } catch (Exception e) { // boo, catching all exceptions. unfortunately, that's all getBinaryStore promises..
            logger.warn("Unable to retrieve the binary store from the repository configuration", e);
            return null;
        }
    }

    /**
     * Get the list of low-level cache stores at play. If it's an infinispan node, for instance, figure out exactly
     * which cachestores are being used.
     *
     * @return a list of "BinaryCacheStore", an abstraction over a plain BinaryStore or a specific Infinispan Cache
     */
    private static List<BinaryCacheStore> getLowLevelCacheStores() {
        // I'm assuming the list of stores doesn't change.. probably not a safe assumption
        if (cacheStores != null) {
            return cacheStores;
        }

        final List<BinaryCacheStore> stores = new ArrayList<>();

        final BinaryStore store = getBinaryStore();

        if (store == null) {
            // Deliberately not cached, so a later call can try again.
            return stores;
        }

        // if we have an Infinispan store, it may have multiple stores (or cluster nodes)
        if (store instanceof InfinispanBinaryStore) {
            final InfinispanBinaryStore ispnStore = (InfinispanBinaryStore) store;

            //seems like we have to start it, not sure why.
            ispnStore.start();

            for (Cache<?, ?> c : ispnStore.getCaches()) {

                final CacheStore cacheStore = c.getAdvancedCache().getComponentRegistry()
                        .getComponent(CacheLoaderManager.class).getCacheStore();

                // A ChainingCacheStore indicates we (may) have multiple CacheStores at play
                if (cacheStore instanceof ChainingCacheStore) {
                    final ChainingCacheStore chainingCacheStore = (ChainingCacheStore) cacheStore;

                    // the stores are a map of the cache store and the configuration; i'm just throwing the configuration away..
                    for (CacheStore s : chainingCacheStore.getStores().keySet()) {
                        stores.add(new BinaryCacheStore(store, s));
                    }

                } else {
                    // just a nice, simple infinispan cache.
                    stores.add(new BinaryCacheStore(store, cacheStore));
                }
            }
        } else {
            stores.add(new BinaryCacheStore(store));
        }

        cacheStores = stores;

        return stores;
    }

    /**
     * Open the shared read-only session once the repository has been injected.
     *
     * @throws IllegalStateException if the repository login fails
     */
    @PostConstruct
    public final void getSession() {
        try {
            readOnlySession = repo.login();
        } catch (RepositoryException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Log out of the shared read-only session, if one was opened.
     */
    @PreDestroy
    public final void logoutSession() {
        // The session is null when login failed or never ran; nothing to close then.
        if (readOnlySession != null) {
            readOnlySession.logout();
        }
    }

}
36 changes: 36 additions & 0 deletions fcrepo-kernel/src/main/java/org/fcrepo/utils/BinaryCacheStore.java
@@ -0,0 +1,36 @@
package org.fcrepo.utils;

import org.fcrepo.utils.infinispan.StoreChunkInputStream;
import org.infinispan.loaders.CacheStore;
import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.binary.BinaryStore;
import org.modeshape.jcr.value.binary.BinaryStoreException;
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;

import java.io.InputStream;

/**
 * Pairs a ModeShape {@link BinaryStore} with, optionally, one specific
 * Infinispan {@link CacheStore} sitting underneath it, and opens input
 * streams for binary keys against that pairing.
 */
public class BinaryCacheStore {

    /**
     * Appended to the binary key to form the base key handed to
     * {@link StoreChunkInputStream}.
     * NOTE(review): presumably mirrors the key layout used by ModeShape's
     * Infinispan binary store -- confirm against the ModeShape version in use.
     */
    private static final String DATA_SUFFIX = "-data";

    private final BinaryStore store;

    /** The specific cache store to read chunks from; null when none was given. */
    private final CacheStore lowLevelStore;

    /**
     * @param store the ModeShape binary store
     * @param low_level_store the Infinispan cache store to read chunks from; may be null
     */
    public BinaryCacheStore(BinaryStore store, CacheStore low_level_store) {
        this.store = store;
        this.lowLevelStore = low_level_store;
    }

    /**
     * Wrap a binary store without a specific low-level cache store.
     *
     * @param store the ModeShape binary store
     */
    public BinaryCacheStore(BinaryStore store) {
        this(store, null);
    }

    /**
     * Open a stream for the given binary key.
     *
     * When the wrapped store is an {@link InfinispanBinaryStore} and a
     * low-level cache store is available, chunks are read directly from that
     * cache store. Otherwise the wrapped store's own stream is returned; this
     * includes an Infinispan store built without a cache store, which would
     * otherwise yield a stream that fails on first read.
     *
     * @param key the binary key to look up
     * @return an input stream over the binary content
     * @throws BinaryStoreException if the wrapped store cannot provide the stream
     */
    public InputStream getInputStream(BinaryKey key) throws BinaryStoreException {
        if (this.store instanceof InfinispanBinaryStore && this.lowLevelStore != null) {
            return new StoreChunkInputStream(this.lowLevelStore, key.toString() + DATA_SUFFIX);
        }
        return this.store.getInputStream(key);
    }
}
@@ -0,0 +1,151 @@
/*
* ModeShape (http://www.modeshape.org)
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership. Some portions may be licensed
* to Red Hat, Inc. under one or more contributor license agreements.
* See the AUTHORS.txt file in the distribution for a full listing of
* individual contributors.
*
* ModeShape is free software. Unless otherwise indicated, all code in ModeShape
* is licensed to you under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* ModeShape is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.fcrepo.utils.infinispan;

import org.infinispan.Cache;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.marshall.MarshalledValue;
import org.modeshape.common.logging.Logger;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;

/**
 * Merges chunks from a cache store and provides InputStream-feeling.
 *
 * Chunks are loaded one at a time under the keys {@code key + "-" + n} for
 * n = 0, 1, 2, ...; the stream ends at the first chunk key for which the
 * cache store returns no entry.
 */
public class StoreChunkInputStream extends InputStream {

    private final Logger logger;
    private final CacheStore blobCache;
    private final String key;

    /** Read position inside {@link #buffer}; -1 once the last chunk has been consumed. */
    protected int indexInBuffer;
    /** The chunk currently being read; null until the first chunk is loaded. */
    protected byte[] buffer;
    /** Index of the next chunk to load. */
    private int chunkNumber;

    /**
     * @param blobCache the cache store to load chunks from
     * @param key the base key; the chunk index is appended to it for each load
     */
    public StoreChunkInputStream(CacheStore blobCache, String key) {
        logger = Logger.getLogger(getClass());
        this.blobCache = blobCache;
        this.key = key;
    }

    /**
     * Read a single byte, loading the next chunk when the current one is used up.
     *
     * @return the next byte as a value in 0..255, or -1 at end of stream
     * @throws IOException if loading a chunk fails
     */
    @Override
    public int read() throws IOException {
        if (!ensureData()) {
            return -1;
        }
        return buffer[indexInBuffer++] & 0xff;
    }

    /**
     * Read up to {@code len} bytes from the current chunk. A single call never
     * crosses a chunk boundary, so it may return fewer bytes than requested.
     *
     * @param b destination array
     * @param off offset in {@code b} at which to start writing
     * @param len maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of stream
     * @throws IOException if loading a chunk fails
     * @throws NullPointerException if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit in {@code b}
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        // Load the next chunk if the current one was exhausted (for example by
        // read() or skip()) instead of reporting a false end of stream.
        if (!ensureData()) {
            return -1;
        }
        final int count = Math.min(len, buffer.length - indexInBuffer);
        System.arraycopy(buffer, indexInBuffer, b, off, count);
        indexInBuffer += count;
        return count;
    }

    /**
     * @return the number of unread bytes in the current chunk (loading a chunk
     *         first if none is buffered), or 0 at end of stream
     * @throws IOException if loading a chunk fails
     */
    @Override
    public int available() throws IOException {
        return ensureData() ? buffer.length - indexInBuffer : 0;
    }

    /**
     * Skip up to {@code n} bytes within the current chunk. Like
     * {@link #read(byte[], int, int)}, a single call never crosses a chunk
     * boundary, so callers that must skip an exact amount should loop.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes actually skipped; 0 if {@code n} is not
     *         positive or the stream is at its end
     * @throws IOException if loading a chunk fails
     */
    @Override
    public final long skip(long n) throws IOException {
        if (n <= 0 || !ensureData()) {
            return 0;
        }
        final long skipped = Math.min(n, (long) (buffer.length - indexInBuffer));
        indexInBuffer += (int) skipped;
        return skipped;
    }

    /**
     * Make sure the buffer holds at least one unread byte, loading further
     * chunks as needed (empty chunks are passed over).
     *
     * @return true if a byte can be read from the buffer, false at end of stream
     */
    private boolean ensureData() throws IOException {
        while (indexInBuffer != -1 && (buffer == null || indexInBuffer >= buffer.length)) {
            fillBuffer();
        }
        return indexInBuffer != -1;
    }

    /**
     * Load the next chunk into the buffer, or mark end of stream when there is none.
     */
    private void fillBuffer() throws IOException {
        buffer = nextChunk();
        if (buffer == null) {
            buffer = new byte[0];
            indexInBuffer = -1;
        } else {
            indexInBuffer = 0;
        }
    }

    /**
     * Load the next chunk from the cache store and advance the chunk counter.
     *
     * @return the chunk's bytes, or null if the store has no entry for the chunk key
     * @throws IOException if the cache store fails to load the entry; the
     *         original exception is kept as the cause
     */
    protected byte[] nextChunk() throws IOException {
        final String chunkKey = key + "-" + chunkNumber++;
        logger.debug("Read chunk {0}", chunkKey);

        try {
            final CacheEntry cacheEntry = blobCache.load(chunkKey);

            if (cacheEntry == null) {
                return null;
            }

            return (byte[]) cacheEntry.getValue();
        } catch (CacheLoaderException e) {
            // IOException(Throwable) uses e.toString() as the message, so the
            // message is unchanged while the cause is now preserved.
            throw new IOException(e);
        }
    }
}

0 comments on commit 316432a

Please sign in to comment.