Skip to content

Commit

Permalink
fix some bugs in chunking I/O; add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
barmintor committed May 9, 2013
1 parent 0c0e0e6 commit b566a78
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 11 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.infinispan.loaders.CacheStore;
import org.modeshape.common.logging.Logger;


/**
* Merges chunks from cache and provides InputStream-feeling.
*/
Expand Down Expand Up @@ -85,18 +86,24 @@ public int read(final byte[] b, final int off, int len) throws IOException {
}
System.arraycopy(buffer, indexInBuffer, b, off, len);
indexInBuffer += len;
if (indexInBuffer >= buffer.length) {
fillBuffer();
// if we've just exhausted the buffer, make sure we try a new buffer on next skip/read
if (indexInBuffer == buffer.length) {
buffer = null;
indexInBuffer = 0;
}
return len;
}

@Override
public int available() throws IOException {
if (buffer == null) {
fillBuffer();
return 0;
}
if (indexInBuffer >= 0) {
return buffer.length - indexInBuffer;
} else {
return -1;
}
return buffer.length - indexInBuffer;
}

@Override
Expand All @@ -108,14 +115,17 @@ public final long skip(long n) throws IOException {
fillBuffer();
return skip(n);
}
if (buffer.length + n > indexInBuffer) {
n = buffer.length - indexInBuffer;
}
if (n < 0) {
return 0;
// do not load a new buffer if skippable bytes remain in current buffer
if (indexInBuffer + n >= buffer.length) {
long skipped = buffer.length - indexInBuffer;
// but make sure a new buffer is loaded on next skip/read
buffer = null;
indexInBuffer = 0;
return skipped;
} else {
indexInBuffer += n;
return n;
}
indexInBuffer += n;
return n;
}

private void fillBuffer() throws IOException {
Expand Down
Expand Up @@ -99,6 +99,8 @@ private void storeBufferInBLOBCache() throws IOException {

logger.debug("Store chunk {0}", chunkKey);
blobCache.store(cacheEntry);
chunkIndex++;
chunkBuffer.reset();
} catch (final CacheLoaderException e) {
throw new IOException(e);
}
Expand Down
@@ -0,0 +1,131 @@
package org.fcrepo.utils.infinispan;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.security.SecureRandom;

import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StoreChunkInputStream}. The backing {@link CacheStore}
 * and its {@link InternalCacheEntry} values are Mockito mocks, so every test
 * controls exactly which chunk keys resolve to data.
 */
public class StoreChunkInputStreamTest {

    /** Size, in bytes, of each mocked chunk. */
    private static final int DATA_SIZE = 1024;

    private StoreChunkInputStream testObj;

    private CacheStore mockStore;

    private InternalCacheEntry mockEntry;

    private String mockKey = "key-to-a-mock-blob";

    private String mockFirstChunk = mockKey + "-0";

    /**
     * Creates fresh mocks and a fresh stream for every test. Only the
     * existence of the first chunk key is stubbed here; individual tests stub
     * {@code load} as they need.
     *
     * @throws CacheLoaderException declared by the stubbed store method
     */
    @Before
    public void setUp() throws CacheLoaderException {
        mockStore = mock(CacheStore.class);
        when(mockStore.containsKey(mockFirstChunk)).thenReturn(true);
        mockEntry = mock(InternalCacheEntry.class);
        testObj = new StoreChunkInputStream(mockStore, mockKey);
    }

    /**
     * Builds arbitrary chunk content. The bytes only need to be present, not
     * unpredictable, so they come from {@code nextBytes} instead of
     * {@code SecureRandom.getSeed}: the latter draws from the seed generator,
     * which may block on hosts that are short on entropy.
     *
     * @param size number of bytes to produce
     * @return a new array of {@code size} arbitrary bytes
     */
    private static byte[] randomData(final int size) {
        final byte[] data = new byte[size];
        new SecureRandom().nextBytes(data);
        return data;
    }

    /**
     * Smoke test: a single-byte read against a store with no stubbed chunks
     * must not throw. No return value is asserted here.
     */
    @Test
    public void testRead() throws IOException {
        testObj.read();
    }

    /**
     * Reads through two mocked chunks with a caller buffer, checking that a
     * read never crosses a chunk boundary and that end-of-data is reported.
     */
    @Test
    public void testBufferedRead() throws IOException, CacheLoaderException {
        when(mockEntry.getValue()).thenReturn(randomData(DATA_SIZE));
        // two chunks of data, then nothing
        when(mockStore.load(anyString())).thenReturn(mockEntry).thenReturn(mockEntry).thenReturn(null);
        final int partition = 234;
        final int expected = DATA_SIZE - partition;
        final byte[] buffer = new byte[DATA_SIZE];

        int actual = testObj.read(buffer, 0, expected);
        // can read less than a block of data
        assertEquals(expected, actual);

        // will not load the next chunk if more data is available
        actual = testObj.read(buffer, 0, DATA_SIZE);
        assertEquals(partition, actual);

        // will load the next chunk if no data is available
        actual = testObj.read(buffer, 0, DATA_SIZE);
        assertEquals(DATA_SIZE, actual);

        // and will report the end of the data accurately
        actual = testObj.read(buffer, 0, DATA_SIZE);
        assertEquals(-1, actual);
    }

    /**
     * Tracks {@code available()} before any chunk is loaded, part-way through
     * the only chunk, at its end, and after the data is exhausted.
     */
    @Test
    public void testAvailable() throws IOException, CacheLoaderException {
        when(mockEntry.getValue()).thenReturn(randomData(DATA_SIZE));
        // only the first chunk key resolves; every other key yields null
        when(mockStore.load(mockFirstChunk)).thenReturn(mockEntry);

        // nothing has been loaded yet
        assertEquals(0, testObj.available());

        final int partition = 435;
        testObj.skip(partition);
        // part of the first buffer remains
        assertEquals(DATA_SIZE - partition, testObj.available());

        testObj.skip(DATA_SIZE - partition);
        // none of the first buffer remains
        assertEquals(0, testObj.available());

        testObj.skip(1);
        // no buffers remain
        assertEquals(-1, testObj.available());
    }

    /**
     * Skips all but the last byte of a single chunk and checks that exactly
     * one chunk was loaded and exactly one byte is left to read.
     */
    @Test
    public void testSkip() throws IOException, CacheLoaderException {
        final long expected = DATA_SIZE - 1;
        when(mockEntry.getValue()).thenReturn(randomData(DATA_SIZE));
        when(mockStore.load(mockFirstChunk)).thenReturn(mockEntry);

        final long actual = testObj.skip(expected);
        assertEquals(expected, actual);

        // the skip must have cost exactly one load, and it was the first chunk
        verify(mockStore).load(anyString());
        verify(mockStore).load(mockFirstChunk);
        verify(mockEntry).getValue();

        assertTrue(testObj.read() > -1);
        assertEquals(-1, testObj.read());
    }

    /**
     * Requests a skip larger than one chunk and checks that a single call
     * stops at the chunk boundary, with the following chunk loaded lazily.
     */
    @Test
    public void testSkipMultipleBuffers() throws IOException, CacheLoaderException {
        when(mockEntry.getValue()).thenReturn(randomData(DATA_SIZE));
        // two chunks of data, then nothing
        when(mockStore.load(anyString())).thenReturn(mockEntry).thenReturn(mockEntry).thenReturn(null);

        long expected = DATA_SIZE;
        // ask for more than the buffer
        long actual = testObj.skip(DATA_SIZE + 1);
        // we should skip only one complete buffer
        assertEquals(expected, actual);

        // ok, skip all but the last byte remaining
        expected = DATA_SIZE - 1;
        actual = testObj.skip(expected);
        // new buffer, mostly skipped
        assertEquals(expected, actual);

        // we should still have 1 more byte
        assertTrue(testObj.read() > -1);
        // but only the one
        assertEquals(-1, testObj.read());
        // and we only had two cacheEntries
        verify(mockEntry, times(2)).getValue();
    }

    /**
     * Smoke test: fetching the next chunk from a store with no stubbed chunks
     * must not throw. No return value is asserted here.
     */
    @Test
    public void testNextChunk() throws IOException {
        testObj.nextChunk();
    }

}
@@ -0,0 +1,58 @@
package org.fcrepo.utils.infinispan;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.security.SecureRandom;

import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StoreChunkOutputStream}, verifying how many chunks
 * are handed to a mocked {@link CacheStore} when more data is written than
 * the stream keeps in one chunk.
 */
public class StoreChunkOutputStreamTest {

    /** Size, in bytes, of the block passed to each {@code write} call. */
    private static final int DATA_SIZE = 1024;

    /**
     * Number of {@code write} calls per test. Presumably one block more than
     * fits in a single chunk, which is why two chunks are expected -- confirm
     * against the chunk size configured in StoreChunkOutputStream.
     */
    private static final int WRITE_COUNT = 1025;

    /** Number of chunks the tests expect to be stored. */
    private static final int EXPECTED_CHUNKS = 2;

    private StoreChunkOutputStream testObj;

    private CacheStore mockStore;

    private InternalCacheEntry mockEntry;

    private String mockKey = "key-to-a-mock-blob";

    /** Creates fresh mocks and a fresh stream for every test. */
    @Before
    public void setUp() {
        mockStore = mock(CacheStore.class);
        mockEntry = mock(InternalCacheEntry.class);
        testObj = new StoreChunkOutputStream(mockStore, mockKey);
    }

    /**
     * Writes {@code WRITE_COUNT} blocks of arbitrary content and closes the
     * stream. The bytes only need to be present, not unpredictable, so they
     * come from {@code nextBytes} instead of {@code SecureRandom.getSeed}: the
     * latter draws from the seed generator, which may block on hosts that are
     * short on entropy.
     *
     * @throws IOException if writing or closing the stream fails
     */
    private void writeAllAndClose() throws IOException {
        final byte[] data = new byte[DATA_SIZE];
        new SecureRandom().nextBytes(data);
        for (int i = 0; i < WRITE_COUNT; i++) {
            testObj.write(data);
        }
        testObj.close();
    }

    /**
     * Writing past one chunk to a key with no existing entry stores exactly
     * two chunks.
     */
    @Test
    public void testWritingMultipleChunks() throws IOException, CacheLoaderException {
        writeAllAndClose();
        verify(mockStore, times(EXPECTED_CHUNKS)).store(any(InternalCacheEntry.class));
        assertEquals(EXPECTED_CHUNKS, testObj.getNumberChunks());
    }

    /**
     * Same as above, but the first chunk key already resolves to an entry in
     * the store; the stream must look it up once and still store two chunks.
     */
    @Test
    public void testWritingMultipleChunksOnVersionedKey() throws IOException, CacheLoaderException {
        when(mockStore.load(mockKey + "-0")).thenReturn(mockEntry);
        writeAllAndClose();
        verify(mockStore).load(mockKey + "-0");
        verify(mockStore, times(EXPECTED_CHUNKS)).store(any(InternalCacheEntry.class));
        assertEquals(EXPECTED_CHUNKS, testObj.getNumberChunks());
    }
}

0 comments on commit b566a78

Please sign in to comment.