Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add LowLevelCacheStore#storeValue which manipulates the binary conten…
…t in a cachestore instance (and is really scary.. i assume there's a better way, but this seems to work for now)
- Loading branch information
Showing
4 changed files
with
254 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
104 changes: 104 additions & 0 deletions
104
fcrepo-kernel/src/main/java/org/fcrepo/utils/infinispan/StoreChunkOutputStream.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package org.fcrepo.utils.infinispan; | ||
|
||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import org.infinispan.Cache; | ||
import org.infinispan.container.InternalEntryFactory; | ||
import org.infinispan.container.InternalEntryFactoryImpl; | ||
import org.infinispan.container.entries.CacheEntry; | ||
import org.infinispan.container.entries.InternalCacheEntry; | ||
import org.infinispan.container.versioning.EntryVersion; | ||
import org.infinispan.loaders.CacheLoaderException; | ||
import org.infinispan.loaders.CacheStore; | ||
import org.modeshape.common.logging.Logger; | ||
|
||
public class StoreChunkOutputStream extends OutputStream { | ||
|
||
|
||
protected final Logger logger; | ||
|
||
public static final int CHUNKSIZE = 1024 * 1024 * 1; // 1 MB | ||
|
||
protected final CacheStore blobCache; | ||
protected final String keyPrefix; | ||
private ByteArrayOutputStream chunkBuffer; | ||
private boolean closed; | ||
protected int chunkIndex; | ||
|
||
private InternalEntryFactory entryFactory = new InternalEntryFactoryImpl(); | ||
|
||
public StoreChunkOutputStream( CacheStore blobCache, | ||
String keyPrefix ) { | ||
logger = Logger.getLogger(getClass()); | ||
this.blobCache = blobCache; | ||
this.keyPrefix = keyPrefix; | ||
chunkBuffer = new ByteArrayOutputStream(1024); | ||
} | ||
|
||
|
||
/** | ||
* @return Number of chunks stored. | ||
*/ | ||
public int getNumberChunks() { | ||
return chunkIndex; | ||
} | ||
|
||
@Override | ||
public void write( int b ) throws IOException { | ||
if (chunkBuffer.size() == CHUNKSIZE) { | ||
storeBufferInBLOBCache(); | ||
} | ||
chunkBuffer.write(b); | ||
} | ||
|
||
@Override | ||
public void write( byte[] b, | ||
int off, | ||
int len ) throws IOException { | ||
if (len + chunkBuffer.size() <= CHUNKSIZE) { | ||
chunkBuffer.write(b, off, len); | ||
} else { | ||
int storeLength = CHUNKSIZE - chunkBuffer.size(); | ||
write(b, off, storeLength); | ||
storeBufferInBLOBCache(); | ||
write(b, off + storeLength, len - storeLength); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
logger.debug("Close. Buffer size at close: {0}", chunkBuffer.size()); | ||
if (closed) { | ||
logger.debug("Stream already closed."); | ||
return; | ||
} | ||
closed = true; | ||
// store last chunk | ||
if (chunkBuffer.size() > 0) { | ||
storeBufferInBLOBCache(); | ||
} | ||
} | ||
|
||
private void storeBufferInBLOBCache() throws IOException { | ||
final byte[] chunk = chunkBuffer.toByteArray(); | ||
try { | ||
String chunkKey = keyPrefix + "-" + chunkIndex; | ||
InternalCacheEntry c = blobCache.load(chunkKey); | ||
final InternalCacheEntry cacheEntry; | ||
if(c == null) { | ||
cacheEntry = entryFactory.create(chunkKey, chunk, (EntryVersion)null); | ||
} else { | ||
cacheEntry = entryFactory.create(chunkKey, chunk, c); | ||
} | ||
|
||
logger.debug("Store chunk {0}", chunkKey); | ||
blobCache.store(cacheEntry); | ||
} catch (CacheLoaderException e) { | ||
throw new IOException(e); | ||
} | ||
|
||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
fcrepo-kernel/src/test/resources/config/infinispan_configuration_chained.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd" | ||
xmlns="urn:infinispan:config:5.2"> | ||
|
||
<global> | ||
<globalJmxStatistics allowDuplicateDomains="true" /> | ||
</global> | ||
|
||
<default> | ||
</default> | ||
|
||
<namedCache name="FedoraRepository"> | ||
<!-- | ||
Our Infinispan cache needs to be transactional. However, we'll also configure it to | ||
use pessimistic locking, which is required whenever applications will be concurrently | ||
updating nodes within the same process. If you're not sure, use pessimistic locking. | ||
--> | ||
|
||
<eviction maxEntries="100" strategy="LRU" threadPolicy="DEFAULT"/> | ||
|
||
<transaction | ||
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" | ||
transactionMode="TRANSACTIONAL" lockingMode="PESSIMISTIC"/> | ||
<!-- | ||
Define the cache loaders (i.e., cache stores). Passivation is false because we want *all* | ||
data to be persisted, not just what doesn't fit into memory. Shared is false because there | ||
are no other caches sharing this file store. We set preload to false for lazy loading; | ||
may be improved by preloading and configuring eviction. | ||
We can have multiple cache loaders, which get chained. But we'll define just one. | ||
--> | ||
|
||
<loaders passivation="false" shared="false" preload="false"> | ||
|
||
<!-- | ||
The 'fetchPersistentState' attribute applies when this cache joins the cluster; the value doesn't | ||
really matter to us in this case. See the documentation for more options. | ||
--> | ||
<loader class="org.infinispan.loaders.file.FileCacheStore" fetchPersistentState="true" | ||
purgeOnStartup="false"> | ||
<!-- See the documentation for more configuration examples and flags. --> | ||
<properties> | ||
<!-- We have to set the location where we want to store the data. --> | ||
<property name="location" value="${fcrepo.ispn.CacheDirPath:target/FedoraRepository/storage}"/> | ||
|
||
<property name="fsyncMode" value="perWrite"/> | ||
</properties> | ||
<!-- This repository isn't clustered, so we could set up the SingletonStore. | ||
singletonStore enabled="true" pushStateWhenCoordinator="true" pushStateTimeout="20000"/> | ||
--> | ||
<!-- | ||
We could use "write-behind", which actually writes to the file system asynchronously, | ||
which can improve performance as seen by the JCR client. | ||
Plus changes are coalesced, meaning that if multiple changes are enqueued for the | ||
same node, only the last one is written. (This is good much of the time, but not | ||
always.) | ||
<async enabled="true" flushLockTimeout="15000" threadPoolSize="5"/> | ||
--> | ||
</loader> | ||
|
||
<!-- | ||
The 'fetchPersistentState' attribute applies when this cache joins the cluster; the value doesn't | ||
really matter to us in this case. See the documentation for more options. | ||
--> | ||
<loader class="org.infinispan.loaders.file.FileCacheStore" fetchPersistentState="false" | ||
purgeOnStartup="false"> | ||
<!-- See the documentation for more configuration examples and flags. --> | ||
<properties> | ||
<!-- We have to set the location where we want to store the data. --> | ||
<property name="location" value="${fcrepo.ispn.alternative.CacheDirPath:target/FedoraRepository/storage-alt}"/> | ||
|
||
<property name="fsyncMode" value="perWrite"/> | ||
</properties> | ||
<!-- This repository isn't clustered, so we could set up the SingletonStore. | ||
singletonStore enabled="true" pushStateWhenCoordinator="true" pushStateTimeout="20000"/> | ||
--> | ||
<!-- | ||
We could use "write-behind", which actually writes to the file system asynchronously, | ||
which can improve performance as seen by the JCR client. | ||
Plus changes are coalesced, meaning that if multiple changes are enqueued for the | ||
same node, only the last one is written. (This is good much of the time, but not | ||
always.) | ||
--> | ||
<async enabled="true" flushLockTimeout="15000" threadPoolSize="5"/> | ||
|
||
</loader> | ||
</loaders> | ||
</namedCache> | ||
</infinispan> |