-
Notifications
You must be signed in to change notification settings - Fork 580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): implement snowflake sink #15429
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 4863 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2101 | 1 | 2761 | 0 |
Click to see the invalid file list
- src/connector/src/sink/snowflake_connector.rs
Also need to add an example to integration_tests |
problem is - we always need an external s3 bucket during any potential use-case (including integration tests), plus a valid snowflake account for the final sink part. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
I think log queue, i.e. buffering data in multiple epochs before commit, is a must-have thing for Snowflake. Let's do it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm as the first version. please fix the test also.
Ok ok, that can be done without integrating it into our ci, just as an example |
src/connector/src/sink/snowflake.rs
Outdated
|
||
/// Construct the *unique* file suffix for the sink | ||
fn file_suffix(&self) -> String { | ||
format!("{}_{}", self.epoch, self.sink_file_suffix) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sink_file_suffix
is local to each parallelism of snowflake writer, so there might be a same sink_file_suffix across multiple parallelisms. Do we need to ensure uniqueness across multiple parallelisms?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as long as epoch
remains different, everthing should be fine - actually I'm thinking to only include the epoch
as the unique identifier for the s3 intermediate file(s), the sink_file_suffix
does not really matter, cc @xxhZs @fuyufjh.
refer: #15429 (comment)
plus, the context for this uniqueness is due to snowflake pipe - which will implicitly refuse the sink request (i.e., from insertFiles
) to the intermediate s3 file with the same name, even if the status code being returned is 200
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But different parallelisms share the same epochs. Let's say we are now at epoch 233, and we have two sink parallelisms. The first file name in this epoch of both executors are both <s3_path>/233-0
. In this case, will the later written one overwrite the firstly written one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overwritten will not happen, but it's even worse 🫠 - the latter one upload the data to the same file on external S3, and we trigger the insertFiles
request to snowflake pipe; but the file name remains the same, snowflake then implicitly treats this as redundant, and the new version will not be loaded into the pipe, which means the data in the latter file simply gets lost...🙃
Thus, we need another identifier for the parallel writer(s) scenario - any suggestion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uuid is fine. To help debug we can use <epoch>-<uuid>
as the file name.
return Ok(()); | ||
} | ||
// first sink to the external stage provided by user (i.e., s3) | ||
self.s3_client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use streaming upload instead of buffering the data by ourselves? We can use the streaming upload of opendal or the streaming upload implemented by ourselves with the aws sdk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep I think streaming upload is possible - a possible implementation would probably be something like this: https://gist.github.com/ivormetcalf/f2b8e6abfece4328c86ad1ee34363caf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually there is no need to reimplement it again. In our object store crate, we have implemented streaming upload for both aws s3 sdk and opendal. For simplicity we can use opendal. You may see the implementation in the following code.
async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> { |
risingwave/src/object_store/src/object/s3.rs
Line 340 in 5fe4222
async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> { |
@@ -32,6 +32,8 @@ pub mod nats; | |||
pub mod pulsar; | |||
pub mod redis; | |||
pub mod remote; | |||
pub mod snowflake; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May have a separate folder snowflake
to hold the files related to snowflake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently we only have snowflake.rs
for the core sinking logic, plus snowflake_connector.rs
for the helper clients (i.e., rest api client, s3 client) implementations - let's keep it simple at present, and move things around when it gets bigger in the future.
const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; | ||
|
||
/// The helper function to generate the s3 file name | ||
fn generate_s3_file_name(s3_path: Option<String>, suffix: String) -> String { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that we implemented the functionality of writing to snowflake from scratch. If there is no other implementation from other repo, in the future we may move the logic here to a separate repo specially for snowflake, so that users in snowflake community can reuse our implementation.
} | ||
|
||
async fn barrier(&mut self, _is_checkpoint: bool) -> Result<Self::CommitMetadata> { | ||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. We can following the similar implementation from iceberg #15634
Agree. It's hard to make it work in CI because Snowflake is a SaaS service. An example is good enough in this cases. |
I'll add a detailed spec / example afterwards, any suggestion on where to put it? (e.g., |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Detailed spec will be added when finalizing this PR.
The general sink logic for
snowflake-sink
is a little bit different from others, since snowflake only support sink (e.g.,insertFiles
REST API) from three external stage storages (e.g., aws, azure, and gcp), then we must somehow first upload the corresponding data files to an user provided s3 bucket, and then trigger the snowflake pipe to copy from that specific external staged storage.To keep everthing simple at present, we only support amazon s3 as an external staged storage.
For detailed snowpipe workflow, please refer to: https://docs.snowflake.com/user-guide/data-load-snowpipe-rest-overview.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Detailed spec will be updated in Notion later.