Skip to content

Commit

Permalink
feat(sink): batch add data to kinesis and async wait for delivery (#1…
Browse files Browse the repository at this point in the history
…7091) (#17195)

Co-authored-by: William Wen <[email protected]>
  • Loading branch information
github-actions[bot] and wenym1 authored Jun 11, 2024
1 parent edbc19e commit 001a5dd
Showing 1 changed file with 85 additions and 30 deletions.
115 changes: 85 additions & 30 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::collections::HashMap;

use anyhow::{anyhow, Context};
use aws_sdk_kinesis::operation::put_record::PutRecordOutput;
use aws_sdk_kinesis::operation::put_records::builders::PutRecordsFluentBuilder;
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::PutRecordsRequestEntry;
use aws_sdk_kinesis::Client as KinesisClient;
use futures::{FutureExt, TryFuture};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand Down Expand Up @@ -70,6 +72,8 @@ impl TryFrom<SinkParam> for KinesisSink {
}
}

const KINESIS_SINK_MAX_PENDING_CHUNK_NUM: usize = 64;

impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = AsyncTruncateLogSinkerOf<KinesisSinkWriter>;
Expand Down Expand Up @@ -125,7 +129,7 @@ impl Sink for KinesisSink {
self.sink_from_name.clone(),
)
.await?
.into_log_sinker(usize::MAX))
.into_log_sinker(KINESIS_SINK_MAX_PENDING_CHUNK_NUM))
}
}

Expand All @@ -148,12 +152,13 @@ impl KinesisSinkConfig {
pub struct KinesisSinkWriter {
pub config: KinesisSinkConfig,
formatter: SinkFormatterImpl,
payload_writer: KinesisSinkPayloadWriter,
client: KinesisClient,
}

struct KinesisSinkPayloadWriter {
client: KinesisClient,
config: KinesisSinkConfig,
// builder should always be `Some`. Making it an option so that we can call
// builder methods that take the builder ownership as input and return with a new builder.
builder: Option<PutRecordsFluentBuilder>,
}

impl KinesisSinkWriter {
Expand Down Expand Up @@ -182,29 +187,57 @@ impl KinesisSinkWriter {
Ok(Self {
config: config.clone(),
formatter,
payload_writer: KinesisSinkPayloadWriter { client, config },
client,
})
}

fn new_payload_writer(&self) -> KinesisSinkPayloadWriter {
let builder = self
.client
.put_records()
.stream_name(&self.config.common.stream_name);
KinesisSinkPayloadWriter {
builder: Some(builder),
}
}
}

pub type KinesisSinkPayloadWriterDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static;

impl KinesisSinkPayloadWriter {
async fn put_record(&self, key: &str, payload: Vec<u8>) -> Result<PutRecordOutput> {
let payload = Blob::new(payload);
// todo: switch to put_records() for batching
Retry::spawn(
ExponentialBackoff::from_millis(100).map(jitter).take(3),
|| async {
self.client
.put_record()
.stream_name(&self.config.common.stream_name)
fn put_record(&mut self, key: String, payload: Vec<u8>) {
self.builder = Some(
self.builder.take().expect("should not be None").records(
PutRecordsRequestEntry::builder()
.partition_key(key)
.data(payload.clone())
.send()
.await
},
)
.await
.with_context(|| format!("failed to put record to {}", self.config.common.stream_name))
.map_err(SinkError::Kinesis)
.data(Blob::new(payload))
.build()
.expect("should not fail because we have set `data` and `partition_key`"),
),
);
}

fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture {
async move {
let builder = self.builder.expect("should not be None");
let context_fmt = format!(
"failed to put record to {}",
builder
.get_stream_name()
.as_ref()
.expect("should have set stream name")
);
Retry::spawn(
ExponentialBackoff::from_millis(100).map(jitter).take(3),
|| builder.clone().send(),
)
.await
.with_context(|| context_fmt.clone())
.map_err(SinkError::Kinesis)?;
Ok(())
}
.boxed()
}
}

Expand All @@ -214,24 +247,46 @@ impl FormattedSink for KinesisSinkPayloadWriter {

async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
self.put_record(
&k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
k.ok_or_else(|| SinkError::Kinesis(anyhow!("no key provided")))?,
v.unwrap_or_default(),
)
.await
.map(|_| ())
);
Ok(())
}
}

impl AsyncTruncateSinkWriter for KinesisSinkWriter {
type DeliveryFuture = KinesisSinkPayloadWriterDeliveryFuture;

async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
let mut payload_writer = self.new_payload_writer();
dispatch_sink_formatter_str_key_impl!(
&self.formatter,
formatter,
self.payload_writer.write_chunk(chunk, formatter).await
)
payload_writer.write_chunk(chunk, formatter).await
)?;

add_future
.add_future_may_await(payload_writer.finish())
.await?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use aws_sdk_kinesis::types::PutRecordsRequestEntry;
use aws_smithy_types::Blob;

#[test]
fn test_kinesis_entry_builder_save_unwrap() {
PutRecordsRequestEntry::builder()
.data(Blob::new(b"data"))
.partition_key("partition-key")
.build()
.unwrap();
}
}

0 comments on commit 001a5dd

Please sign in to comment.