Skip to content

Commit

Permalink
Reorganized API to make batching implementations more consistent i.e.
Browse files Browse the repository at this point in the history
when batch tasks are completed and how they are reflected in trace.
Created the parseq-restli-client contrib project. The new parseq restli
client will be backward compatible, allowing drop-in replacement of the
existing parseq restli client. The existing parseq restli client can't be
updated because restli does not require Java 8.
  • Loading branch information
jodzga committed Dec 14, 2015
1 parent 4f21e06 commit ce2ae63
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 101 deletions.
@@ -0,0 +1,33 @@
package com.linkedin.parseq.batching;

import java.util.Collection;
import java.util.Set;
import java.util.function.BiConsumer;

import com.linkedin.parseq.batching.BatchImpl.BatchBuilder;
import com.linkedin.parseq.batching.BatchImpl.BatchEntry;
import com.linkedin.parseq.promise.PromiseResolvedException;
import com.linkedin.parseq.promise.SettablePromise;

/**
 * A group of keyed entries, each of which carries a {@link SettablePromise} to be
 * completed with the value obtained for its key.
 *
 * @param <K> type of the keys identifying entries of the batch
 * @param <T> type of the values the entry promises are completed with
 */
public interface Batch<K, T> {

/**
 * Passes every key of this batch, together with the promise registered for it,
 * to the given consumer.
 *
 * @param consumer callback receiving each key and its promise
 */
void foreach(BiConsumer<K, SettablePromise<T>> consumer);

/**
 * @return the keys contained in this batch
 */
Set<K> keys();

/**
 * @return the number of entries in this batch
 */
int size();

/**
 * Completes the promise registered for the given key with the given value.
 *
 * @param key key of the entry to complete
 * @param value value to complete the promise with
 * @throws PromiseResolvedException presumably when the promise has already been resolved
 */
void done(K key, T value) throws PromiseResolvedException;

/**
 * Fails the promise registered for the given key with the given error.
 *
 * @param key key of the entry to fail
 * @param error error to fail the promise with
 * @throws PromiseResolvedException presumably when the promise has already been resolved
 */
void fail(K key, Throwable error) throws PromiseResolvedException;

/**
 * Fails the promises of all entries with the given error.
 *
 * @param error error to fail the promises with
 * @throws PromiseResolvedException when failing at least one of the promises raised it
 */
void failAll(Throwable error) throws PromiseResolvedException;

/**
 * Variant of {@link #failAll(Throwable)} that reports the outcome through its return
 * value instead of declaring {@link PromiseResolvedException}.
 *
 * @param error error to fail the promises with
 * @return in {@code BatchImpl}: {@code true} when failing the promises raised no
 *         {@link PromiseResolvedException}, {@code false} otherwise
 */
boolean failAllRemaining(Throwable error);

/**
 * @return the entries (promise plus shallow trace builder) of this batch
 */
Collection<BatchEntry<T>> entries();

/**
 * Creates a new, empty builder of batches.
 *
 * @param <K> type of the keys
 * @param <T> type of the values
 * @return a fresh {@link BatchBuilder}
 */
static <K, T> BatchBuilder<K, T> builder() {
return new BatchBuilder<>();
}
}

This file was deleted.

@@ -0,0 +1,124 @@
package com.linkedin.parseq.batching;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

import com.linkedin.parseq.promise.PromiseResolvedException;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import com.linkedin.parseq.trace.ShallowTraceBuilder;

/**
 * {@link Batch} implementation backed by a map from key to {@link BatchEntry}.
 * Instances are created through {@link BatchBuilder#build()}.
 *
 * @param <K> type of the keys identifying entries of the batch
 * @param <T> type of the values the entry promises are completed with
 */
class BatchImpl<K, T> implements Batch<K, T> {

  private final Map<K, BatchEntry<T>> _map;

  private BatchImpl(Map<K, BatchEntry<T>> map) {
    _map = map;
  }

  /**
   * Looks up the entry registered for the given key.
   *
   * @throws IllegalArgumentException when this batch has no entry for the key
   */
  private BatchEntry<T> entryFor(K key) {
    final BatchEntry<T> entry = _map.get(key);
    if (entry == null) {
      //fail with a descriptive message instead of an anonymous NullPointerException
      throw new IllegalArgumentException("Batch does not contain key: " + key);
    }
    return entry;
  }

  /**
   * Completes the promise registered for the given key with the given value.
   *
   * @throws IllegalArgumentException when this batch has no entry for the key
   * @throws PromiseResolvedException propagated from the promise
   */
  @Override
  public void done(K key, T value) throws PromiseResolvedException {
    entryFor(key).getPromise().done(value);
  }

  /**
   * Fails the promise registered for the given key with the given error.
   *
   * @throws IllegalArgumentException when this batch has no entry for the key
   * @throws PromiseResolvedException propagated from the promise
   */
  @Override
  public void fail(K key, Throwable error) throws PromiseResolvedException {
    entryFor(key).getPromise().fail(error);
  }

  /**
   * Attempts to fail the promise of every entry with the given error. A
   * {@link PromiseResolvedException} raised by one promise does not stop the loop;
   * the last such exception is rethrown once all entries have been visited.
   */
  @Override
  public void failAll(Throwable error) throws PromiseResolvedException {
    PromiseResolvedException exception = null;
    for (BatchEntry<T> entry : _map.values()) {
      try {
        entry.getPromise().fail(error);
      } catch (PromiseResolvedException e) {
        //remember the failure but keep going so that every entry is visited
        exception = e;
      }
    }
    if (exception != null) {
      throw exception;
    }
  }

  /**
   * @return view of the keys of the underlying map
   */
  @Override
  public Set<K> keys() {
    return _map.keySet();
  }

  /**
   * @return number of entries in this batch
   */
  @Override
  public int size() {
    return _map.size();
  }

  /**
   * Passes each key and the promise of its entry to the given consumer.
   */
  @Override
  public void foreach(final BiConsumer<K, SettablePromise<T>> consumer) {
    _map.forEach((key, entry) -> consumer.accept(key, entry.getPromise()));
  }

  @Override
  public String toString() {
    return "BatchImpl [entries=" + _map + "]";
  }

  /**
   * Calls {@link #failAll(Throwable)} and converts its outcome into a boolean.
   *
   * @return {@code true} when {@code failAll} completed normally, {@code false} when
   *         it threw {@link PromiseResolvedException}
   */
  @Override
  public boolean failAllRemaining(Throwable error) {
    try {
      failAll(error);
      return true;
    } catch (PromiseResolvedException e) {
      return false;
    }
  }

  /**
   * @return view of the entries of the underlying map
   */
  @Override
  public Collection<BatchEntry<T>> entries() {
    return _map.values();
  }

  /**
   * Single element of a batch: the promise to resolve and the shallow trace builder
   * supplied together with it.
   */
  public static class BatchEntry<T> {

    private final SettablePromise<T> _promise;
    private final ShallowTraceBuilder _shallowTraceBuilder;

    public BatchEntry(ShallowTraceBuilder shallowTraceBuilder, SettablePromise<T> promise) {
      _promise = promise;
      _shallowTraceBuilder = shallowTraceBuilder;
    }

    public SettablePromise<T> getPromise() {
      return _promise;
    }

    public ShallowTraceBuilder getShallowTraceBuilder() {
      return _shallowTraceBuilder;
    }

  }

  /**
   * Collects entries by key and creates a {@link Batch} out of them.
   */
  static class BatchBuilder<K, T> {

    private final Map<K, BatchEntry<T>> _map = new HashMap<>();

    /**
     * Registers the entry under the given key. When the key is already present the
     * new entry replaces the earlier one in the map and the two promises are linked
     * with {@code Promises.propagateResult(newPromise, earlierPromise)}.
     * NOTE(review): presumably this makes the earlier promise receive the result of
     * the new one - confirm against Promises.propagateResult.
     *
     * @return this builder
     */
    public BatchBuilder<K, T> add(K key, BatchEntry<T> entry) {
      //deduplication
      BatchEntry<T> duplicate = _map.put(key, entry);
      if (duplicate != null) {
        Promises.propagateResult(entry.getPromise(), duplicate.getPromise());
      }
      return this;
    }

    /**
     * Convenience overload that wraps the arguments into a {@link BatchEntry}.
     *
     * @return this builder
     */
    public BatchBuilder<K, T> add(K key, ShallowTraceBuilder traceBuilder, SettablePromise<T> promise) {
      return add(key, new BatchEntry<>(traceBuilder, promise));
    }

    /**
     * Creates a batch from a snapshot of the entries added so far. The snapshot
     * keeps the returned batch independent of the builder: entries added after
     * this call do not appear in (or concurrently modify) an already built batch.
     */
    public Batch<K, T> build() {
      return new BatchImpl<>(new HashMap<>(_map));
    }
  }

}
@@ -1,95 +1,97 @@
package com.linkedin.parseq.batching;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.linkedin.parseq.Task;
import com.linkedin.parseq.Tasks;
import com.linkedin.parseq.batching.BatchImpl.BatchBuilder;
import com.linkedin.parseq.batching.BatchImpl.BatchEntry;
import com.linkedin.parseq.internal.ContextImpl;
import com.linkedin.parseq.internal.PlanContext;
import com.linkedin.parseq.promise.CountDownPromiseListener;
import com.linkedin.parseq.promise.PromiseListener;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import com.linkedin.parseq.trace.Relationship;
import com.linkedin.parseq.trace.ShallowTraceBuilder;
import com.linkedin.parseq.trace.TraceBuilder;

public abstract class BatchingStrategy<K, G> {
public abstract class BatchingStrategy<K, T> {

private final ConcurrentHashMap<Long, List<InternalBatchEntry<K, Object>>> batches =
private final ConcurrentHashMap<Long, BatchBuilder<K, T>> _batches =
new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
public <T> Task<T> batchable(final String desc, final Callable<BatchEntry<K, T>> callable) {
return Task.async("batchable: " + desc, ctx -> {
BatchEntry<K, T> batchEntry = callable.call();

public Task<T> batchable(final String desc, final K key) {
return Task.async(desc, ctx -> {
final SettablePromise<T> result = Promises.settable();
Long planId = ctx.getPlanId();
List<InternalBatchEntry<K, Object>> list = batches.get(planId);
if (list == null) {
list = new ArrayList<>();
List<InternalBatchEntry<K, Object>> existing = batches.putIfAbsent(planId, list);
if (existing != null) {
list = existing;
BatchBuilder<K, T> builder = _batches.get(planId);
if (builder == null) {
builder = Batch.builder();
BatchBuilder<K, T> existingBuilder = _batches.putIfAbsent(planId, builder);
if (existingBuilder != null) {
builder = existingBuilder;
}
}
list.add((InternalBatchEntry<K, Object>) new InternalBatchEntry<>(ctx.getShallowTraceBuilder(),
batchEntry.getKey(), batchEntry.getPromise()));

return batchEntry.getPromise();
builder.add(key, ctx.getShallowTraceBuilder(), result);
return result;
});
}

/**
 * Removes the entries collected for the given plan and, when there are any, groups
 * them by {@code groupForKey(key)} and hands every group to {@code handleGroupBatch}.
 * Any throwable raised while doing so fails all collected entries.
 *
 * @param planContext context of the plan whose entries are to be processed
 */
void handleBatch(final PlanContext planContext) {
  final List<InternalBatchEntry<K, Object>> collected = batches.remove(planContext.getId());
  if (collected == null) {
    //nothing was registered for this plan
    return;
  }
  try {
    collected.stream()
        .collect(Collectors.groupingBy(entry -> groupForKey(entry.getKey())))
        .forEach((group, batch) -> handleGroupBatch(group, batch, planContext));
  } catch (Throwable t) {
    failAll(collected, t);
  }
}

/**
 * Fails the promise of every entry in the list with the given throwable.
 *
 * @param list entries whose promises are to be failed
 * @param t error to fail the promises with
 */
private void failAll(final List<InternalBatchEntry<K, Object>> list, final Throwable t) {
  list.forEach(entry -> entry.getPromise().fail(t));
}
private Task<?> taskForBatch(final Batch<K, T> batch) {
return Task.async("batch", ctx -> {
final SettablePromise<T> result = Promises.settable();
final PromiseListener<T> countDownListener =
new CountDownPromiseListener<T>(batch.size(), result, null);

void handleGroupBatch(final G group, final List<InternalBatchEntry<K, Object>> batch, final PlanContext planContext) {
try {
Task<?> task = taskForBatch(group, batch);
//one of the batchable tasks becomes a parent of batch task
//all other tasks become potential parents
final TraceBuilder traceBuilder = ctx.getTraceBuilder();
Relationship rel = Relationship.PARENT_OF;
for (InternalBatchEntry<K, ?> entry : batch) {
planContext.getRelationshipsBuilder().addRelationship(rel,
entry.getShallowTraceBuilder(), task.getShallowTraceBuilder());
for (BatchEntry<T> entry : batch.entries()) {
traceBuilder.addRelationship(rel,
entry.getShallowTraceBuilder(), ctx.getShallowTraceBuilder());
rel = Relationship.POTENTIAL_PARENT_OF;
entry.getPromise().addListener(countDownListener);
}
new ContextImpl(planContext, task).runTask();
} catch (Throwable t) {
failAll(batch, t);
}
}

public abstract G groupForKey(K key);
executeBatch(batch);

public abstract Task<?> taskForBatch(G group, List<? extends BatchEntry<K, Object>> batch);

private static class InternalBatchEntry<K, T> extends BatchEntry<K, T> {

private final ShallowTraceBuilder _shallowTraceBuilder;
return result;
});
}

public InternalBatchEntry(ShallowTraceBuilder shallowTraceBuilder, K key, SettablePromise<T> promise) {
super(key, promise);
_shallowTraceBuilder = shallowTraceBuilder;
/**
 * Creates the task responsible for the given batches. A single batch is turned
 * directly into its batch task; any other number of batches is mapped to one task
 * per batch and the resulting list is combined with {@code Tasks.par}.
 *
 * @param batches batches to create a task for
 * @return task covering all given batches
 */
private Task<?> taskForBatches(Collection<Batch<K, T>> batches) {
  if (batches.size() != 1) {
    return Tasks.par(batches.stream().map(this::taskForBatch).collect(Collectors.toList()));
  }
  //exactly one batch: no need to wrap it
  return taskForBatch(batches.iterator().next());
}

public ShallowTraceBuilder getShallowTraceBuilder() {
return _shallowTraceBuilder;
/**
 * Builds the batch collected for the given plan, splits it with
 * {@link #split(Batch)} and runs the resulting task in a new context of that plan.
 * Does nothing when no builder was registered for the plan or when the built batch
 * is empty. Any throwable raised while splitting or starting the task fails the
 * promises of the batch through {@code failAllRemaining}.
 *
 * @param planContext context of the plan whose batch is to be processed
 */
void handleBatch(final PlanContext planContext) {
  final BatchBuilder<K, T> builder = _batches.remove(planContext.getId());
  if (builder == null) {
    //remove() returns null when nothing was registered for this plan;
    //calling build() on it would throw NullPointerException
    return;
  }
  final Batch<K, T> batch = builder.build();
  if (batch.size() > 0) {
    try {
      final Collection<Batch<K, T>> batches = split(batch);
      if (batches.size() > 0) {
        final Task<?> task = taskForBatches(batches);
        new ContextImpl(planContext, task).runTask();
      }
    } catch (Throwable t) {
      //we don't care if some of promises have already been completed
      //all we care is that all remaining promises have been failed
      batch.failAllRemaining(t);
    }
  }
}

/**
 * Executes the given batch. Invoked from within the task created for the batch.
 * That task's result depends on a count-down listener registered on the promise of
 * every entry, so implementations are presumably expected to resolve (complete or
 * fail) each of those promises - confirm against CountDownPromiseListener.
 *
 * @param batch batch to be executed
 */
public abstract void executeBatch(Batch<K, T> batch);

/**
 * Splits the given batch into the batches that are executed separately. This
 * implementation performs no splitting: it returns a singleton collection holding
 * the batch it was given.
 *
 * @param batch batch to split
 * @return collection of batches to execute
 */
public Collection<Batch<K, T>> split(Batch<K, T> batch) {
  final Collection<Batch<K, T>> unsplit = Collections.singleton(batch);
  return unsplit;
}

}
@@ -1,46 +1,35 @@
/* $Id$ */
package com.linkedin.parseq.example.batching;

import java.util.List;

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.batching.BatchEntry;
import com.linkedin.parseq.batching.Batch;
import com.linkedin.parseq.batching.BatchingStrategy;
import com.linkedin.parseq.batching.BatchingSupport;
import com.linkedin.parseq.example.common.AbstractExample;
import com.linkedin.parseq.example.common.ExampleUtil;
import com.linkedin.parseq.promise.Promises;


/**
* @author Jaroslaw Odzga (jodzga@linkedin.com)
*/
public class BatchingClientExample extends AbstractExample {

public static class ExampleBatchingStrategy extends BatchingStrategy<Long, Long> {
public static class ExampleBatchingStrategy extends BatchingStrategy<Long, String> {

@Override
public Long groupForKey(Long key) {
//lets say all tasks can be grouped into one batch
return 0L;
public void executeBatch(Batch<Long, String> batch) {
System.out.println("batch: " + batch);
batch.foreach((key, promise) -> promise.done("value for id + " + key));
}

@Override
public Task<?> taskForBatch(final Long group, final List<? extends BatchEntry<Long, Object>> batch) {
return Task.action("batchExecute", () -> {
System.out.println("batch: " + batch);
batch.forEach(entry -> entry.getPromise().done("value for id + " + entry.getKey()));
});
}
}


final ExampleBatchingStrategy batchingStrategy = new ExampleBatchingStrategy();

Task<String> batchableTask(final Long id) {
return batchingStrategy.batchable("fetch id: " + id, () -> {
return new BatchEntry<Long, String>(id, Promises.settable());
});
return batchingStrategy.batchable("fetch id: " + id, id);
}

@Override
Expand Down

0 comments on commit ce2ae63

Please sign in to comment.