Skip to content

Commit

Permalink
Update DatastreamService#getFixity to support InMemory and External Binary values
Browse files Browse the repository at this point in the history

Previously, getFixity only supported binary values that were persisted
into a CacheStore, ignoring projected and in-memory values. This patch
adds support for InMemory and External values by running their
InputStream through our fixity checks. Unlike values held in some
cache-store configurations, these values are assumed to be unrepairable.
  • Loading branch information
fasseg authored and cbeer committed Dec 13, 2013
1 parent 65301c7 commit 9136033
Show file tree
Hide file tree
Showing 14 changed files with 459 additions and 107 deletions.
Expand Up @@ -23,6 +23,7 @@
import static org.fcrepo.kernel.services.ServiceHelpers.getCheckCacheFixityFunction;
import static org.fcrepo.metrics.RegistryService.getMetrics;
import static org.modeshape.jcr.api.JcrConstants.JCR_CONTENT;
import static org.modeshape.jcr.api.JcrConstants.JCR_DATA;
import static org.slf4j.LoggerFactory.getLogger;

import java.io.IOException;
Expand All @@ -31,6 +32,7 @@
import java.util.Collection;
import java.util.Set;

import javax.jcr.Binary;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
Expand All @@ -40,9 +42,14 @@
import org.fcrepo.kernel.rdf.GraphSubjects;
import org.fcrepo.kernel.rdf.JcrRdfTools;
import org.fcrepo.kernel.services.policy.StoragePolicyDecisionPoint;
import org.fcrepo.kernel.utils.BinaryCacheEntry;
import org.fcrepo.kernel.utils.CacheEntry;
import org.fcrepo.kernel.utils.FixityResult;
import org.fcrepo.kernel.utils.LowLevelCacheEntry;
import org.fcrepo.kernel.utils.ProjectedCacheEntry;
import org.fcrepo.kernel.utils.iterators.RdfStream;
import org.modeshape.jcr.value.binary.ExternalBinaryValue;
import org.modeshape.jcr.value.binary.InMemoryBinaryValue;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -51,6 +58,7 @@
import com.codahale.metrics.Timer;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;

/**
* Service for creating and retrieving Datastreams without using the JCR API.
Expand Down Expand Up @@ -187,9 +195,7 @@ public Datastream asDatastream(final Node node) throws RepositoryException {
*/
public RdfStream getFixityResultsModel(final GraphSubjects subjects,
final Datastream datastream) throws RepositoryException {

final Collection<FixityResult> blobs =
runFixityAndFixProblems(datastream);
final Collection<FixityResult> blobs = runFixityAndFixProblems(datastream);

return JcrRdfTools.withContext(subjects,
datastream.getNode().getSession()).getJcrTriples(
Expand Down Expand Up @@ -242,16 +248,18 @@ public boolean apply(
return fixityResults;
}

final LowLevelCacheEntry anyGoodCacheEntry =
final CacheEntry anyGoodCacheEntry =
goodEntries.iterator().next().getEntry();

final Set<FixityResult> badEntries =
difference(fixityResults, goodEntries);

for (final FixityResult result : badEntries) {
try {
result.getEntry()
.storeValue(anyGoodCacheEntry.getInputStream());
// we can safely cast to a LowLevelCacheEntry here, since
// other entries have to be filtered out before
LowLevelCacheEntry lle = (LowLevelCacheEntry) result.getEntry();
lle.storeValue(anyGoodCacheEntry.getInputStream());
final FixityResult newResult =
result.getEntry().checkFixity(digestUri, size);
if (newResult.isSuccess()) {
Expand Down Expand Up @@ -281,10 +289,23 @@ public boolean apply(
public Collection<FixityResult> getFixity(final Node resource,
final URI dsChecksum, final long dsSize) throws RepositoryException {
logger.debug("Checking resource: " + resource.getPath());
final Function<LowLevelCacheEntry, FixityResult> checkCacheFunc =

final Binary bin = resource.getProperty(JCR_DATA).getBinary();

if (bin instanceof ExternalBinaryValue) {
return ImmutableSet.of(new ProjectedCacheEntry(bin, resource.getPath())
.checkFixity(dsChecksum, dsSize));

} else if (bin instanceof InMemoryBinaryValue) {
return ImmutableSet.of(new BinaryCacheEntry(bin, resource.getPath())
.checkFixity(dsChecksum, dsSize));
} else {
final Function<LowLevelCacheEntry, FixityResult> checkCacheFunc =
getCheckCacheFixityFunction(dsChecksum, dsSize);
return llStoreService.transformLowLevelCacheEntries(resource,
checkCacheFunc);
return llStoreService.transformLowLevelCacheEntries(resource,
checkCacheFunc);
}

}

/**
Expand Down
Expand Up @@ -23,11 +23,12 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import javax.jcr.RepositoryException;

import org.fcrepo.kernel.services.LowLevelStorageService;
import org.fcrepo.kernel.utils.ContentDigest;
import org.fcrepo.kernel.utils.FixityResult;
import org.fcrepo.kernel.utils.LowLevelCacheEntry;
import org.modeshape.jcr.value.binary.BinaryStoreException;
import org.slf4j.Logger;

import com.google.common.base.Function;
Expand Down Expand Up @@ -71,7 +72,7 @@ public FixityResult apply(final LowLevelCacheEntry input) {
FixityResult result = null;
try {
result = input.checkFixity(dsChecksum, dsSize);
} catch (final BinaryStoreException e) {
} catch (final RepositoryException e) {
logger.error("Exception checking low-level fixity: {}", e);
throw propagate(e);
}
Expand Down
@@ -0,0 +1,105 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.utils;

import org.slf4j.Logger;

import javax.jcr.RepositoryException;
import java.io.IOException;
import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import static com.google.common.base.Throwables.propagate;
import static org.fcrepo.kernel.utils.FixityResult.FixityState.BAD_CHECKSUM;
import static org.fcrepo.kernel.utils.FixityResult.FixityState.BAD_SIZE;
import static org.fcrepo.kernel.utils.FixityResult.FixityState.MISSING_STORED_FIXITY;
import static org.fcrepo.kernel.utils.FixityResult.FixityState.SUCCESS;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * Base {@link CacheEntry} whose fixity is computed by reading the entry's
 * content stream in full and comparing the resulting size and checksum with
 * the expected values.
 */
public abstract class BasicCacheEntry implements CacheEntry {

    private static final Logger LOGGER = getLogger(BasicCacheEntry.class);

    /**
     * Calculate the fixity of this entry by piping its content through a
     * {@link FixityInputStream}, which tracks both a message digest and a
     * byte count while the stream is read.
     *
     * @param checksum the checksum previously generated for the entry; the
     *        digest algorithm is taken from it via
     *        {@link ContentDigest#getAlgorithm(URI)}
     * @param size the size previously recorded for the entry, or -1 if no
     *        size was recorded
     * @return a {@link FixityResult} holding the computed size and checksum,
     *         flagged MISSING_STORED_FIXITY, BAD_CHECKSUM, BAD_SIZE and/or
     *         SUCCESS according to the comparison with the expected values
     * @throws RepositoryException if the entry's content stream cannot be
     *         obtained
     */
    @Override
    public FixityResult checkFixity(final URI checksum, final long size)
        throws RepositoryException {

        final String digest = ContentDigest.getAlgorithm(checksum);

        // Resolve the digest BEFORE opening the content stream: if the
        // algorithm is unknown, no stream has been opened that could leak.
        final MessageDigest messageDigest;
        try {
            messageDigest = MessageDigest.getInstance(digest);
        } catch (final NoSuchAlgorithmException e) {
            // Pass the exception as the final, unconsumed argument so SLF4J
            // logs its stack trace.
            LOGGER.warn("Could not create MessageDigest for algorithm {}",
                    digest, e);
            throw propagate(e);
        }

        final FixityInputStream fixityInputStream =
            new FixityInputStream(getInputStream(), messageDigest);

        try {
            while (fixityInputStream.read() != -1) {
                // noop; reading drives the digest and byte count
            }

            final URI calculatedChecksum = ContentDigest.asURI(digest,
                    fixityInputStream.getMessageDigest().digest());
            final FixityResult result =
                new FixityResult(this,
                        fixityInputStream.getByteCount(),
                        calculatedChecksum);

            if (checksum.equals(ContentDigest.missingChecksum()) || size == -1L) {
                result.status.add(MISSING_STORED_FIXITY);
            }

            if (!result.matches(checksum)) {
                result.status.add(BAD_CHECKSUM);
            }

            if (!result.matches(size)) {
                result.status.add(BAD_SIZE);
            }

            if (result.matches(size, checksum)) {
                result.status.add(SUCCESS);
            }

            // Let SLF4J call toString() only when debug logging is enabled.
            LOGGER.debug("Got {}", result);

            return result;
        } catch (final IOException e) {
            throw propagate(e);
        } finally {
            try {
                fixityInputStream.close();
            } catch (final IOException e) {
                // Best effort: the fixity result (or the primary exception)
                // takes precedence over a failure to close.
                LOGGER.debug("Got error closing input stream", e);
            }
        }
    }
}
@@ -0,0 +1,62 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.utils;

import java.io.InputStream;

import javax.jcr.Binary;
import javax.jcr.RepositoryException;

/**
 * A {@link CacheEntry} backed by a plain JCR {@link Binary}, whose content is
 * read straight from the binary itself.
 *
 * @author frank asseg
 */
public class BinaryCacheEntry extends BasicCacheEntry {

    /** Binary value supplying this entry's content. */
    protected final Binary binary;

    /** Identifier reported by {@link #getExternalIdentifier()}. */
    protected final String externalUri;

    /**
     * Wrap a binary value as a cache entry.
     *
     * @param binary the binary whose stream provides the entry's content
     * @param externalUri the identifier to report for this entry
     */
    public BinaryCacheEntry(final Binary binary, final String externalUri) {
        this.binary = binary;
        this.externalUri = externalUri;
    }

    /**
     * Obtain the content of the wrapped binary.
     *
     * @return the stream returned by {@link Binary#getStream()}
     * @throws RepositoryException if the binary cannot provide a stream
     */
    @Override
    public InputStream getInputStream() throws RepositoryException {
        return binary.getStream();
    }

    /**
     * @return the identifier supplied at construction time
     */
    @Override
    public String getExternalIdentifier() {
        return externalUri;
    }

}
@@ -0,0 +1,55 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.fcrepo.kernel.utils;

import java.io.InputStream;
import java.net.URI;

import javax.jcr.RepositoryException;

/**
 * A CacheEntry abstraction for the various possible types of entries
 * (for example, values persisted in a binary store, or binaries held
 * directly by the repository).
 *
 * @author fasseg
 */
public interface CacheEntry {

    /**
     * Check the fixity of this {@link CacheEntry}.
     *
     * @param checksum the checksum previously generated for the entry
     * @param size the size previously recorded for the entry
     * @return a {@link FixityResult} containing the computed size, checksum
     *         and resulting fixity status
     * @throws RepositoryException if the entry's content cannot be accessed
     */
    FixityResult checkFixity(URI checksum, long size)
        throws RepositoryException;

    /**
     * Get a raw input stream from the underlying store.
     *
     * @return the content for this entry
     * @throws RepositoryException if the content cannot be accessed
     */
    InputStream getInputStream() throws RepositoryException;

    /**
     * Generate a human-readable identifier for the location of this entry.
     *
     * @return an identifier describing where this entry's content lives
     */
    String getExternalIdentifier();
}

0 comments on commit 9136033

Please sign in to comment.