-
Notifications
You must be signed in to change notification settings - Fork 295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Support creation and reading of StructuredDataset with local or remote uri #2914
base: master
Are you sure you want to change the base?
[BUG] Support creation and reading of StructuredDataset with local or remote uri #2914
Conversation
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
this is awesome |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is amazing, @JiangJiaWei1103
I am testing this.
def _set_literal(self, ctx: FlyteContext, expected: LiteralType) -> None: | ||
""" | ||
Explicitly set the StructuredDataset Literal to handle the following cases: | ||
|
||
1. Read a dataframe from a StructuredDataset with an uri, for example: | ||
|
||
@task | ||
def return_sd() -> StructuredDataset: | ||
sd = StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet") | ||
df = sd.open(pd.DataFrame).all() | ||
return df | ||
|
||
For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice comment.
to_literal = loop_manager.synced(flyte_dataset_transformer.async_to_literal) | ||
self._literal_sd = to_literal(ctx, self, StructuredDataset, expected).scalar.structured_dataset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if is here the best way to write it.
cc @wild-endeavor @thomasjpfan
@task | ||
def upload_pqt_to_s3(local_path: str, remote_path: str) -> None: | ||
"""Upload local temp parquet file to s3 object storage""" | ||
with tempfile.TemporaryDirectory() as tmp_dir: | ||
fs = FileAccessProvider( | ||
local_sandbox_dir=tmp_dir, | ||
raw_output_prefix="s3://my-s3-bucket" | ||
) | ||
fs.upload(local_path, remote_path) | ||
|
||
@task | ||
def read_sd_from_uri(uri: str) -> pd.DataFrame: | ||
sd = StructuredDataset(uri=uri, file_format="parquet") | ||
df = sd.open(pd.DataFrame).all() | ||
|
||
return df | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @JiangJiaWei1103
you should put these tasks in a workflow,
which is closer to the reality that users use it.
Tracking issue
Closes #5954.
Why are the changes needed?
When
StructuredDataset
is instantiated with anuri
, the operation of directly reading the dataframe fails. There exist two cases to discuss:StructuredDataset
with a local file path asuri
:StructuredDataset
with a remote file path (e.g., s3 object storage) asuri
:Both cases should successfully read and return
pd.DataFrame
.What changes were proposed in this pull request?
As commented here, users have no options to set
_literal_sd
inStructuredDataset
. To prevent_literal_sd
from beingNoneType
, we useStructuredDatasetTransformerEngine
to construct aStructuredDataset
literal and assign it to_literal_sd
ofStructuredDataset
type.How was this patch tested?
This patch is tested through a newly added unit test.
Setup process
Check all the applicable boxes
Related PRs
Docs link