Navigation Menu

Skip to content

Commit

Permalink
exclude ISPN cache stores that don't contain our key..
Browse files Browse the repository at this point in the history
  • Loading branch information
cbeer committed May 8, 2013
1 parent 87ab40a commit 8a49bd6
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 143 deletions.
Expand Up @@ -33,6 +33,7 @@

import org.fcrepo.Datastream;
import org.fcrepo.services.functions.GetBinaryKey;
import org.infinispan.loaders.CacheLoaderException;
import org.modeshape.jcr.GetBinaryStore;
import org.fcrepo.services.functions.GetCacheStore;
import org.fcrepo.services.functions.GetGoodFixityResults;
Expand Down Expand Up @@ -195,18 +196,34 @@ protected Set<LowLevelCacheEntry> getLowLevelCacheEntriesFromStore(final Infinis

final CacheStore cacheStore = getCacheStore.apply(c);

if (cacheStore == null) {
continue;
}

// A ChainingCacheStore indicates we (may) have multiple CacheStores at play
if (cacheStore instanceof ChainingCacheStore) {
final ChainingCacheStore chainingCacheStore =
(ChainingCacheStore) cacheStore;
// the stores are a map of the cache store and the configuration; i'm just throwing the configuration away..
for (final CacheStore s : chainingCacheStore.getStores()
.keySet()) {
blobs.add(new LowLevelCacheEntry(ispnStore, s, key));
try {
if (s.containsKey(key + "-data-0")) {
blobs.add(new LowLevelCacheEntry(ispnStore, s, key));
}
} catch (CacheLoaderException e) {
logger.warn("Cache loader raised exception: {}", e);
}
}
} else {
// just a nice, simple infinispan cache.
blobs.add(new LowLevelCacheEntry(ispnStore, cacheStore, key));
try {
if (cacheStore.containsKey(key + "-data-0")) {
blobs.add(new LowLevelCacheEntry(ispnStore, cacheStore, key));
}
} catch (CacheLoaderException e) {
logger.warn("Cache loader raised exception: {}", e);
}
}
}

Expand Down
309 changes: 181 additions & 128 deletions fcrepo-kernel/src/main/java/org/fcrepo/utils/LowLevelCacheEntry.java
Expand Up @@ -30,102 +30,135 @@
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;
import org.slf4j.Logger;

/**
* Manage low-level I/O from a cache store (or, for an ISPN store, a cache loader) in order
* to report on e.g. fixity.
*/
public class LowLevelCacheEntry {

private static final Logger logger = getLogger(LowLevelCacheEntry.class);
private static final Logger logger = getLogger(LowLevelCacheEntry.class);

private static final String DATA_SUFFIX = "-data";
private static final String DATA_SUFFIX = "-data";

private final BinaryStore store;
private final BinaryStore store;

private final CacheStore cacheStore;
private final CacheStore cacheStore;

private String externalId;

private final BinaryKey key;
private final BinaryKey key;

public LowLevelCacheEntry(final BinaryStore store,
final CacheStore lowLevelStore, final BinaryKey key) {
this.store = store;
cacheStore = lowLevelStore;
this.key = key;
public LowLevelCacheEntry(final BinaryStore store,
final CacheStore lowLevelStore, final BinaryKey key) {
this.store = store;
cacheStore = lowLevelStore;
this.key = key;
this.externalId = "";
}
}

public LowLevelCacheEntry(final BinaryStore store, final BinaryKey key) {
this.store = store;
cacheStore = null;
this.key = key;
public LowLevelCacheEntry(final BinaryStore store, final BinaryKey key) {
this.store = store;
cacheStore = null;
this.key = key;
this.externalId = "";
}

@Override
public boolean equals(final Object other) {
if (other instanceof LowLevelCacheEntry) {
final LowLevelCacheEntry that = (LowLevelCacheEntry) other;

return key.equals(that.key) &&
store.equals(that.store) &&
(cacheStore == null && that.cacheStore == null || cacheStore != null &&
cacheStore.equals(that.cacheStore));
} else {
return false;
}
}

@Override
public int hashCode() {
return hash(store, cacheStore, key);
}

public InputStream getInputStream() throws BinaryStoreException {
if (store instanceof InfinispanBinaryStore) {
return new StoreChunkInputStream(cacheStore, key.toString() +
DATA_SUFFIX);
} else {
return store.getInputStream(key);
}
}

public void storeValue(final InputStream stream)
throws BinaryStoreException, IOException {
if (store instanceof InfinispanBinaryStore) {
final OutputStream outputStream =
new StoreChunkOutputStream(cacheStore, key.toString() +
DATA_SUFFIX);
IOUtils.copy(stream, outputStream);
outputStream.close();
} else {
// the BinaryStore will calculate a new key for us.
store.storeValue(stream);
}
}

public String getExternalIdentifier() {

if (store instanceof InfinispanBinaryStore) {
}

/**
* Two LowLevelCacheEntries are the same if they have the same key, come from the same BinaryStore,
* and have the same underlying store configuration
* @param other
* @return
*/
@Override
public boolean equals(final Object other) {
if (other instanceof LowLevelCacheEntry) {
final LowLevelCacheEntry that = (LowLevelCacheEntry) other;

return key.equals(that.key) &&
store.equals(that.store) &&
(cacheStore == null && that.cacheStore == null || cacheStore != null &&
cacheStore.equals(that.cacheStore));
} else {
return false;
}
}

@Override
public int hashCode() {
return hash(store, cacheStore, key);
}

/**
* Get a raw input stream from the underlying store
* @return the content for this entry
* @throws BinaryStoreException
*/
public InputStream getInputStream() throws BinaryStoreException {
if (store instanceof InfinispanBinaryStore) {
return new StoreChunkInputStream(cacheStore, key.toString() +
DATA_SUFFIX);
} else {
return store.getInputStream(key);
}
}

/**
* Send a raw input stream to the underlying store for this entry; used for
* fixing e.g. fixity failures.
*
* @param stream binary content to REPLACE the content in the store
* @throws BinaryStoreException
* @throws IOException
*/
public void storeValue(final InputStream stream)
throws BinaryStoreException, IOException {
// TODO: this is probably an auditable action.
logger.info("Doing a low-level write to store {} for key {}", getExternalIdentifier(), key);

if (store instanceof InfinispanBinaryStore) {
final OutputStream outputStream =
new StoreChunkOutputStream(cacheStore, key.toString() +
DATA_SUFFIX);
IOUtils.copy(stream, outputStream);
outputStream.close();
} else {
// the BinaryStore will calculate a new key for us.
store.storeValue(stream);
}
}

/**
* Generate a human-readable identifier for the location of this entry
*
* @return
*/
public String getExternalIdentifier() {

// TODO : I wonder if this could/should be a JSON blob or something machine parsable as well?

if (store instanceof InfinispanBinaryStore) {

final InfinispanBinaryStore ispnStore = (InfinispanBinaryStore)store;

final CacheStoreConfig config = cacheStore.getCacheStoreConfig();
final CacheStoreConfig config = cacheStore.getCacheStoreConfig();

String ispnExternalId = null;
if (config instanceof AbstractCacheStoreConfig) {
final Properties properties =
((AbstractCacheStoreConfig) config).getProperties();
if (properties.containsKey("id")) {
return properties.getProperty("id");
}
String ispnExternalId = null;
if (config instanceof AbstractCacheStoreConfig) {
final Properties properties =
((AbstractCacheStoreConfig) config).getProperties();
if (properties.containsKey("id")) {
return properties.getProperty("id");
}

}
}

if (config instanceof FileCacheStoreConfig) {
ispnExternalId = ((FileCacheStoreConfig) config).getLocation();
}
if (config instanceof FileCacheStoreConfig) {
ispnExternalId = ((FileCacheStoreConfig) config).getLocation();
}

if (ispnExternalId == null) {
if (ispnExternalId == null) {
ispnExternalId = config.toString();
}
}

String blobCacheName = "";
try {
Expand All @@ -140,62 +173,82 @@ public String getExternalIdentifier() {


return getExternalId() + "/" + store.getClass().getName() + ":" + blobCacheName + ":" +
config.getCacheLoaderClassName() +
":" + ispnExternalId;
config.getCacheLoaderClassName() +
":" + ispnExternalId;
} else if ( store instanceof FileSystemBinaryStore) {
final FileSystemBinaryStore fsStore = (FileSystemBinaryStore)store;
return getExternalId() + "/" + store.getClass().getName() + ":" + ((FileSystemBinaryStore) store).getDirectory().toPath();
} else {
return getExternalId() + "/" + store.toString();
}
}

public FixityResult checkFixity(final URI checksum, final long size,
final MessageDigest digest) throws BinaryStoreException {
FixityResult result = null;
FixityInputStream ds = null;
try {
ds =
new FixityInputStream(getInputStream(),
(MessageDigest) digest.clone());

result = new FixityResult(this);

while (ds.read() != -1) {
// noop; we're just reading the stream for the checksum and size
}

result.computedChecksum =
ContentDigest.asURI(digest.getAlgorithm(), ds
.getMessageDigest().digest());
result.computedSize = ds.getByteCount();
result.dsChecksum = checksum;
result.dsSize = size;

if (!result.computedChecksum.equals(result.dsChecksum)) {
result.status.add(BAD_CHECKSUM);
}

if (result.dsSize != result.computedSize) {
result.status.add(BAD_SIZE);
}

if (result.status.isEmpty()) {
result.status.add(SUCCESS);
}

logger.debug("Got " + result.toString());
ds.close();
} catch (final CloneNotSupportedException e) {
logger.warn("Could not clone MessageDigest: {}", e);
throw propagate(e);
} catch (final IOException e) {
throw propagate(e);
}

return result;
}
} else {
return getExternalId() + "/" + store.toString();
}
}

/**
* Check the entry's InputStream against the checksum and size.
*
* @param checksum
* @param size
* @param digest
* @return
* @throws BinaryStoreException
*/
public FixityResult checkFixity(final URI checksum, final long size,
final MessageDigest digest) throws BinaryStoreException {
final FixityInputStream ds;

try {
ds = new FixityInputStream(getInputStream(), (MessageDigest) digest.clone());
} catch (CloneNotSupportedException e) {
logger.warn("Could not clone MessageDigest: {}", e);
throw propagate(e);
}

try {
final FixityResult result = new FixityResult(this);

while (ds.read() != -1) {
// noop; we're just reading the stream for the checksum and size
}

result.computedChecksum = ContentDigest.asURI(digest.getAlgorithm(), ds
.getMessageDigest().digest());
result.computedSize = ds.getByteCount();
result.dsChecksum = checksum;
result.dsSize = size;

if (!result.computedChecksum.equals(result.dsChecksum)) {
result.status.add(BAD_CHECKSUM);
}

if (result.dsSize != result.computedSize) {
result.status.add(BAD_SIZE);
}

if (result.status.isEmpty()) {
result.status.add(SUCCESS);
}

logger.debug("Got " + result.toString());

return result;
} catch (final IOException e) {
throw propagate(e);
} finally {
try {
ds.close();
} catch (IOException e) {
logger.debug("Got error closing input stream: {}", e);
}
}

}

/**
* A meaningful identifier at some higher level that we should
* dutifully pass through.
*
* @param externalId some identifier for the cache store
*/
public void setExternalId(String externalId) {
this.externalId = externalId;
}
Expand Down

0 comments on commit 8a49bd6

Please sign in to comment.