Skip to content

Commit

Permalink
add LowLevelCacheStore#storeValue which manipulates the binary conten…
Browse files Browse the repository at this point in the history
…t in a cachestore instance (and is really scary.. i assume there's a better way, but this seems to work for now)
  • Loading branch information
cbeer committed Mar 14, 2013
1 parent 488fa87 commit 8bbd68a
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 0 deletions.
@@ -1,6 +1,8 @@
package org.fcrepo.utils;

import org.apache.poi.util.IOUtils;
import org.fcrepo.utils.infinispan.StoreChunkInputStream;
import org.fcrepo.utils.infinispan.StoreChunkOutputStream;
import org.infinispan.loaders.AbstractCacheStoreConfig;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.CacheStoreConfig;
Expand All @@ -10,7 +12,9 @@
import org.modeshape.jcr.value.binary.BinaryStoreException;
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Properties;

Expand Down Expand Up @@ -49,6 +53,17 @@ public InputStream getInputStream(BinaryKey key) throws BinaryStoreException {
}
}

/**
 * Stores the content of {@code stream} in this cache store.
 *
 * <p>When the wrapped binary store is an {@link InfinispanBinaryStore}, the bytes are
 * written directly to the low-level cache store through a {@link StoreChunkOutputStream}
 * whose chunk-key prefix is {@code key.toString() + DATA_SUFFIX}. For any other binary
 * store the stream is handed to the store itself and {@code key} is not used: the
 * BinaryStore will calculate a new key for us.
 *
 * <p>The supplied input stream is not closed by this method.
 *
 * @param key the binary key under which the chunks are written (Infinispan case only)
 * @param stream the content to store
 * @throws BinaryStoreException if the wrapped binary store fails to store the value
 * @throws IOException if copying the stream or writing a chunk fails
 */
public void storeValue(BinaryKey key, InputStream stream ) throws BinaryStoreException, IOException {
    // Anything that is not Infinispan-backed is delegated to the binary store as-is.
    if (!(this.store instanceof InfinispanBinaryStore)) {
        this.store.storeValue(stream);
        return;
    }

    final String chunkKeyPrefix = key.toString() + DATA_SUFFIX;
    final OutputStream chunkSink = new StoreChunkOutputStream(low_level_store, chunkKeyPrefix);
    IOUtils.copy(stream, chunkSink);
    // Closing the sink writes out whatever is still buffered as the last chunk.
    chunkSink.close();
}

public String getExternalIdentifier() {

if(this.store instanceof InfinispanBinaryStore) {
Expand Down
@@ -0,0 +1,104 @@
package org.fcrepo.utils.infinispan;


import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.infinispan.Cache;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.InternalEntryFactoryImpl;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.modeshape.common.logging.Logger;

/**
 * An {@link OutputStream} that buffers the bytes written to it and stores them in a
 * {@link CacheStore} as a sequence of chunks of at most {@link #CHUNKSIZE} bytes each.
 *
 * <p>Chunk entries are keyed {@code keyPrefix + "-" + chunkIndex}, with the index starting
 * at 0 and advancing by one for every chunk stored. The final (possibly partial) chunk is
 * written when the stream is closed.
 *
 * <p>This class performs no synchronization of its own.
 */
public class StoreChunkOutputStream extends OutputStream {


    protected final Logger logger;

    /** Maximum number of bytes stored in a single chunk entry. */
    public static final int CHUNKSIZE = 1024 * 1024 * 1; // 1 MB

    /** The cache store the chunk entries are written to. */
    protected final CacheStore blobCache;
    /** Prefix shared by the keys of all chunks written by this stream. */
    protected final String keyPrefix;
    /** Bytes accumulated for the chunk currently being filled. */
    private ByteArrayOutputStream chunkBuffer;
    private boolean closed;
    /** Index of the next chunk to store; equals the number of chunks stored so far. */
    protected int chunkIndex;

    private InternalEntryFactory entryFactory = new InternalEntryFactoryImpl();

    /**
     * Creates a stream that writes chunks to the given cache store.
     *
     * @param blobCache the cache store that receives the chunk entries
     * @param keyPrefix the prefix used to build each chunk key
     */
    public StoreChunkOutputStream( CacheStore blobCache,
                                   String keyPrefix ) {
        logger = Logger.getLogger(getClass());
        this.blobCache = blobCache;
        this.keyPrefix = keyPrefix;
        chunkBuffer = new ByteArrayOutputStream(1024);
    }


    /**
     * @return Number of chunks stored.
     */
    public int getNumberChunks() {
        return chunkIndex;
    }

    /**
     * Buffers a single byte, first storing the current chunk if it is already full.
     *
     * @param b the byte to write (low-order eight bits)
     * @throws IOException if a full chunk cannot be stored
     */
    @Override
    public void write( int b ) throws IOException {
        if (chunkBuffer.size() == CHUNKSIZE) {
            storeBufferInBLOBCache();
        }
        chunkBuffer.write(b);
    }

    /**
     * Buffers {@code len} bytes from {@code b}, storing a chunk each time the buffer
     * reaches {@link #CHUNKSIZE} while more bytes remain to be written.
     *
     * @param b the source array
     * @param off the offset of the first byte to write
     * @param len the number of bytes to write
     * @throws IOException if a full chunk cannot be stored
     */
    @Override
    public void write( byte[] b,
                       int off,
                       int len ) throws IOException {
        // Fast path: everything fits into the current chunk. Written as a subtraction so
        // that a very large len cannot overflow the comparison. Argument validation is
        // left to ByteArrayOutputStream, as before.
        if (len <= CHUNKSIZE - chunkBuffer.size()) {
            chunkBuffer.write(b, off, len);
            return;
        }

        // Slow path: fill the current chunk, store it, and continue with the rest.
        // Iterative rather than recursive so arbitrarily large writes cannot exhaust
        // the stack.
        int offset = off;
        int remaining = len;
        while (remaining > 0) {
            int room = CHUNKSIZE - chunkBuffer.size();
            if (room == 0) {
                storeBufferInBLOBCache();
                room = CHUNKSIZE;
            }
            final int count = Math.min(room, remaining);
            chunkBuffer.write(b, offset, count);
            offset += count;
            remaining -= count;
        }
    }

    /**
     * Stores any bytes still buffered as the last chunk. Calling this method again after
     * the stream has been closed has no effect.
     *
     * @throws IOException if the last chunk cannot be stored
     */
    @Override
    public void close() throws IOException {
        logger.debug("Close. Buffer size at close: {0}", chunkBuffer.size());
        if (closed) {
            logger.debug("Stream already closed.");
            return;
        }
        closed = true;
        // store last chunk
        if (chunkBuffer.size() > 0) {
            storeBufferInBLOBCache();
        }
    }

    /**
     * Writes the buffered bytes to the cache store under the current chunk key, then
     * advances the chunk index and empties the buffer for the next chunk.
     *
     * @throws IOException wrapping any {@link CacheLoaderException} raised by the store
     */
    private void storeBufferInBLOBCache() throws IOException {
        final byte[] chunk = chunkBuffer.toByteArray();
        try {
            String chunkKey = keyPrefix + "-" + chunkIndex;
            InternalCacheEntry c = blobCache.load(chunkKey);
            final InternalCacheEntry cacheEntry;
            if(c == null) {
                cacheEntry = entryFactory.create(chunkKey, chunk, (EntryVersion)null);
            } else {
                // An entry already exists for this key: build the replacement from it.
                // NOTE(review): presumably this carries over the existing entry's
                // metadata - confirm against InternalEntryFactory.
                cacheEntry = entryFactory.create(chunkKey, chunk, c);
            }

            logger.debug("Store chunk {0}", chunkKey);
            blobCache.store(cacheEntry);
        } catch (CacheLoaderException e) {
            throw new IOException(e);
        }

        // Only reached when the store succeeded: move on to the next chunk. Without
        // this, every chunk would be written under index 0 and the buffer would never
        // drain, which made multi-chunk array writes recurse without end.
        chunkIndex++;
        chunkBuffer.reset();
    }

}
@@ -1,13 +1,18 @@
package org.fcrepo.utils;

import org.apache.commons.io.IOUtils;
import org.infinispan.configuration.cache.CacheStoreConfiguration;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.decorators.ChainingCacheStore;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.modeshape.jcr.JcrRepository;
import org.modeshape.jcr.value.BinaryKey;
import org.modeshape.jcr.value.BinaryValue;
import org.modeshape.jcr.value.binary.BinaryStore;
import org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore;
import org.springframework.test.context.ContextConfiguration;
Expand All @@ -16,6 +21,10 @@
import javax.inject.Inject;
import javax.jcr.Repository;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.LinkedHashMap;

import static org.junit.Assert.assertEquals;


Expand Down Expand Up @@ -47,4 +56,41 @@ public void testGetExternalIdentifierWithInfinispan() throws Exception {
LowLevelCacheStore cs = new LowLevelCacheStore(store, ispn);
assertEquals("org.modeshape.jcr.value.binary.infinispan.InfinispanBinaryStore:org.infinispan.loaders.file.FileCacheStore:target/FedoraRepository/storage", cs.getExternalIdentifier());
}

/**
 * Writes different content under the same binary key into each of the two stores of a
 * chained cache store and verifies that each store returns its own content.
 */
@Test
public void testModifyingCacheStores() throws Exception {

    EmbeddedCacheManager cm = new DefaultCacheManager("config/infinispan_configuration_chained.xml");
    BinaryStore store = new InfinispanBinaryStore(cm, false, "FedoraRepository", "FedoraRepository");

    CacheStore ispn = cm.getCache("FedoraRepository").getAdvancedCache().getComponentRegistry()
            .getComponent(CacheLoaderManager.class)
            .getCacheStore();

    // A JUnit assertion rather than the Java assert keyword, which is silently skipped
    // unless the JVM is started with -ea.
    org.junit.Assert.assertTrue("Expected the configured cache store to be a ChainingCacheStore",
            ispn instanceof ChainingCacheStore);

    final BinaryKey key = new BinaryKey("123");

    ChainingCacheStore chained_store = (ChainingCacheStore)ispn;

    final LinkedHashMap<CacheStore,CacheStoreConfiguration> stores = chained_store.getStores();

    // Typed array instead of casting elements of an Object[].
    final CacheStore[] chain = stores.keySet().toArray(new CacheStore[stores.size()]);
    org.junit.Assert.assertTrue("Expected at least two chained cache stores", chain.length >= 2);

    LowLevelCacheStore cs = new LowLevelCacheStore(store, chain[0]);
    LowLevelCacheStore cs2 = new LowLevelCacheStore(store, chain[1]);

    // Explicit charset so the test does not depend on the platform default encoding.
    cs.storeValue(key, new ByteArrayInputStream("123456".getBytes("UTF-8")));

    cs2.storeValue(key, new ByteArrayInputStream("asdfg".getBytes("UTF-8")));

    // NOTE(review): presumably this gives the loader declared with async enabled in the
    // chained configuration time to flush its write - confirm, and consider replacing
    // the fixed sleep with a deterministic wait.
    Thread.sleep(1000);

    String v1 = IOUtils.toString(cs.getInputStream(key), "UTF-8");
    String v2 = IOUtils.toString(cs2.getInputStream(key), "UTF-8");

    assertEquals("Found the wrong value in our cache store", "123456", v1);
    assertEquals("Found the wrong value in our cache store", "asdfg", v2);


}
}
@@ -0,0 +1,89 @@
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
xmlns="urn:infinispan:config:5.2">

<global>
<globalJmxStatistics allowDuplicateDomains="true" />
</global>

<default>
</default>

<namedCache name="FedoraRepository">
<!--
Our Infinispan cache needs to be transactional. However, we'll also configure it to
use pessimistic locking, which is required whenever applications will be concurrently
updating nodes within the same process. If you're not sure, use pessimistic locking.
-->

<eviction maxEntries="100" strategy="LRU" threadPolicy="DEFAULT"/>

<transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
transactionMode="TRANSACTIONAL" lockingMode="PESSIMISTIC"/>
<!--
Define the cache loaders (i.e., cache stores). Passivation is false because we want *all*
data to be persisted, not just what doesn't fit into memory. Shared is false because there
are no other caches sharing this file store. We set preload to false for lazy loading;
may be improved by preloading and configuring eviction.
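We can have multiple cache loaders, which get chained. This configuration defines two
file cache stores: the first writes synchronously, the second (at an alternative location)
has write-behind (async) enabled.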
-->

<loaders passivation="false" shared="false" preload="false">

<!--
The 'fetchPersistentState' attribute applies when this cache joins the cluster; the value doesn't
really matter to us in this case. See the documentation for more options.
-->
<loader class="org.infinispan.loaders.file.FileCacheStore" fetchPersistentState="true"
purgeOnStartup="false">
<!-- See the documentation for more configuration examples and flags. -->
<properties>
<!-- We have to set the location where we want to store the data. -->
<property name="location" value="${fcrepo.ispn.CacheDirPath:target/FedoraRepository/storage}"/>

<property name="fsyncMode" value="perWrite"/>
</properties>
<!-- This repository isn't clustered, so we could set up the SingletonStore.
singletonStore enabled="true" pushStateWhenCoordinator="true" pushStateTimeout="20000"/>
-->
<!--
We could use "write-behind", which actually writes to the file system asynchronously,
which can improve performance as seen by the JCR client.
Plus changes are coalesced, meaning that if multiple changes are enqueued for the
same node, only the last one is written. (This is good much of the time, but not
always.)
<async enabled="true" flushLockTimeout="15000" threadPoolSize="5"/>
-->
</loader>

<!--
The 'fetchPersistentState' attribute applies when this cache joins the cluster; the value doesn't
really matter to us in this case. See the documentation for more options.
-->
<loader class="org.infinispan.loaders.file.FileCacheStore" fetchPersistentState="false"
purgeOnStartup="false">
<!-- See the documentation for more configuration examples and flags. -->
<properties>
<!-- We have to set the location where we want to store the data. -->
<property name="location" value="${fcrepo.ispn.alternative.CacheDirPath:target/FedoraRepository/storage-alt}"/>

<property name="fsyncMode" value="perWrite"/>
</properties>
<!-- This repository isn't clustered, so we could set up the SingletonStore.
singletonStore enabled="true" pushStateWhenCoordinator="true" pushStateTimeout="20000"/>
-->
<!--
This loader uses "write-behind" (see the async element below), which actually writes to the file system asynchronously,
which can improve performance as seen by the JCR client.
Plus changes are coalesced, meaning that if multiple changes are enqueued for the
same node, only the last one is written. (This is good much of the time, but not
always.)
-->
<async enabled="true" flushLockTimeout="15000" threadPoolSize="5"/>

</loader>
</loaders>
</namedCache>
</infinispan>

0 comments on commit 8bbd68a

Please sign in to comment.