Skip to content

Commit

Permalink
Finish off decorator pattern for KafkaBuffer
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Aug 31, 2023
1 parent 07340ed commit 8da8cc3
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ protected int getRecordsInFlight() {
*
* @param recordsInBuffer the current number of records in the buffer
*/
protected void postProcess(final Long recordsInBuffer) {
public void postProcess(final Long recordsInBuffer) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private T pollForBufferEntry(final int timeoutValue, final TimeUnit timeoutUnit)
}

@Override
protected void postProcess(final Long recordsInBuffer) {
public void postProcess(final Long recordsInBuffer) {
// adding bounds to address race conditions and reporting negative buffer usage
final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue();
final Double boundedTotalRecords = nonNegativeTotalRecords > bufferSize ? bufferSize : nonNegativeTotalRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public static PluginSetting getDefaultPluginSettings() {
}

@Override
protected void postProcess(final Long recordsInBuffer) {
public void postProcess(final Long recordsInBuffer) {
// adding bounds to address race conditions and reporting negative buffer usage
final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue();
final Double boundedTotalRecords = nonNegativeTotalRecords > bufferCapacity ? bufferCapacity : nonNegativeTotalRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import com.google.common.util.concurrent.AtomicDouble;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSinkConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
Expand All @@ -38,8 +36,8 @@
@DataPrepperPlugin(name = "kafka_buffer", pluginType = Buffer.class, pluginConfigurationType = KafkaSinkConfig.class)
public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class);
private static final String BUFFER_USAGE_METRIC = "bufferUsage";
private final AtomicDouble bufferUsage;
private static final int BUFFER_SIZE = 1000000;
private static final int BATCH_SIZE = 250000;
private final AbstractBuffer innerBuffer;
private final KafkaSinkProducer producer;

Expand All @@ -48,11 +46,9 @@ public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {

@DataPrepperPluginConstructor
public KafkaBuffer(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaSinkConfig, final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager, final PipelineDescription pipelineDescription,
final PluginMetrics pluginMetrics) {
final AcknowledgementSetManager acknowledgementSetManager, final PluginMetrics pluginMetrics) {
super(pluginSetting);
bufferUsage = pluginMetrics.gauge(BUFFER_USAGE_METRIC, new AtomicDouble());
this.innerBuffer = new BlockingBuffer<>(pluginSetting);
this.innerBuffer = new BlockingBuffer<>(BUFFER_SIZE, BATCH_SIZE, pluginSetting.getPipelineName());

final KafkaSinkProducerFactory kafkaSinkProducerFactory = new KafkaSinkProducerFactory();
this.producer = kafkaSinkProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, null, null);
Expand Down Expand Up @@ -85,18 +81,18 @@ public Map.Entry<Collection<T>, CheckpointState> doRead(int timeoutInMillis) {
}

@Override
protected void postProcess(final Long recordsInBuffer) {

public void postProcess(final Long recordsInBuffer) {
innerBuffer.postProcess(recordsInBuffer);
}

@Override
public void doCheckpoint(final CheckpointState checkpointState) {

innerBuffer.doCheckpoint(checkpointState);
}

@Override
public boolean isEmpty() {
return getRecordsInFlight() == 0;
return innerBuffer.isEmpty();
}

private KafkaSourceConfig convertSinkConfigToSourceConfig(final KafkaSinkConfig kafkaSinkConfig) {
Expand Down

0 comments on commit 8da8cc3

Please sign in to comment.