Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the Flyte agent to provision and manage K8s (data) service for deep learning (GNN) use cases #3004

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

shuyingliang
Copy link

@shuyingliang shuyingliang commented Dec 14, 2024

Why are the changes needed?

Graph Neural Networks are critical for understanding complex relationships across LinkedIn's professional networks. However, training these models at scale involves intricate data loading, sampling, and processing across multiple nodes and GPUs. The missing piece is the infrastructure to support how and where to run these Kubernetes data services, making them scalable and reliable along with the training or inference processes.

To simplify the complex orchestration pipeline, we decided to leverage flyte agent framework to provision and manage the data services for GNN use case.

What changes were proposed in this pull request?

This PR adds the flyte agent to create/update/delete the K8s statefulset and service.

How was this patch tested?

  • The same code (with removed company related internal environments and set up) has been running in production along with the training job MPIJobs (for deep learning GNN training) or TFJob (for offline inference)
  • This is also tested in local sandbox

Setup process

pip install flytekitplugins-k8sdataservice

Screenshots

Screenshot 2024-11-11 at 3 48 18 PM

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Docs link

Blog from Flyte community sync

Summary by Bito

This PR introduces a new K8s data service plugin for Flyte, specifically designed to support GNN training workloads. The implementation includes a DataServiceAgent for managing K8s resources (StatefulSets and Services), a CleanupSensor for resource cleanup, and comprehensive resource management functionality. The changes are accompanied by extensive unit tests and documentation.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

Copy link

codecov bot commented Dec 14, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 90.46%. Comparing base (f99d50e) to head (944a500).
Report is 4 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #3004       +/-   ##
===========================================
+ Coverage   51.08%   90.46%   +39.38%     
===========================================
  Files         201      100      -101     
  Lines       21231     4920    -16311     
  Branches     2731        0     -2731     
===========================================
- Hits        10846     4451     -6395     
+ Misses       9787      469     -9318     
+ Partials      598        0      -598     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Member

@pingsutw pingsutw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is amazing!!! leave some minor comments

@shuyingliang shuyingliang force-pushed the shuliang/k8sdataservice branch 6 times, most recently from a0c5d8e to ec6d4c1 Compare December 20, 2024 05:10
@@ -0,0 +1,86 @@
# Example of the role/binding set up for the data service to create/update/delete resources in the sandbox flyte namespace
apiVersion: rbac.authorization.k8s.io/v1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding this file to flytekit, could we add this to the agent setup guide? https://docs.flyte.org/en/latest/deployment/agents/databricks.html#deployment-agent-setup-databricks

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the documentation in in flyte repo, https://github.com/flyteorg/flyte/tree/master/docs/deployment/agents
I will remove the set up guide here and create new branch to flyte repo.

@shuyingliang shuyingliang force-pushed the shuliang/k8sdataservice branch from a2e628f to 43e2733 Compare January 11, 2025 03:44
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 11, 2025

Code Review Agent Run #a24be6

Actionable Suggestions - 16
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py - 2
  • flytekit/image_spec/default_builder.py - 2
    • Consider adding uv.lock validation checks · Line 179-182
    • Consider validating pyproject.toml file existence · Line 216-217
  • flytekit/core/environment.py - 1
    • Consider splitting complex call method · Line 67-86
  • tests/flytekit/unit/core/test_environment.py - 1
    • Consider adding assertions to test case · Line 74-78
  • tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py - 1
    • Consider adding error handling for StructuredDataset · Line 34-34
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/agent.py - 1
    • Unused parameters and missing annotations · Line 28-30
  • plugins/flytekit-k8sdataservice/utils/infra.py - 1
    • Insecure hash function usage · Line 7-7
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py - 1
    • Consider moving k8s client initialization · Line 27-35
  • plugins/flytekit-k8sdataservice/setup.py - 1
    • Consider pinning exact dependency versions · Line 7-7
  • flytekit/core/local_cache.py - 1
    • Consider adding error handling for serialization · Line 116-116
  • plugins/flytekit-k8sdataservice/utils/resources.py - 1
    • Consider exact key matching for mem · Line 23-23
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py - 1
    • Improve error handling test coverage · Line 41-46
  • flytekit/types/directory/types.py - 1
    • Consider using relative paths for portability · Line 371-373
  • flytekit/core/array_node.py - 1
    • Consider execution mode initialization removal impact · Line 64-64
Additional Suggestions - 10
  • flytekit/core/environment.py - 1
    • Consider refactoring duplicated task logic · Line 95-115
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py - 1
    • Improve stateful set error handling · Line 47-48
  • flytekit/image_spec/default_builder.py - 1
  • flytekit/core/local_cache.py - 1
    • Consider updating test case for consistency · Line 28-28
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py - 1
    • Consider adding edge cases to test · Line 91-96
  • tests/flytekit/unit/types/directory/test_listdir.py - 1
    • Consider using context manager for tempdir · Line 9-10
  • tests/flytekit/unit/core/image_spec/test_default_builder.py - 3
  • plugins/flytekit-k8sdataservice/utils/resources.py - 1
    • Consider extracting zero check utility function · Line 7-14
Review Details
  • Files reviewed - 63 · Commit Range: 158469e..43e2733
    • .pre-commit-config.yaml
    • Dockerfile.agent
    • dev-requirements.txt
    • docs/source/plugins/k8sstatefuldataservice.rst
    • flytekit/__init__.py
    • flytekit/clis/sdk_in_container/run.py
    • flytekit/core/array_node.py
    • flytekit/core/array_node_map_task.py
    • flytekit/core/environment.py
    • flytekit/core/local_cache.py
    • flytekit/core/workflow.py
    • flytekit/extend/backend/agent_service.py
    • flytekit/extend/backend/base_agent.py
    • flytekit/image_spec/default_builder.py
    • flytekit/models/core/workflow.py
    • flytekit/models/literals.py
    • flytekit/remote/remote.py
    • flytekit/tools/translator.py
    • flytekit/types/directory/types.py
    • flytekit/types/file/file.py
    • plugins/flytekit-airflow/setup.py
    • plugins/flytekit-inference/flytekitplugins/inference/__init__.py
    • plugins/flytekit-inference/flytekitplugins/inference/vllm/serve.py
    • plugins/flytekit-inference/setup.py
    • plugins/flytekit-inference/tests/test_vllm.py
    • plugins/flytekit-k8sdataservice/dev-requirements.txt
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/__init__.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/agent.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/kube_config.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/task.py
    • plugins/flytekit-k8sdataservice/setup.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_sensor.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_task.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/utils/test_resources.py
    • plugins/flytekit-k8sdataservice/utils/infra.py
    • plugins/flytekit-k8sdataservice/utils/resources.py
    • plugins/flytekit-onnx-pytorch/dev-requirements.txt
    • plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/setup.py
    • plugins/flytekit-optuna/tests/test_optimizer.py
    • plugins/flytekit-spark/tests/test_environment.py
    • plugins/setup.py
    • pyproject.toml
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/utils.py
    • tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py
    • tests/flytekit/integration/remote/workflows/basic/flytefile.py
    • tests/flytekit/integration/remote/workflows/basic/pydantic_wf.py
    • tests/flytekit/unit/core/image_spec/test_default_builder.py
    • tests/flytekit/unit/core/test_environment.py
    • tests/flytekit/unit/core/test_flyte_directory.py
    • tests/flytekit/unit/core/test_flyte_file.py
    • tests/flytekit/unit/core/test_generice_idl_type_engine.py
    • tests/flytekit/unit/core/test_local_cache.py
    • tests/flytekit/unit/core/test_type_engine.py
    • tests/flytekit/unit/core/test_workflows.py
    • tests/flytekit/unit/types/directory/test_listdir.py
  • Files skipped - 4
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • plugins/flytekit-inference/README.md - Reason: Filter setting
    • plugins/flytekit-k8sdataservice/README.md - Reason: Filter setting
    • plugins/flytekit-optuna/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 11, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
New Feature - K8s Data Service Infrastructure for GNN Training

agent.py - Implements DataServiceAgent for managing K8s resources

manager.py - Implements K8s resource management functionality

task.py - Defines DataServiceTask and configuration structures

sensor.py - Implements CleanupSensor for resource cleanup

Testing - K8s Data Service Unit Tests

test_kube_config.py - Adds tests for KubeConfig functionality

test_manager.py - Adds comprehensive tests for K8s resource management

Documentation - K8s Data Service Documentation

k8sstatefuldataservice.rst - Adds documentation for K8s StatefulSet Data Service API

setup.py - Configures package metadata and dependencies

Other Improvements - Configuration Updates

.pre-commit-config.yaml - Updates pre-commit hooks and dependencies

Dockerfile.agent - Adds K8s data service plugin to agent image

Testing - K8s Data Service Test Coverage

test_agent.py - Adds comprehensive tests for DataServiceAgent functionality including creation, status checks and cleanup

test_task.py - Implements tests for DataServiceTask configuration and execution

test_sensor.py - Adds tests for CleanupSensor functionality and resource management

test_resources.py - Tests resource conversion and cleanup utilities

New Feature - K8s Data Service Infrastructure

infra.py - Adds infrastructure name generation utility

resources.py - Implements resource management utilities for K8s

Other Improvements - Configuration and Integration Updates

setup.py - Adds k8sdataservice plugin to package mapping

test_remote.py - Adds test for pydantic default input with map task

test_generice_idl_type_engine.py - Updates type annotations and fixes misc type issues

New Feature - K8s Data Service Infrastructure for GNN Training

agent.py - Implements DataServiceAgent for managing K8s resources

manager.py - Implements K8s resource management functionality

task.py - Defines DataServiceTask and configuration structures

sensor.py - Implements CleanupSensor for resource cleanup

infra.py - Adds infrastructure name generation utility

resources.py - Implements resource management utilities for K8s

Testing - K8s Data Service Test Suite

test_kube_config.py - Adds tests for KubeConfig functionality

test_manager.py - Adds comprehensive tests for K8s resource management

test_agent.py - Adds comprehensive tests for DataServiceAgent functionality

test_task.py - Implements tests for DataServiceTask configuration

test_sensor.py - Adds tests for CleanupSensor functionality

test_resources.py - Tests resource conversion and cleanup utilities

test_type_engine.py - Updates type annotations and fixes type-related issues

Documentation - K8s Data Service Documentation

k8sstatefuldataservice.rst - Adds documentation for K8s StatefulSet Data Service API

setup.py - Configures package metadata and dependencies

Other Improvements - Configuration and Integration Updates

.pre-commit-config.yaml - Updates pre-commit hooks and dependencies

Dockerfile.agent - Adds K8s data service plugin to agent image

setup.py - Adds k8sdataservice plugin to package mapping

test_remote.py - Adds test for pydantic default input with map task

Comment on lines 74 to 78
def test_show_environment():

env = Environment(retries=2)

env.show()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding assertions to test case

The test_show_environment() test case appears to be incomplete as it only calls show() without any assertions to verify the expected behavior. Consider adding assertions to validate the output format and content.

Code suggestion
Check the AI-generated fix before applying
 @@ -74,5 +74,10 @@
  def test_show_environment():
 
      env = Environment(retries=2)
 +    from io import StringIO
 +    import sys
 +    captured_output = StringIO()
 +    sys.stdout = captured_output
      env.show()
 +    sys.stdout = sys.__stdout__
 +    assert "retries" in captured_output.getvalue()
 +    assert "2" in captured_output.getvalue()

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +28 to +30
def create(
self, task_template: TaskTemplate, output_prefix: str, inputs: Optional[LiteralMap] = None, **kwargs
) -> DataServiceMetadata:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused parameters and missing annotations

The 'create' method has unused parameters and is missing type annotations for 'kwargs'.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def create(
self, task_template: TaskTemplate, output_prefix: str, inputs: Optional[LiteralMap] = None, **kwargs
) -> DataServiceMetadata:
def create(
self, task_template: TaskTemplate, **kwargs: dict[str, Any]
) -> DataServiceMetadata:

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


def gen_infra_name() -> str:
random_uuid = uuid.uuid4().hex
hash_object = hashlib.sha1(random_uuid.encode())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insecure hash function usage

The use of 'sha1' hash function is considered insecure. Consider using a more secure alternative like 'sha256' or 'sha512'.

Code suggestion
Check the AI-generated fix before applying
Suggested change
hash_object = hashlib.sha1(random_uuid.encode())
hash_object = hashlib.sha256(random_uuid.encode())

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +27 to +35
self.k8s_config = KubeConfig()
self.k8s_config.load_kube_config()
self.apps_v1_api = client.AppsV1Api()
self.core_v1_api = client.CoreV1Api()
self.custom_api = client.CustomObjectsApi()
self.release_name = release_name
self.cleanup_data_service = cleanup_data_service
self.namespace = "flyte"
self.cluster = cluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving k8s client initialization

Consider moving the Kubernetes client initialization to __init__ method since these configurations are used across multiple methods and don't need to be recreated on each poke call.

Code suggestion
Check the AI-generated fix before applying
          super().__init__(name=name, task_type="sensor", **kwargs)
 +        self.k8s_config = KubeConfig()
 +        self.k8s_config.load_kube_config()
 +        self.apps_v1_api = client.AppsV1Api()
 +        self.core_v1_api = client.CoreV1Api()
 +        self.custom_api = client.CustomObjectsApi()
 +        self.namespace = "flyte"
 +
  @@ -27,12 +34,6 @@
          it for simplicity. This is also why we use the sensor API to keep forward compatibility
          """
 -        self.k8s_config = KubeConfig()
 -        self.k8s_config.load_kube_config()
 -        self.apps_v1_api = client.AppsV1Api()
 -        self.core_v1_api = client.CoreV1Api()
 -        self.custom_api = client.CustomObjectsApi()
 -        self.namespace = "flyte"
          self.release_name = release_name
          self.cleanup_data_service = cleanup_data_service
          self.cluster = cluster

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=1.11.0", "kubernetes>=23.6.0", "flyteidl>=1.11.0"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider pinning exact dependency versions

Consider pinning exact versions of dependencies instead of using >= to ensure reproducible builds. The current setup allows any version above the minimum which could lead to compatibility issues.

Code suggestion
Check the AI-generated fix before applying
Suggested change
plugin_requires = ["flytekit>=1.11.0", "kubernetes>=23.6.0", "flyteidl>=1.11.0"]
plugin_requires = ["flytekit>=1.11.0,<2.0.0", "kubernetes>=23.6.0,<24.0.0", "flyteidl>=1.11.0,<2.0.0"]

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

input_literal_map,
cache_ignore_input_vars,
),
value.to_flyte_idl().SerializeToString(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for serialization

Consider adding error handling around to_flyte_idl().SerializeToString() call as serialization operations can potentially fail.

Code suggestion
Check the AI-generated fix before applying
Suggested change
value.to_flyte_idl().SerializeToString(),
try:
value.to_flyte_idl().SerializeToString()
except Exception as e:
logger.error(f"Failed to serialize literal map: {e}")
raise

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged



def convert_flyte_to_k8s_fields(resources_dict):
return {("memory" if "mem" in k else k): v for k, v in resources_dict.items()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider exact key matching for mem

Consider using a more explicit dictionary comprehension for convert_flyte_to_k8s_fields. The current implementation using 'mem' in k could match unintended keys containing 'mem'. Consider using exact key matching.

Code suggestion
Check the AI-generated fix before applying
Suggested change
return {("memory" if "mem" in k else k): v for k, v in resources_dict.items()}
return {("memory" if k == "mem" else k): v for k, v in resources_dict.items()}

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +41 to +46
@patch("flytekitplugins.k8sdataservice.k8s.manager.client.AppsV1Api.create_namespaced_stateful_set")
def test_create_stateful_set_failure(self, mock_create_namespaced_stateful_set):
mock_create_namespaced_stateful_set.side_effect = ApiException("Create failed")
stateful_set_object = self.k8s_manager.create_stateful_set_object()
response = self.k8s_manager.create_stateful_set(stateful_set_object)
self.assertEqual(response, "failed_stateful_set_name")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling test coverage

The test case test_create_stateful_set_failure could be improved by verifying the error message from ApiException is properly logged. Consider asserting the logger call with the expected error message.

Code suggestion
Check the AI-generated fix before applying
Suggested change
@patch("flytekitplugins.k8sdataservice.k8s.manager.client.AppsV1Api.create_namespaced_stateful_set")
def test_create_stateful_set_failure(self, mock_create_namespaced_stateful_set):
mock_create_namespaced_stateful_set.side_effect = ApiException("Create failed")
stateful_set_object = self.k8s_manager.create_stateful_set_object()
response = self.k8s_manager.create_stateful_set(stateful_set_object)
self.assertEqual(response, "failed_stateful_set_name")
@patch("flytekitplugins.k8sdataservice.k8s.manager.logger")
def test_create_stateful_set_failure(self, mock_create_namespaced_stateful_set):
mock_create_namespaced_stateful_set.side_effect = ApiException("Create failed")
stateful_set_object = self.k8s_manager.create_stateful_set_object()
response = self.k8s_manager.create_stateful_set(stateful_set_object)
self.assertEqual(response, "failed_stateful_set_name")
mock_logger.error.assert_called_once()
logged_message = mock_logger.error.call_args[0][0]
self.assertIn("Exception when calling AppsV1Api->create_namespaced_stateful_set: Create failed", logged_message)

Code Review Run #a24be6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

shuyingliang and others added 9 commits January 11, 2025 12:27
…In internal things removed

Signed-off-by: Shuying Liang <[email protected]>
Signed-off-by: Shuying Liang <[email protected]>
* Fix pydantic default input

Signed-off-by: Future-Outlier <[email protected]>

* add pydantic integration test

Signed-off-by: Future-Outlier <[email protected]>

* Use duck typing by Thomas's advice

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Thomas J. Fan <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Thomas J. Fan <[email protected]>
Signed-off-by: Shuying Liang <[email protected]>
* fix: Open FlyteFile from remote path

Signed-off-by: JiaWei Jiang <[email protected]>

* Add integration test

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Use ctx as param instead of recreation

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Clean test logic

1. Remove redundant prints
2. Use `mock.patch.dict` to setup `os.environ` for the current test fn
    * Avoid contaminating other tests running in the same process

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Setup local path and downloader in constructor

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Move SimpleFileTransfer to an utility file

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant env var setup

Please refer to flyteorg#3001

Signed-off-by: JiaWei Jiang <[email protected]>

* test: Add another ff use case

Create ff in one task pod and read it in another task pod.

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: Shuying Liang <[email protected]>
* test: Add integration test for attr access of sd

Signed-off-by: JiaWei Jiang <[email protected]>

* Correct file path

Signed-off-by: JiaWei Jiang <[email protected]>

* test: Support interaction with minio s3 bucket

1. Upload a local parquet file to minio s3 bucket
2. Access StructuredDataset attr from a dataclass
3. Open StructuredDataset from a remote path

Signed-off-by: JiaWei Jiang <[email protected]>

* Delete an unmerged integration test

Signed-off-by: JiaWei Jiang <[email protected]>

* Try imagespec with commit sha of corresponding fix

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant test

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove default_factory and create sd dc from input uri

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Clean test logic

1. Remove redundant prints
2. Use `mock.patch.dict` to setup `os.environ` for the current test fn
    * Avoid contaminating other tests running in the same process

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant minio env var setup and add test comments

Signed-off-by: JiaWei Jiang <[email protected]>

* Support uploading tmp pqt file

Signed-off-by: JiaWei Jiang <[email protected]>

* Udpate deprecated module

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant and unused imports

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: Shuying Liang <[email protected]>
* make _downloader function in FlyteFile/Directory pickleable

Signed-off-by: Niels Bantilan <[email protected]>

* make FlyteFile and Directory pickleable

Signed-off-by: Niels Bantilan <[email protected]>

* remove unnecessary helper functions

Signed-off-by: Niels Bantilan <[email protected]>

* fix lint

Signed-off-by: Niels Bantilan <[email protected]>

* use partials instead of lambda

Signed-off-by: Niels Bantilan <[email protected]>

* fix lint

Signed-off-by: Niels Bantilan <[email protected]>

* remove unneeded helper function

Signed-off-by: Niels Bantilan <[email protected]>

* update FlyteFilePathTransformer.downloader method

Signed-off-by: Niels Bantilan <[email protected]>

* remove downloader staticmethod

Signed-off-by: Niels Bantilan <[email protected]>

* fix lint

Signed-off-by: Niels Bantilan <[email protected]>

---------

Signed-off-by: Niels Bantilan <[email protected]>
Signed-off-by: Shuying Liang <[email protected]>
@shuyingliang shuyingliang force-pushed the shuliang/k8sdataservice branch from 43e2733 to 391df53 Compare January 11, 2025 20:35
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 11, 2025

Code Review Agent Run #b617d8

Actionable Suggestions - 6
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py - 1
    • Consider more specific warning message assertion · Line 23-23
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py - 2
    • Missing method implementation for resources · Line 68-68
    • Consider simplifying redundant status check condition · Line 180-180
  • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py - 1
    • Consider consolidating repeated task metadata setup · Line 328-339
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/task.py - 1
    • Consider using snake_case for attributes · Line 18-26
  • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py - 1
    • Consider consolidating error handling logic · Line 52-70
Additional Suggestions - 2
  • plugins/flytekit-k8sdataservice/utils/infra.py - 1
    • Variable name shadows built-in function · Line 8-8
  • plugins/setup.py - 1
    • Consider maintaining alphabetical ordering of plugins · Line 32-32
Review Details
  • Files reviewed - 23 · Commit Range: f0402ac..391df53
    • .pre-commit-config.yaml
    • Dockerfile.agent
    • docs/source/plugins/k8sstatefuldataservice.rst
    • plugins/flytekit-k8sdataservice/dev-requirements.txt
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/__init__.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/agent.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/kube_config.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/task.py
    • plugins/flytekit-k8sdataservice/setup.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_sensor.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_task.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/utils/test_resources.py
    • plugins/flytekit-k8sdataservice/utils/infra.py
    • plugins/flytekit-k8sdataservice/utils/resources.py
    • plugins/setup.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/unit/core/test_generice_idl_type_engine.py
    • tests/flytekit/unit/core/test_type_engine.py
  • Files skipped - 2
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • plugins/flytekit-k8sdataservice/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo


with self.assertLogs('flytekit', level='WARNING') as log:
kube_config.load_kube_config()
self.assertIn("Failed to load in-cluster configuration.", log.output[-1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider more specific warning message assertion

Consider adding more specific assertions for the warning message content. The current assertion only checks for a substring which could potentially match unintended messages.

Code suggestion
Check the AI-generated fix before applying
Suggested change
self.assertIn("Failed to load in-cluster configuration.", log.output[-1])
self.assertEqual(f"WARNING:flytekit:Failed to load in-cluster configuration. In-cluster config not found.", log.output[-1])

Code Review Run #b617d8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

)
return "success"

if status.replicas > 0 and status.available_replicas is not None and status.available_replicas >= 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider simplifying redundant status check condition

Consider simplifying the condition status.available_replicas is not None and status.available_replicas >= 0 to just status.available_replicas >= 0 since checking for non-None is redundant when comparing with 0.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if status.replicas > 0 and status.available_replicas is not None and status.available_replicas >= 0:
if status.replicas > 0 and status.available_replicas >= 0:

Code Review Run #b617d8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +328 to +339
task_metadata = task.TaskMetadata(
discoverable= True,
runtime=task.RuntimeMetadata(task.RuntimeMetadata.RuntimeType.FLYTE_SDK, "1.0.0", "python"),
timeout=timedelta(days=1),
retries=literals.RetryStrategy(3),
interruptible=True,
discovery_version="0.1.1b0",
deprecated_error_message="This is deprecated!",
cache_serializable=True,
pod_template_name="A",
cache_ignore_input_vars=(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider consolidating repeated task metadata setup

Consider consolidating the task metadata initialization by extracting common values into constants or helper functions. The current implementation has repeated task metadata setup across multiple test cases.

Code suggestion
Check the AI-generated fix before applying
Suggested change
task_metadata = task.TaskMetadata(
discoverable= True,
runtime=task.RuntimeMetadata(task.RuntimeMetadata.RuntimeType.FLYTE_SDK, "1.0.0", "python"),
timeout=timedelta(days=1),
retries=literals.RetryStrategy(3),
interruptible=True,
discovery_version="0.1.1b0",
deprecated_error_message="This is deprecated!",
cache_serializable=True,
pod_template_name="A",
cache_ignore_input_vars=(),
)
task_metadata = create_test_task_metadata()

Code Review Run #b617d8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +18 to +26
Name: Optional[str] = None
Requests: Optional[Resources] = None
Limits: Optional[Resources] = None
Port: Optional[int] = None
Image: Optional[str] = None
Command: Optional[List[str]] = None
Replicas: Optional[int] = None
ExistingReleaseName: Optional[str] = None
Cluster: Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using snake_case for attributes

Consider using snake_case for attribute names in DataServiceConfig class to follow Python naming conventions. Attributes like Name, Requests, Limits etc. should be lowercase.

Code suggestion
Check the AI-generated fix before applying
Suggested change
Name: Optional[str] = None
Requests: Optional[Resources] = None
Limits: Optional[Resources] = None
Port: Optional[int] = None
Image: Optional[str] = None
Command: Optional[List[str]] = None
Replicas: Optional[int] = None
ExistingReleaseName: Optional[str] = None
Cluster: Optional[str] = None
name: Optional[str] = None
requests: Optional[Resources] = None
limits: Optional[Resources] = None
port: Optional[int] = None
image: Optional[str] = None
command: Optional[List[str]] = None
replicas: Optional[int] = None
existing_release_name: Optional[str] = None
cluster: Optional[str] = None

Code Review Run #b617d8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +52 to +70
data_service_name = self.release_name
logger.info(f"Sensor got the release name: {self.release_name}")
try:
# Delete the Service associated with the graph engine
self.core_v1_api.delete_namespaced_service(
name=data_service_name, namespace=self.namespace, body=client.V1DeleteOptions()
)
logger.info(f"Deleted Service: {data_service_name}")
except ApiException as e:
logger.error(f"Error deleting Service: {e}")

try:
# Delete the StatefulSet associated with the graph engine
self.apps_v1_api.delete_namespaced_stateful_set(
name=data_service_name, namespace=self.namespace, body=client.V1DeleteOptions()
)
logger.info(f"Deleted StatefulSet: {data_service_name}")
except ApiException as e:
logger.error(f"Error deleting StatefulSet: {e}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider consolidating error handling logic

The error handling for Service and StatefulSet deletion is duplicated. Consider consolidating the error handling logic into a helper method.

Code suggestion
Check the AI-generated fix before applying
Suggested change
data_service_name = self.release_name
logger.info(f"Sensor got the release name: {self.release_name}")
try:
# Delete the Service associated with the graph engine
self.core_v1_api.delete_namespaced_service(
name=data_service_name, namespace=self.namespace, body=client.V1DeleteOptions()
)
logger.info(f"Deleted Service: {data_service_name}")
except ApiException as e:
logger.error(f"Error deleting Service: {e}")
try:
# Delete the StatefulSet associated with the graph engine
self.apps_v1_api.delete_namespaced_stateful_set(
name=data_service_name, namespace=self.namespace, body=client.V1DeleteOptions()
)
logger.info(f"Deleted StatefulSet: {data_service_name}")
except ApiException as e:
logger.error(f"Error deleting StatefulSet: {e}")
def delete_resource(resource_type: str, delete_fn):
try:
delete_fn(
name=self.release_name,
namespace=self.namespace,
body=client.V1DeleteOptions()
)
logger.info(f"Deleted {resource_type}: {self.release_name}")
except ApiException as e:
logger.error(f"Error deleting {resource_type}: {e}")
logger.info(f"Sensor got the release name: {self.release_name}")
delete_resource("Service", self.core_v1_api.delete_namespaced_service)
delete_resource("StatefulSet", self.apps_v1_api.delete_namespaced_stateful_set)

Code Review Run #b617d8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants