Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Reorganized API to make batching implementations more consistent i.e.
when batch tasks are completed and how they are reflected in trace. Created parseq-restli-client contrib project. The new parseq restli client will be backwards compatible to allow drop-in replacement for existing parseq restli client. Existing parseq restli client can't be updated because restli does not require java 8.
- Loading branch information
jodzga
committed
Dec 14, 2015
1 parent
4f21e06
commit ce2ae63
Showing
7 changed files
with
456 additions
and
101 deletions.
There are no files selected for viewing
33 changes: 33 additions & 0 deletions
33
contrib/parseq-batching/src/main/java/com/linkedin/parseq/batching/Batch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package com.linkedin.parseq.batching; | ||
|
||
import java.util.Collection; | ||
import java.util.Set; | ||
import java.util.function.BiConsumer; | ||
|
||
import com.linkedin.parseq.batching.BatchImpl.BatchBuilder; | ||
import com.linkedin.parseq.batching.BatchImpl.BatchEntry; | ||
import com.linkedin.parseq.promise.PromiseResolvedException; | ||
import com.linkedin.parseq.promise.SettablePromise; | ||
|
||
public interface Batch<K, T> { | ||
|
||
void foreach(BiConsumer<K, SettablePromise<T>> consumer); | ||
|
||
Set<K> keys(); | ||
|
||
int size(); | ||
|
||
void done(K key, T value) throws PromiseResolvedException; | ||
|
||
void fail(K key, Throwable error) throws PromiseResolvedException; | ||
|
||
void failAll(Throwable error) throws PromiseResolvedException; | ||
|
||
boolean failAllRemaining(Throwable error); | ||
|
||
Collection<BatchEntry<T>> entries(); | ||
|
||
static <K, T> BatchBuilder<K, T> builder() { | ||
return new BatchBuilder<>(); | ||
} | ||
} |
23 changes: 0 additions & 23 deletions
23
contrib/parseq-batching/src/main/java/com/linkedin/parseq/batching/BatchEntry.java
This file was deleted.
Oops, something went wrong.
124 changes: 124 additions & 0 deletions
124
contrib/parseq-batching/src/main/java/com/linkedin/parseq/batching/BatchImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package com.linkedin.parseq.batching; | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.function.BiConsumer; | ||
|
||
import com.linkedin.parseq.promise.PromiseResolvedException; | ||
import com.linkedin.parseq.promise.Promises; | ||
import com.linkedin.parseq.promise.SettablePromise; | ||
import com.linkedin.parseq.trace.ShallowTraceBuilder; | ||
|
||
class BatchImpl<K, T> implements Batch<K, T> { | ||
|
||
private final Map<K, BatchEntry<T>> _map; | ||
|
||
private BatchImpl(Map<K, BatchEntry<T>> map) { | ||
_map = map; | ||
} | ||
|
||
@Override | ||
public void done(K key, T value) throws PromiseResolvedException { | ||
_map.get(key).getPromise().done(value); | ||
} | ||
|
||
@Override | ||
public void fail(K key, Throwable error) throws PromiseResolvedException { | ||
_map.get(key).getPromise().fail(error); | ||
} | ||
|
||
@Override | ||
public void failAll(Throwable error) throws PromiseResolvedException { | ||
PromiseResolvedException exception = null; | ||
for (BatchEntry<T> entry: _map.values()) { | ||
try { | ||
entry.getPromise().fail(error); | ||
} catch (PromiseResolvedException e) { | ||
exception = e; | ||
} | ||
} | ||
if (exception != null) { | ||
throw exception; | ||
} | ||
} | ||
|
||
@Override | ||
public Set<K> keys() { | ||
return _map.keySet(); | ||
} | ||
|
||
@Override | ||
public int size() { | ||
return _map.size(); | ||
} | ||
|
||
@Override | ||
public void foreach(final BiConsumer<K, SettablePromise<T>> consumer) { | ||
_map.forEach((key, entry) -> consumer.accept(key, entry.getPromise())); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "BatchImpl [entries=" + _map + "]"; | ||
} | ||
|
||
@Override | ||
public boolean failAllRemaining(Throwable error) { | ||
try { | ||
failAll(error); | ||
return true; | ||
} catch (PromiseResolvedException e) { | ||
return false; | ||
} | ||
} | ||
|
||
public static class BatchEntry<T> { | ||
|
||
private final SettablePromise<T> _promise; | ||
private final ShallowTraceBuilder _shallowTraceBuilder; | ||
|
||
public BatchEntry(ShallowTraceBuilder shallowTraceBuilder, SettablePromise<T> promise) { | ||
_promise = promise; | ||
_shallowTraceBuilder = shallowTraceBuilder; | ||
} | ||
|
||
public SettablePromise<T> getPromise() { | ||
return _promise; | ||
} | ||
|
||
public ShallowTraceBuilder getShallowTraceBuilder() { | ||
return _shallowTraceBuilder; | ||
} | ||
|
||
} | ||
|
||
static class BatchBuilder<K, T> { | ||
|
||
private final Map<K, BatchEntry<T>> _map = new HashMap<>(); | ||
|
||
public BatchBuilder<K, T> add(K key, BatchEntry<T> entry) { | ||
//deduplication | ||
BatchEntry<T> duplicate = _map.put(key, entry); | ||
if (duplicate != null) { | ||
Promises.propagateResult(entry.getPromise(), duplicate.getPromise()); | ||
} | ||
return this; | ||
} | ||
|
||
public BatchBuilder<K, T> add(K key, ShallowTraceBuilder traceBuilder, SettablePromise<T> promise) { | ||
return add(key, new BatchEntry<>(traceBuilder, promise)); | ||
} | ||
|
||
public Batch<K, T> build() { | ||
return new BatchImpl<>(_map); | ||
} | ||
} | ||
|
||
@Override | ||
public Collection<BatchEntry<T>> entries() { | ||
return _map.values(); | ||
} | ||
|
||
} |
122 changes: 62 additions & 60 deletions
122
contrib/parseq-batching/src/main/java/com/linkedin/parseq/batching/BatchingStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,95 +1,97 @@ | ||
package com.linkedin.parseq.batching; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.Callable; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.stream.Collectors; | ||
|
||
import com.linkedin.parseq.Task; | ||
import com.linkedin.parseq.Tasks; | ||
import com.linkedin.parseq.batching.BatchImpl.BatchBuilder; | ||
import com.linkedin.parseq.batching.BatchImpl.BatchEntry; | ||
import com.linkedin.parseq.internal.ContextImpl; | ||
import com.linkedin.parseq.internal.PlanContext; | ||
import com.linkedin.parseq.promise.CountDownPromiseListener; | ||
import com.linkedin.parseq.promise.PromiseListener; | ||
import com.linkedin.parseq.promise.Promises; | ||
import com.linkedin.parseq.promise.SettablePromise; | ||
import com.linkedin.parseq.trace.Relationship; | ||
import com.linkedin.parseq.trace.ShallowTraceBuilder; | ||
import com.linkedin.parseq.trace.TraceBuilder; | ||
|
||
public abstract class BatchingStrategy<K, G> { | ||
public abstract class BatchingStrategy<K, T> { | ||
|
||
private final ConcurrentHashMap<Long, List<InternalBatchEntry<K, Object>>> batches = | ||
private final ConcurrentHashMap<Long, BatchBuilder<K, T>> _batches = | ||
new ConcurrentHashMap<>(); | ||
|
||
@SuppressWarnings("unchecked") | ||
public <T> Task<T> batchable(final String desc, final Callable<BatchEntry<K, T>> callable) { | ||
return Task.async("batchable: " + desc, ctx -> { | ||
BatchEntry<K, T> batchEntry = callable.call(); | ||
|
||
public Task<T> batchable(final String desc, final K key) { | ||
return Task.async(desc, ctx -> { | ||
final SettablePromise<T> result = Promises.settable(); | ||
Long planId = ctx.getPlanId(); | ||
List<InternalBatchEntry<K, Object>> list = batches.get(planId); | ||
if (list == null) { | ||
list = new ArrayList<>(); | ||
List<InternalBatchEntry<K, Object>> existing = batches.putIfAbsent(planId, list); | ||
if (existing != null) { | ||
list = existing; | ||
BatchBuilder<K, T> builder = _batches.get(planId); | ||
if (builder == null) { | ||
builder = Batch.builder(); | ||
BatchBuilder<K, T> existingBuilder = _batches.putIfAbsent(planId, builder); | ||
if (existingBuilder != null) { | ||
builder = existingBuilder; | ||
} | ||
} | ||
list.add((InternalBatchEntry<K, Object>) new InternalBatchEntry<>(ctx.getShallowTraceBuilder(), | ||
batchEntry.getKey(), batchEntry.getPromise())); | ||
|
||
return batchEntry.getPromise(); | ||
builder.add(key, ctx.getShallowTraceBuilder(), result); | ||
return result; | ||
}); | ||
} | ||
|
||
void handleBatch(final PlanContext planContext) { | ||
final List<InternalBatchEntry<K, Object>> list = batches.remove(planContext.getId()); | ||
if (list != null) { | ||
try { | ||
list.stream().collect(Collectors.groupingBy(entry -> groupForKey(entry.getKey()))) | ||
.forEach((group, batch) -> handleGroupBatch(group, batch, planContext)); | ||
} catch (Throwable t) { | ||
failAll(list, t); | ||
} | ||
} | ||
} | ||
|
||
private void failAll(final List<InternalBatchEntry<K, Object>> list, final Throwable t) { | ||
for (InternalBatchEntry<K, Object> entry: list) { | ||
entry.getPromise().fail(t); | ||
} | ||
} | ||
private Task<?> taskForBatch(final Batch<K, T> batch) { | ||
return Task.async("batch", ctx -> { | ||
final SettablePromise<T> result = Promises.settable(); | ||
final PromiseListener<T> countDownListener = | ||
new CountDownPromiseListener<T>(batch.size(), result, null); | ||
|
||
void handleGroupBatch(final G group, final List<InternalBatchEntry<K, Object>> batch, final PlanContext planContext) { | ||
try { | ||
Task<?> task = taskForBatch(group, batch); | ||
//one of the batchable tasks becomes a parent of batch task | ||
//all other tasks become potential parents | ||
final TraceBuilder traceBuilder = ctx.getTraceBuilder(); | ||
Relationship rel = Relationship.PARENT_OF; | ||
for (InternalBatchEntry<K, ?> entry : batch) { | ||
planContext.getRelationshipsBuilder().addRelationship(rel, | ||
entry.getShallowTraceBuilder(), task.getShallowTraceBuilder()); | ||
for (BatchEntry<T> entry : batch.entries()) { | ||
traceBuilder.addRelationship(rel, | ||
entry.getShallowTraceBuilder(), ctx.getShallowTraceBuilder()); | ||
rel = Relationship.POTENTIAL_PARENT_OF; | ||
entry.getPromise().addListener(countDownListener); | ||
} | ||
new ContextImpl(planContext, task).runTask(); | ||
} catch (Throwable t) { | ||
failAll(batch, t); | ||
} | ||
} | ||
|
||
public abstract G groupForKey(K key); | ||
executeBatch(batch); | ||
|
||
public abstract Task<?> taskForBatch(G group, List<? extends BatchEntry<K, Object>> batch); | ||
|
||
private static class InternalBatchEntry<K, T> extends BatchEntry<K, T> { | ||
|
||
private final ShallowTraceBuilder _shallowTraceBuilder; | ||
return result; | ||
}); | ||
} | ||
|
||
public InternalBatchEntry(ShallowTraceBuilder shallowTraceBuilder, K key, SettablePromise<T> promise) { | ||
super(key, promise); | ||
_shallowTraceBuilder = shallowTraceBuilder; | ||
private Task<?> taskForBatches(Collection<Batch<K, T>> batches) { | ||
if (batches.size() == 1) { | ||
return taskForBatch(batches.iterator().next()); | ||
} else { | ||
return Tasks.par(batches.stream().map(this::taskForBatch).collect(Collectors.toList())); | ||
} | ||
} | ||
|
||
public ShallowTraceBuilder getShallowTraceBuilder() { | ||
return _shallowTraceBuilder; | ||
void handleBatch(final PlanContext planContext) { | ||
final Batch<K, T> batch = _batches.remove(planContext.getId()).build(); | ||
if (batch.size() > 0) { | ||
try { | ||
final Collection<Batch<K, T>> batches = split(batch); | ||
if (batches.size() > 0) { | ||
final Task<?> task = taskForBatches(batches); | ||
new ContextImpl(planContext, task).runTask(); | ||
} | ||
} catch (Throwable t) { | ||
//we don't care if some of promises have already been completed | ||
//all we care is that all remaining promises have been failed | ||
batch.failAllRemaining(t); | ||
} | ||
} | ||
} | ||
|
||
public abstract void executeBatch(Batch<K, T> batch); | ||
|
||
public Collection<Batch<K, T>> split(Batch<K, T> batch) { | ||
return Collections.singleton(batch); | ||
} | ||
|
||
} |
25 changes: 7 additions & 18 deletions
25
...eq-examples/src/main/java/com/linkedin/parseq/example/batching/BatchingClientExample.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.