Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #35 from futures/ispn-lowlevel-fixity-check
low-level retrieval of jcr content from the low-level binary stores
- Loading branch information
Showing
6 changed files
with
445 additions
and
0 deletions.
There are no files selected for viewing
186 changes: 186 additions & 0 deletions
186
fcrepo-kernel/src/main/java/org/fcrepo/services/RepositoryService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
package org.fcrepo.services; | ||
|
||
import org.fcrepo.utils.BinaryCacheStore; | ||
import org.infinispan.Cache; | ||
import org.infinispan.configuration.cache.CacheStoreConfiguration; | ||
import org.infinispan.loaders.CacheLoaderManager; | ||
import org.infinispan.loaders.CacheStore; | ||
import org.infinispan.loaders.decorators.ChainingCacheStore; | ||
import org.modeshape.jcr.JcrRepository; | ||
import org.modeshape.jcr.RepositoryConfiguration; | ||
import org.modeshape.jcr.cache.NodeKey; | ||
import org.modeshape.jcr.value.BinaryKey; | ||
import org.modeshape.jcr.value.BinaryValue; | ||
import org.modeshape.jcr.value.binary.AbstractBinaryStore; | ||
import org.modeshape.jcr.value.binary.BinaryStore; | ||
import org.modeshape.jcr.value.binary.BinaryStoreException; | ||
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore; | ||
|
||
import static org.modeshape.jcr.api.JcrConstants.JCR_CONTENT; | ||
import static org.modeshape.jcr.api.JcrConstants.JCR_DATA; | ||
|
||
import org.fcrepo.utils.infinispan.StoreChunkInputStream; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import javax.annotation.PostConstruct; | ||
import javax.annotation.PreDestroy; | ||
import javax.inject.Inject; | ||
import javax.jcr.Node; | ||
import javax.jcr.PathNotFoundException; | ||
import javax.jcr.Repository; | ||
import javax.jcr.RepositoryException; | ||
import javax.jcr.Session; | ||
import java.io.InputStream; | ||
import java.lang.reflect.Method; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
|
||
public class RepositoryService { | ||
|
||
|
||
private static final Logger logger = LoggerFactory | ||
.getLogger(RepositoryService.class); | ||
|
||
|
||
@Inject | ||
private Repository repo; | ||
|
||
/** | ||
* For use with non-mutating methods. | ||
*/ | ||
private static Session readOnlySession; | ||
|
||
private static List<BinaryCacheStore> cacheStores; | ||
|
||
private static JcrRepository getRepositoryInstance() { | ||
return (JcrRepository)readOnlySession.getRepository(); | ||
} | ||
|
||
/** | ||
* | ||
* @param resource a JCR node that has a jcr:content/jcr:data child. | ||
* @return a map of binary stores and input streams | ||
* @throws RepositoryException | ||
*/ | ||
public static HashMap<BinaryCacheStore, InputStream> getContentBlobs(Node resource) throws RepositoryException { | ||
|
||
BinaryValue v = (BinaryValue) resource.getNode(JCR_CONTENT).getProperty(JCR_DATA).getBinary(); | ||
|
||
return getBlobs(v.getKey()); | ||
|
||
} | ||
|
||
/** | ||
* | ||
* @param key a Modeshape BinaryValue's key. | ||
* @return a map of binary stores and input streams | ||
*/ | ||
public static HashMap<BinaryCacheStore, InputStream> getBlobs(BinaryKey key) { | ||
|
||
HashMap<BinaryCacheStore, InputStream> blobs = new LinkedHashMap<BinaryCacheStore, InputStream>(); | ||
|
||
for( BinaryCacheStore c : getLowLevelCacheStores()) { | ||
try { | ||
blobs.put(c, c.getInputStream(key)); | ||
} catch (BinaryStoreException e) { | ||
e.printStackTrace(); //uh oh, we didn't find anything! | ||
blobs.put(c, null); | ||
} | ||
} | ||
|
||
return blobs; | ||
} | ||
|
||
/** | ||
* Extract the BinaryStore out of Modeshape (infinspan, jdbc, file, transient, etc) | ||
* @return | ||
*/ | ||
private static BinaryStore getBinaryStore() { | ||
try { | ||
|
||
JcrRepository jcrRepository = getRepositoryInstance(); | ||
|
||
return jcrRepository.getConfiguration().getBinaryStorage().getBinaryStore(); | ||
|
||
} catch (Exception e) { // boo, catching all exceptions. unfortunately, that's all getBinaryStore promises.. | ||
e.printStackTrace(); | ||
return null; | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Get the list of low-level cache stores at play. If it's an infinispan node, for instance, figure out exactly | ||
* which cachestores are being used. | ||
* | ||
* @return a list of "BinaryCacheStore", an abstraction over a plain BinaryStore or a specific Infinispan Cache | ||
*/ | ||
private static List<BinaryCacheStore> getLowLevelCacheStores() { | ||
// I'm assuming the list of stores doesn't change.. probably not a safe assumption | ||
if(cacheStores != null) { | ||
return cacheStores; | ||
} | ||
|
||
List<BinaryCacheStore> stores = new ArrayList<>(); | ||
|
||
BinaryStore store = getBinaryStore(); | ||
|
||
if(store == null) { | ||
return stores; | ||
} | ||
|
||
// if we have an Infinispan store, it may have multiple stores (or cluster nodes) | ||
if(store instanceof InfinispanBinaryStore) { | ||
InfinispanBinaryStore ispnStore = (InfinispanBinaryStore)store; | ||
|
||
//seems like we have to start it, not sure why. | ||
ispnStore.start(); | ||
|
||
for(Cache c : ispnStore.getCaches()) { | ||
|
||
final CacheStore cacheStore = c.getAdvancedCache().getComponentRegistry().getComponent(CacheLoaderManager.class).getCacheStore(); | ||
|
||
// A ChainingCacheStore indicates we (may) have multiple CacheStores at play | ||
if(cacheStore instanceof ChainingCacheStore) { | ||
final ChainingCacheStore chainingCacheStore = (ChainingCacheStore) cacheStore; | ||
|
||
// the stores are a map of the cache store and the configuration; i'm just throwing the configuration away.. | ||
for( CacheStore s : chainingCacheStore.getStores().keySet()) { | ||
stores.add(new BinaryCacheStore(store, s)); | ||
} | ||
|
||
} else { | ||
// just a nice, simple infinispan cache. | ||
stores.add(new BinaryCacheStore(store, cacheStore)); | ||
} | ||
|
||
} | ||
} else { | ||
stores.add(new BinaryCacheStore(store)); | ||
} | ||
|
||
cacheStores = stores; | ||
|
||
return stores; | ||
} | ||
|
||
@PostConstruct | ||
public final void getSession() { | ||
try { | ||
readOnlySession = repo.login(); | ||
} catch (RepositoryException e) { | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
|
||
@PreDestroy | ||
public final void logoutSession() { | ||
readOnlySession.logout(); | ||
} | ||
|
||
} |
36 changes: 36 additions & 0 deletions
36
fcrepo-kernel/src/main/java/org/fcrepo/utils/BinaryCacheStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package org.fcrepo.utils; | ||
|
||
import org.fcrepo.utils.infinispan.StoreChunkInputStream; | ||
import org.infinispan.loaders.CacheStore; | ||
import org.modeshape.jcr.value.BinaryKey; | ||
import org.modeshape.jcr.value.binary.BinaryStore; | ||
import org.modeshape.jcr.value.binary.BinaryStoreException; | ||
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore; | ||
|
||
import java.io.InputStream; | ||
|
||
public class BinaryCacheStore { | ||
|
||
private static final String DATA_SUFFIX = "-data"; | ||
private final BinaryStore store; | ||
private final CacheStore low_level_store; | ||
|
||
public BinaryCacheStore(BinaryStore store, CacheStore low_level_store) { | ||
this.store = store; | ||
this.low_level_store = low_level_store; | ||
} | ||
|
||
public BinaryCacheStore(BinaryStore store) { | ||
this.store = store; | ||
this.low_level_store = null; | ||
} | ||
|
||
|
||
public InputStream getInputStream(BinaryKey key) throws BinaryStoreException { | ||
if(this.store instanceof InfinispanBinaryStore) { | ||
return new StoreChunkInputStream(low_level_store, key.toString() + DATA_SUFFIX); | ||
} else { | ||
return this.store.getInputStream(key); | ||
} | ||
} | ||
} |
151 changes: 151 additions & 0 deletions
151
fcrepo-kernel/src/main/java/org/fcrepo/utils/infinispan/StoreChunkInputStream.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* ModeShape (http://www.modeshape.org) | ||
* See the COPYRIGHT.txt file distributed with this work for information | ||
* regarding copyright ownership. Some portions may be licensed | ||
* to Red Hat, Inc. under one or more contributor license agreements. | ||
* See the AUTHORS.txt file in the distribution for a full listing of | ||
* individual contributors. | ||
* | ||
* ModeShape is free software. Unless otherwise indicated, all code in ModeShape | ||
* is licensed to you under the terms of the GNU Lesser General Public License as | ||
* published by the Free Software Foundation; either version 2.1 of | ||
* the License, or (at your option) any later version. | ||
* | ||
* ModeShape is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
* Lesser General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Lesser General Public | ||
* License along with this software; if not, write to the Free | ||
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA | ||
* 02110-1301 USA, or see the FSF site: http://www.fsf.org. | ||
*/ | ||
package org.fcrepo.utils.infinispan; | ||
|
||
import org.infinispan.Cache; | ||
import org.infinispan.container.entries.CacheEntry; | ||
import org.infinispan.container.entries.InternalCacheEntry; | ||
import org.infinispan.loaders.CacheLoaderException; | ||
import org.infinispan.loaders.CacheStore; | ||
import org.infinispan.marshall.MarshalledValue; | ||
import org.modeshape.common.logging.Logger; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.ObjectOutputStream; | ||
|
||
/** | ||
* Merges chunks from cache and provides InputStream-feeling. | ||
*/ | ||
|
||
public class StoreChunkInputStream extends InputStream { | ||
|
||
private final Logger logger; | ||
private final CacheStore blobCache; | ||
private final String key; | ||
|
||
protected int indexInBuffer; | ||
protected byte[] buffer; | ||
private int chunkNumber; | ||
|
||
|
||
public StoreChunkInputStream(CacheStore blobCache, String key){ | ||
logger = Logger.getLogger(getClass()); | ||
this.blobCache = blobCache; | ||
this.key = key; | ||
} | ||
|
||
@Override | ||
public int read() throws IOException { | ||
if(indexInBuffer == -1){ | ||
return -1; | ||
} | ||
if(buffer == null || indexInBuffer >= buffer.length){ | ||
fillBuffer(); | ||
return read(); | ||
} | ||
return buffer[indexInBuffer++] & 0xff; | ||
} | ||
|
||
@Override | ||
public int read(byte[] b, int off, int len) throws IOException { | ||
if(indexInBuffer == -1){ | ||
return -1; | ||
} | ||
if(buffer == null){ | ||
fillBuffer(); | ||
return read(b, off, len); | ||
} | ||
if(indexInBuffer >= buffer.length){ | ||
return -1; | ||
} | ||
if (indexInBuffer + len > buffer.length){ | ||
len = buffer.length - indexInBuffer; | ||
} | ||
System.arraycopy(buffer, indexInBuffer, b, off, len); | ||
indexInBuffer += len; | ||
if(indexInBuffer >= buffer.length){ | ||
fillBuffer(); | ||
} | ||
return len; | ||
} | ||
|
||
@Override | ||
public int available() throws IOException { | ||
if (buffer == null) { | ||
fillBuffer(); | ||
} | ||
return buffer.length - indexInBuffer; | ||
} | ||
|
||
@Override | ||
public final long skip(long n) throws IOException { | ||
if(n <= 0 || indexInBuffer == -1){ | ||
return 0; | ||
} | ||
if(buffer == null){ | ||
fillBuffer(); | ||
return skip(n); | ||
} | ||
if (buffer.length + n > indexInBuffer){ | ||
n = buffer.length - indexInBuffer; | ||
} | ||
if (n < 0){ | ||
return 0; | ||
} | ||
indexInBuffer += n; | ||
return n; | ||
} | ||
|
||
private void fillBuffer() throws IOException { | ||
|
||
buffer = nextChunk(); | ||
if(buffer == null){ | ||
buffer = new byte[0]; | ||
indexInBuffer = -1; | ||
} else { | ||
indexInBuffer = 0; | ||
} | ||
} | ||
|
||
protected byte[] nextChunk() throws IOException { | ||
String chunkKey = key+"-"+chunkNumber++; | ||
logger.debug("Read chunk {0}", chunkKey); | ||
|
||
byte[] bytes; | ||
|
||
try { | ||
final CacheEntry cacheEntry = blobCache.load(chunkKey); | ||
|
||
if(cacheEntry == null ) { | ||
return null; | ||
} | ||
|
||
return (byte[]) cacheEntry.getValue(); | ||
} catch (CacheLoaderException e) { | ||
throw new IOException(e.toString()); | ||
} | ||
} | ||
} |
Oops, something went wrong.