Skip to content

Commit

Permalink
Added javadoc to BatchingStrategy.
Browse files Browse the repository at this point in the history
Added unit tests.
  • Loading branch information
jodzga committed Dec 14, 2015
1 parent b784147 commit fc27e66
Show file tree
Hide file tree
Showing 4 changed files with 402 additions and 13 deletions.
Expand Up @@ -27,7 +27,7 @@
import com.linkedin.parseq.trace.TraceBuilder;

/**
* {@code BatchingStrategy} helps building "batching clients" in ParSeq. "Client" means an object that given {@code K key}
* provides a task that returns {@code T value}. "Batching" means that it can group together keys to resolve values
* in batches. The benefit of this approach is that batching happens transparently in the background and user's code
* does not have to deal with logic needed to implement batching.
Expand All @@ -36,9 +36,7 @@
* the sake of simplicity of the example we are using dummy, synchronous key-value store interface:
* <blockquote><pre>
* interface KVStore {
*
* String get(Long key);
*
* Map{@code <Long, String>} batchGet(Collection{@code <Long>} keys);
* }
* </pre></blockquote>
Expand All @@ -47,7 +45,6 @@
* <blockquote><pre>
* public static class BatchingKVStoreClient extends BatchingStrategy{@code <Integer, Long, String>} {
* private final KVStore _store;
*
* public BatchingKVStoreClient(KVStore store) {
* _store = store;
* }
Expand All @@ -74,12 +71,12 @@
* trivially returns a constant {@code 0}. In practice {@code classify()} returns an equivalence class. All keys that
* return an equal equivalence class will constitute a batch.
* <p>
* The interaction between ParSeq and {@code BatchingStrategy} is the following:
* <ol>
* <li>{@code batchable(String desc, K key)} is invoked to create Task instance</li>
* <li>Plan is started by {@code Engine.run()}</li>
* <li>When Task returned by {@code batchable(String desc, K key)} is started, the key {@code K} is remembered by a {@code BatchingStrategy}</li>
* <li>When Plan can't make immediate progress {@code BatchingStrategy} will be invoked to run batchable operations:
* <ol>
* <li>Every {@code K key} is classified using {@code classify(K key)} method</li>
* <li>Keys, together with adequate Promises, are batched together based on {@code G group} returned by previous step</li>
Expand All @@ -88,6 +85,10 @@
* </ol>
* Both {@code executeSingleton(G group, K key, BatchEntry<T> entry)} and {@code executeBatch(G group, Batch<K, T> batch)} are invoked
* in the context of their own Task instance with description given by {@code getBatchName(G group, Batch<K, T> batch)}.
* Implementation of {@code BatchingStrategy} has to be fast because it is executed sequentially with respect to tasks belonging
* to the plan. It means that no other task will be executed until {@code BatchingStrategy} completes. Typically {@code classify(K key)}
* is a synchronous and fast operation whilst {@code executeBatch(G group, Batch<K, T> batch)} and
* {@code executeSingleton(G group, K key, BatchEntry<T> entry)} return quickly and complete promises asynchronously.
* </ol>
*
* @author Jaroslaw Odzga (jodzga@linkedin.com)
Expand Down Expand Up @@ -189,21 +190,48 @@ void handleBatch(final PlanContext planContext) {
}
}

/**
 * This method will be called for every {@code Batch} that contains at least two elements.
 * An implementation of this method must make sure that every {@code SettablePromise} contained in the {@code Batch}
 * is eventually resolved - typically asynchronously.
 * @param group group that represents the batch
 * @param batch batch containing a collection of {@code SettablePromise}s that eventually need to be resolved - typically asynchronously
 */
public abstract void executeBatch(G group, Batch<K, T> batch);

/**
 * This method will be called for every {@code Batch} that contains exactly one element.
 * An implementation of this method must make sure that the {@code SettablePromise} contained in the {@code BatchEntry}
 * is eventually resolved - typically asynchronously.
 * @param group group that represents the batch
 * @param key key of the single element classified to the given group
 * @param entry entry containing a {@code SettablePromise} that eventually needs to be resolved - typically asynchronously
 */
public abstract void executeSingleton(G group, K key, BatchEntry<T> entry);

/**
 * Classifies the {@code K key} and by doing so assigns it to a {@code G group}.
 * If two keys are classified into the same group then they will belong to the same {@code Batch}.
 * For each batch either {@link #executeBatch(Object, Batch)} will be called if the batch contains at least two elements
 * or {@link #executeSingleton(Object, Object, BatchEntry)} will be called if the batch contains only one element.
 * @param key key to be classified
 * @return group that represents the batch the key will belong to
 */
public abstract G classify(K key);

/**
 * Overriding this method allows providing a custom name for a batch. The name will appear in the
 * ParSeq trace as a description of the task that executes the batch.
 * @param batch batch to be described
 * @param group group to be described
 * @return name for the batch and group
 */
public String getBatchName(Batch<K, T> batch, G group) {
  // A one-element batch is executed via executeSingleton(), so label it distinctly in the trace.
  if (batch.size() == 1) {
    return "singleton";
  } else {
    return "batch(" + batch.size() + ")";
  }
}

private Map<G, Batch<K, T>> split(Batch<K, T> batch) {
Expand Down
@@ -0,0 +1,57 @@
package com.linkedin.parseq.batching;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

import com.linkedin.parseq.batching.BatchImpl.BatchEntry;
import com.linkedin.parseq.function.Tuple2;
import com.linkedin.parseq.function.Tuples;
import com.linkedin.parseq.promise.SettablePromise;

/**
 * A {@code BatchingStrategy} for tests: it records every classified key, every executed
 * batch and every executed singleton, delegating classification to a supplied
 * {@code Function} and promise completion to a supplied {@code BiConsumer}.
 */
public class RecordingStrategy<G, K, T> extends BatchingStrategy<G, K, T> {

  // Recorded interactions, in invocation order; exposed via the getters below.
  final List<K> _classifiedKeys = new ArrayList<>();
  final List<Batch<K, T>> _executedBatches = new ArrayList<>();
  final List<Tuple2<K, BatchEntry<T>>> _executedSingletons = new ArrayList<>();

  // _completer resolves each promise for a given key; _classifier maps a key to its group.
  final BiConsumer<K, SettablePromise<T>> _completer;
  final Function<K, G> _classifier;

  public RecordingStrategy(BiConsumer<K, SettablePromise<T>> completer,Function<K, G> classifier) {
    _completer = completer;
    _classifier = classifier;
  }

  @Override
  public void executeBatch(G group, Batch<K, T> batch) {
    // Record first, then complete promises, so listeners observe the recorded state.
    _executedBatches.add(batch);
    batch.foreach(_completer);
  }

  @Override
  public void executeSingleton(G group, K key, BatchEntry<T> entry) {
    // Record first, then complete the promise, so listeners observe the recorded state.
    _executedSingletons.add(Tuples.tuple(key, entry));
    _completer.accept(key, entry.getPromise());
  }

  @Override
  public G classify(K key) {
    _classifiedKeys.add(key);
    return _classifier.apply(key);
  }

  /** @return keys passed to {@link #classify(Object)}, in invocation order */
  public List<K> getClassifiedKeys() {
    return _classifiedKeys;
  }

  /** @return batches passed to {@link #executeBatch(Object, Batch)}, in invocation order */
  public List<Batch<K, T>> getExecutedBatches() {
    return _executedBatches;
  }

  /** @return (key, entry) pairs passed to {@link #executeSingleton(Object, Object, BatchEntry)}, in invocation order */
  public List<Tuple2<K, BatchEntry<T>>> getExecutedSingletons() {
    return _executedSingletons;
  }

}
@@ -0,0 +1,74 @@
package com.linkedin.parseq.batching;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.testng.annotations.Test;

import com.linkedin.parseq.batching.BatchImpl.BatchBuilder;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import com.linkedin.parseq.trace.ShallowTraceBuilder;

public class TestBatch {

  @Test
  public void testEmptyBatch() {
    final BatchBuilder<Integer, String> emptyBuilder = Batch.builder();
    final Batch<Integer, String> emptyBatch = emptyBuilder.build();

    assertEquals(emptyBatch.size(), 0);
    assertEquals(emptyBatch.values().size(), 0);
    assertEquals(emptyBatch.entries().size(), 0);
  }

  @Test
  public void testBatch() {

    final AtomicInteger resolvedAsExpected = new AtomicInteger(0);

    // Creates a promise that bumps the counter when it is resolved with the expected value.
    final Function<String, SettablePromise<String>> expecting = expectedValue -> {
      final SettablePromise<String> promise = Promises.settable();
      promise.addListener(resolved -> {
        if (resolved.get().equals(expectedValue)) {
          resolvedAsExpected.incrementAndGet();
        }
      });
      return promise;
    };

    final BatchBuilder<Integer, String> bb = Batch.builder();
    bb.add(0, new ShallowTraceBuilder(0L), expecting.apply("0"));
    bb.add(1, new ShallowTraceBuilder(1L), expecting.apply("1"));
    final SettablePromise<String> promiseForKey2 = expecting.apply("2");
    bb.add(2, new ShallowTraceBuilder(2L), promiseForKey2);
    final SettablePromise<String> promiseForKey3 = Promises.settable();
    bb.add(3, new ShallowTraceBuilder(2L), promiseForKey3);
    bb.add(0, new ShallowTraceBuilder(3L), expecting.apply("0")); // duplicate key, merged into the existing entry
    final Batch<Integer, String> batch = bb.build();

    // Five adds but only four distinct keys.
    assertEquals(batch.size(), 4);
    assertEquals(batch.values().size(), 4);
    assertEquals(batch.entries().size(), 4);
    assertEquals(batch.keys().size(), 4);

    assertTrue(batch.keys().contains(0));
    assertTrue(batch.keys().contains(1));
    assertTrue(batch.keys().contains(2));
    assertTrue(batch.keys().contains(3));

    batch.done(0, "0");
    batch.done(1, "1");
    batch.fail(3, new Exception());

    // Key 0 resolves both its promises (original + duplicate), key 1 resolves one: 3 total.
    assertEquals(resolvedAsExpected.get(), 3);
    assertFalse(promiseForKey2.isDone());
    assertTrue(promiseForKey3.isDone());
    assertTrue(promiseForKey3.isFailed());
  }

}

0 comments on commit fc27e66

Please sign in to comment.