Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- max batch size parameter with early running full batches - dry run mode that only simulates batching - metrics: - batch size for strategy - aggregation time for strategy - batch size per endpoint in parseq restli client
- Loading branch information
jodzga
committed
Mar 22, 2016
1 parent
d304af3
commit 8d7a365
Showing
17 changed files
with
349 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 60 additions & 0 deletions
60
...arseq-batching/src/main/java/com/linkedin/parseq/batching/BatchAggregationTimeMetric.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package com.linkedin.parseq.batching; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Consumer; | ||
|
||
import org.HdrHistogram.Histogram; | ||
import org.HdrHistogram.Recorder; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class BatchAggregationTimeMetric { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(BatchAggregationTimeMetric.class); | ||
|
||
private static final long LOWEST_DISCERNIBLE_VALUE = 1; | ||
private static final long HIGHEST_TRACKABLE_VALUE = TimeUnit.HOURS.toNanos(1); | ||
private static final int NUMBER_OF_FIGNIFICANT_VALUE_DIGITS = 3; | ||
|
||
private final Recorder _recorder = | ||
new Recorder(LOWEST_DISCERNIBLE_VALUE, HIGHEST_TRACKABLE_VALUE, NUMBER_OF_FIGNIFICANT_VALUE_DIGITS); | ||
|
||
private Histogram _recycle; | ||
|
||
/** | ||
* Records a batch aggregation time. | ||
* This method is thread safe. | ||
* @param batchAggregationTimeNano batch aggregation time | ||
*/ | ||
public void record(long batchAggregationTimeNano) { | ||
recordSafeValue(narrow(batchAggregationTimeNano)); | ||
} | ||
|
||
private long narrow(long batchAggregationTimeNano) { | ||
if (batchAggregationTimeNano < LOWEST_DISCERNIBLE_VALUE) { | ||
LOGGER.warn("batch aggregation time lower than expected: " + batchAggregationTimeNano + ", recording as: " + LOWEST_DISCERNIBLE_VALUE); | ||
return LOWEST_DISCERNIBLE_VALUE; | ||
} | ||
if (batchAggregationTimeNano > HIGHEST_TRACKABLE_VALUE) { | ||
LOGGER.warn("batch aggregation time greater than expected: " + batchAggregationTimeNano + ", recording as: " + HIGHEST_TRACKABLE_VALUE); | ||
return HIGHEST_TRACKABLE_VALUE; | ||
} | ||
return batchAggregationTimeNano; | ||
} | ||
|
||
private void recordSafeValue(long batchAggregationTimeNano) { | ||
_recorder.recordValue(batchAggregationTimeNano); | ||
} | ||
|
||
/** | ||
* Allows consuming histogram. | ||
* Histogram passed to the consumer includes stable, consistent view | ||
* of all values accumulated since last harvest. | ||
* This method is thread safe. | ||
* @param consumer | ||
*/ | ||
public synchronized void harvest(Consumer<Histogram> consumer) { | ||
_recycle = _recorder.getIntervalHistogram(_recycle); | ||
consumer.accept(_recycle); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 59 additions & 0 deletions
59
contrib/parseq-batching/src/main/java/com/linkedin/parseq/batching/BatchSizeMetric.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package com.linkedin.parseq.batching; | ||
|
||
import java.util.function.Consumer; | ||
|
||
import org.HdrHistogram.Histogram; | ||
import org.HdrHistogram.Recorder; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class BatchSizeMetric { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(BatchSizeMetric.class); | ||
|
||
private static final int LOWEST_DISCERNIBLE_VALUE = 1; | ||
private static final int HIGHEST_TRACKABLE_VALUE = 10_000; | ||
private static final int NUMBER_OF_FIGNIFICANT_VALUE_DIGITS = 3; | ||
|
||
private final Recorder _recorder = | ||
new Recorder(LOWEST_DISCERNIBLE_VALUE, HIGHEST_TRACKABLE_VALUE, NUMBER_OF_FIGNIFICANT_VALUE_DIGITS); | ||
|
||
private Histogram _recycle; | ||
|
||
/** | ||
* Records a batch size. | ||
* This method is thread safe. | ||
* @param batchSize batch size | ||
*/ | ||
public void record(int batchSize) { | ||
recordSafeValue(narrow(batchSize)); | ||
} | ||
|
||
private int narrow(int batchSize) { | ||
if (batchSize < LOWEST_DISCERNIBLE_VALUE) { | ||
LOGGER.warn("batch size lower than expected: " + batchSize + ", recording as: " + LOWEST_DISCERNIBLE_VALUE); | ||
return LOWEST_DISCERNIBLE_VALUE; | ||
} | ||
if (batchSize > HIGHEST_TRACKABLE_VALUE) { | ||
LOGGER.warn("batch size greater than expected: " + batchSize + ", recording as: " + HIGHEST_TRACKABLE_VALUE); | ||
return HIGHEST_TRACKABLE_VALUE; | ||
} | ||
return batchSize; | ||
} | ||
|
||
private void recordSafeValue(int batchSize) { | ||
_recorder.recordValue(batchSize); | ||
} | ||
|
||
/** | ||
* Allows consuming histogram. | ||
* Histogram passed to the consumer includes stable, consistent view | ||
* of all values accumulated since last harvest. | ||
* This method is thread safe. | ||
* @param consumer | ||
*/ | ||
public synchronized void harvest(Consumer<Histogram> consumer) { | ||
_recycle = _recorder.getIntervalHistogram(_recycle); | ||
consumer.accept(_recycle); | ||
} | ||
} |
Oops, something went wrong.