Added:
- max batch size parameter, with early execution of full batches
- dry run mode that only simulates batching
- metrics:
 - batch size per strategy
 - aggregation time per strategy
 - batch size per endpoint in the ParSeq Rest.li client
jodzga committed Mar 22, 2016
1 parent d304af3 commit 8d7a365
Showing 17 changed files with 349 additions and 92 deletions.
5 changes: 5 additions & 0 deletions contrib/parseq-batching/pom.xml
@@ -25,6 +25,11 @@
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>2.1.6</version>
</dependency>
</dependencies>

<distributionManagement>
@@ -83,7 +83,7 @@ public interface Batch<K, T> {

Set<Map.Entry<K, BatchEntry<T>>> entries();

-static <K, T> BatchBuilder<K, T> builder() {
-  return new BatchBuilder<>();
+static <K, T> BatchBuilder<K, T> builder(int maxSize, BatchAggregationTimeMetric batchAggregationTimeMetric) {
+  return new BatchBuilder<>(maxSize, batchAggregationTimeMetric);
 }
}
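With this change, callers of the factory must supply both the maximum batch size and the aggregation-time metric. A sketch of an updated call site (fragment only; the size 1024 and the type parameters are illustrative, and the builder type is assumed to be the nested BatchBuilder from BatchImpl shown further down in this diff):

// Illustrative fragment, assumed to live in the com.linkedin.parseq.batching package.
BatchAggregationTimeMetric timeMetric = new BatchAggregationTimeMetric();
BatchImpl.BatchBuilder<Long, String> builder = Batch.builder(1024, timeMetric);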
@@ -0,0 +1,60 @@
package com.linkedin.parseq.batching;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchAggregationTimeMetric {

private static final Logger LOGGER = LoggerFactory.getLogger(BatchAggregationTimeMetric.class);

private static final long LOWEST_DISCERNIBLE_VALUE = 1;
private static final long HIGHEST_TRACKABLE_VALUE = TimeUnit.HOURS.toNanos(1);
private static final int NUMBER_OF_SIGNIFICANT_VALUE_DIGITS = 3;

private final Recorder _recorder =
new Recorder(LOWEST_DISCERNIBLE_VALUE, HIGHEST_TRACKABLE_VALUE, NUMBER_OF_SIGNIFICANT_VALUE_DIGITS);

private Histogram _recycle;

/**
* Records a batch aggregation time.
* This method is thread safe.
* @param batchAggregationTimeNano batch aggregation time in nanoseconds
*/
public void record(long batchAggregationTimeNano) {
recordSafeValue(narrow(batchAggregationTimeNano));
}

private long narrow(long batchAggregationTimeNano) {
if (batchAggregationTimeNano < LOWEST_DISCERNIBLE_VALUE) {
LOGGER.warn("batch aggregation time lower than expected: " + batchAggregationTimeNano + ", recording as: " + LOWEST_DISCERNIBLE_VALUE);
return LOWEST_DISCERNIBLE_VALUE;
}
if (batchAggregationTimeNano > HIGHEST_TRACKABLE_VALUE) {
LOGGER.warn("batch aggregation time greater than expected: " + batchAggregationTimeNano + ", recording as: " + HIGHEST_TRACKABLE_VALUE);
return HIGHEST_TRACKABLE_VALUE;
}
return batchAggregationTimeNano;
}

private void recordSafeValue(long batchAggregationTimeNano) {
_recorder.recordValue(batchAggregationTimeNano);
}

/**
* Allows consuming the histogram.
* The histogram passed to the consumer provides a stable, consistent view
* of all values accumulated since the last harvest.
* This method is thread safe.
* @param consumer consumer that receives the interval histogram
*/
public synchronized void harvest(Consumer<Histogram> consumer) {
_recycle = _recorder.getIntervalHistogram(_recycle);
consumer.accept(_recycle);
}
}
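A short usage sketch for the new metric (illustrative only; the simulated wait and the console reporter are not part of this commit): each entry's wait time is recorded in nanoseconds, and a reporter periodically drains the interval histogram via harvest.

import com.linkedin.parseq.batching.BatchAggregationTimeMetric;

// Illustrative example, not part of this commit.
public class AggregationTimeMetricExample {
  public static void main(String[] args) throws InterruptedException {
    BatchAggregationTimeMetric metric = new BatchAggregationTimeMetric();

    long enqueuedAtNano = System.nanoTime();
    Thread.sleep(5); // stand-in for the time an entry waits before its batch is built
    metric.record(System.nanoTime() - enqueuedAtNano);

    // A metrics reporter would call harvest periodically; each call yields only the
    // values accumulated since the previous harvest.
    metric.harvest(histogram ->
        System.out.println("aggregation time p99 (ns): " + histogram.getValueAtPercentile(99.0)));
  }
}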
@@ -9,6 +9,7 @@
import java.util.Set;
import java.util.function.BiConsumer;

import com.linkedin.parseq.internal.ArgumentUtil;
import com.linkedin.parseq.promise.PromiseResolvedException;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
@@ -69,6 +70,7 @@ public static class BatchEntry<T> {

private final SettablePromise<T> _promise;
private final List<ShallowTraceBuilder> _shallowTraceBuilders = new ArrayList<>();
private final long _creationTimeNano = System.nanoTime();

public BatchEntry(ShallowTraceBuilder shallowTraceBuilder, SettablePromise<T> promise) {
_promise = promise;
@@ -90,18 +92,28 @@ void addShallowTraceBuilder(final ShallowTraceBuilder shallowTraceBuilder) {
void addShallowTraceBuilders(final List<ShallowTraceBuilder> shallowTraceBuilders) {
_shallowTraceBuilders.addAll(shallowTraceBuilders);
}

}

static class BatchBuilder<K, T> {

private final Map<K, BatchEntry<T>> _map = new HashMap<>();
private Batch<K, T> _batch = null;
private final int _maxSize;
private final BatchAggregationTimeMetric _batchAggregationTimeMetric;

public BatchBuilder(int maxSize, BatchAggregationTimeMetric batchAggregationTimeMetric) {
ArgumentUtil.requirePositive(maxSize, "max batch size");
_maxSize = maxSize;
_batchAggregationTimeMetric = batchAggregationTimeMetric;
}

BatchBuilder<K, T> add(K key, BatchEntry<T> entry) {
if (_batch != null) {
throw new IllegalStateException("BatchBuilder has already been used to build a batch");
}
if (isFull()) {
throw new IllegalStateException("BatchBuilder is full, max size: " + _maxSize);
}
//deduplication
BatchEntry<T> duplicate = _map.get(key);
if (duplicate != null) {
@@ -117,9 +129,17 @@ BatchBuilder<K, T> add(K key, ShallowTraceBuilder traceBuilder, SettablePromise<
return add(key, new BatchEntry<>(traceBuilder, promise));
}

public boolean isFull() {
return _map.size() == _maxSize;
}

public Batch<K, T> build() {
if (_batch == null) {
final long currentTimeNano = System.nanoTime();
_map.values().forEach(entry -> {
final long time = currentTimeNano - entry._creationTimeNano;
_batchAggregationTimeMetric.record(time > 0 ? time : 0);
});
_batch = new BatchImpl<>(_map);
}
return _batch;
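The "early running full batches" behavior mentioned in the commit message hinges on the new isFull() check: once a builder reaches the configured maximum, the strategy can run the batch immediately instead of waiting for the plan to go inactive. A hypothetical helper sketching that flow (not part of this diff; it assumes the builder and entry types are the package-private BatchImpl.BatchBuilder and BatchImpl.BatchEntry, so it lives in the same package):

package com.linkedin.parseq.batching;

import java.util.function.Consumer;

import com.linkedin.parseq.batching.BatchImpl.BatchBuilder;
import com.linkedin.parseq.batching.BatchImpl.BatchEntry;

// Hypothetical helper, not part of this commit: collects entries and hands a
// full batch to the supplied runner as soon as the builder reports it is full.
class EarlyDispatchingCollector<K, T> {

  private final int _maxSize;
  private final BatchAggregationTimeMetric _timeMetric = new BatchAggregationTimeMetric();
  private final Consumer<Batch<K, T>> _runBatch;
  private BatchBuilder<K, T> _builder;

  EarlyDispatchingCollector(int maxSize, Consumer<Batch<K, T>> runBatch) {
    _maxSize = maxSize;
    _runBatch = runBatch;
    _builder = Batch.builder(maxSize, _timeMetric);
  }

  void add(K key, BatchEntry<T> entry) {
    _builder.add(key, entry);
    if (_builder.isFull()) {
      // Run the full batch right away and start collecting into a fresh builder.
      _runBatch.accept(_builder.build());
      _builder = Batch.builder(_maxSize, _timeMetric);
    }
  }
}

Note that build() records each entry's wait time regardless of whether the batch is run early or at plan deactivation, so the aggregation-time metric covers both paths.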
@@ -0,0 +1,59 @@
package com.linkedin.parseq.batching;

import java.util.function.Consumer;

import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchSizeMetric {

private static final Logger LOGGER = LoggerFactory.getLogger(BatchSizeMetric.class);

private static final int LOWEST_DISCERNIBLE_VALUE = 1;
private static final int HIGHEST_TRACKABLE_VALUE = 10_000;
private static final int NUMBER_OF_SIGNIFICANT_VALUE_DIGITS = 3;

private final Recorder _recorder =
new Recorder(LOWEST_DISCERNIBLE_VALUE, HIGHEST_TRACKABLE_VALUE, NUMBER_OF_SIGNIFICANT_VALUE_DIGITS);

private Histogram _recycle;

/**
* Records a batch size.
* This method is thread safe.
* @param batchSize batch size
*/
public void record(int batchSize) {
recordSafeValue(narrow(batchSize));
}

private int narrow(int batchSize) {
if (batchSize < LOWEST_DISCERNIBLE_VALUE) {
LOGGER.warn("batch size lower than expected: " + batchSize + ", recording as: " + LOWEST_DISCERNIBLE_VALUE);
return LOWEST_DISCERNIBLE_VALUE;
}
if (batchSize > HIGHEST_TRACKABLE_VALUE) {
LOGGER.warn("batch size greater than expected: " + batchSize + ", recording as: " + HIGHEST_TRACKABLE_VALUE);
return HIGHEST_TRACKABLE_VALUE;
}
return batchSize;
}

private void recordSafeValue(int batchSize) {
_recorder.recordValue(batchSize);
}

/**
* Allows consuming the histogram.
* The histogram passed to the consumer provides a stable, consistent view
* of all values accumulated since the last harvest.
* This method is thread safe.
* @param consumer consumer that receives the interval histogram
*/
public synchronized void harvest(Consumer<Histogram> consumer) {
_recycle = _recorder.getIntervalHistogram(_recycle);
consumer.accept(_recycle);
}
}
