Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve agent plugin documentation with running examples and additional notes #5399

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions docs/flyte_agents/developing_agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,46 +41,72 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl
- `get`: This method retrieves the job resource (jobID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID.
- `delete`: Invoking this method will send a request to delete the corresponding job.

Below is a skeleton for an example async agent. Modify it as needed.

```python
# agent.py
from typing import Optional
from dataclasses import dataclass

from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta


@dataclass
class BigQueryMetadata(ResourceMeta):
class ExampleMetadata(ResourceMeta):
"""
This is the metadata for the job. For example, the id of the job.
"""

job_id: str

class BigQueryAgent(AsyncAgentBase):

class ExampleAgent(AsyncAgentBase):
def __init__(self):
super().__init__(task_type_name="bigquery", metadata_type=BigQueryMetadata)
super().__init__(task_type_name="example", metadata_type=ExampleMetadata)

def create(
self,
task_template: TaskTemplate,
inputs: Optional[LiteralMap] = None,
**kwargs,
) -> BigQueryMetadata:
job_id = submit_bigquery_job(inputs)
return BigQueryMetadata(job_id=job_id)

def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
phase, outputs = get_job_status(resource_meta.job_id)
return Resource(phase=phase, outputs=outputs)

def delete(self, resource_meta: BigQueryMetadata, **kwargs):
cancel_bigquery_job(resource_meta.job_id)

# To register the bigquery agent
AgentRegistry.register(BigQueryAgent())
) -> ExampleMetadata:
print(f"create called task_template={task_template}")
# pull out plugin specific configuration from the task
custom = task_template.custom
# pull out the environment field set from the task
environment = custom["environment"]

# submit job on external platform
# job_id = submit_job(inputs, environment)
job_id = "temp"

# return metadata which will be used for following get & delete calls
return ExampleMetadata(job_id=job_id)

def get(self, resource_meta: ExampleMetadata, **kwargs) -> Resource:
print(f"get called resource_meta={resource_meta}")
# query external platform for job status
# phase = get_job_status(resource_meta.job_id)
phase = TaskExecution.SUCCEEDED

# phases can be TaskExecution.RUNNING, TaskExecution.SUCCEEDED, TaskExecution.FAILED, etc
return Resource(phase=phase)

def delete(self, resource_meta: ExampleMetadata, **kwargs):
print(f"delete called resource_meta={resource_meta}")
# cancel job on external platform
# cancel_job(resource_meta.job_id)
pass


# To register the example agent
AgentRegistry.register(ExampleAgent())
```

For an example implementation, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43).
For an example implementation of a real agent, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43).

#### Sync agent interface specification

Expand Down
117 changes: 95 additions & 22 deletions docs/flyte_agents/testing_agents_in_a_local_python_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,121 @@ You can test agents locally without running the backend server.
To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`.
These mixins can handle synchronous and asynchronous tasks, respectively, and allow flytekit to mimic FlytePropeller's behavior in calling the agent.

## BigQuery example
## Example Task

To test the BigQuery agent, copy the following code to a file called `bigquery_task.py`, modifying as needed.
To test the example agent defined in {ref}`developing agents <developing_agents>`, copy the following code to a file called `task.py`, modifying as needed.

```python
# task.py
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Dict, Optional

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit import PythonFunctionTask
from flytekit.configuration import SerializationSettings
from flytekit.extend import TaskPlugins
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin


@dataclass
class ExampleConfig(object):
"""
ExampleConfig should be used to configure an ExampleTask.
"""

environment: str


# Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task.
# This task extends PythonFunctionTask but you can extend different base tasks depending on your needs (ie. SQLTask)
class ExampleTask(AsyncAgentExecutorMixin, PythonFunctionTask[ExampleConfig]):
# This must match the task type defined in the agent
_TASK_TYPE = "example"

def __init__(
self,
task_config: Optional[ExampleConfig],
task_function: Callable,
**kwargs,
) -> None:
outputs = None
super().__init__(
task_config=task_config,
task_function=task_function,
task_type=self._TASK_TYPE,
**kwargs,
)

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
"""
Return plugin-specific data as a serializable dictionary. This is required for your plugin to access task_template.custom.
"""
config = {
"environment": self.task_config.environment,
}
s = Struct()
s.update(config)
return json_format.MessageToDict(s)


# Register the Example Task into the flytekit core plugin system
TaskPlugins.register_pythontask_plugin(ExampleConfig, ExampleTask)
```

```{note}

In some cases, you will need to store credentials in your local environment when testing locally.
For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent.
The ExampleTask implements `get_custom` which is originally defined in the base Task object. You will need to implement
`_get_custom` if you wish to pass plugin-specific data through the task_template's `custom` field.

```

Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task.
Flytekit will automatically use the agent to run the task in the local execution.
```python
class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
...
# example.py
from flytekit.configuration.default_images import DefaultImages
from flytekit import task

class ChatGPTTask(SyncAgentExecutorMixin, PythonTask):
...
# Import agent to trigger agent registration
from .agent import ExampleAgent
# Import task to trigger task registration and pass plugin specific config
from .task import ExampleConfig

```

Flytekit will automatically use the agent to run the task in the local execution.
```python
bigquery_doge_coin = BigQueryTask(
name=f"bigquery.doge_coin",
inputs=kwtypes(version=int),
query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
output_structured_dataset_type=StructuredDataset,
task_config=BigQueryConfig(ProjectID="flyte-test-340607")
)
@task(task_config=ExampleConfig(environment="dev"), container_image=DefaultImages.default_image())
def say_hello(name: str) -> str:
print(f"Hello, {name}!")
return f"Hello, {name}!"

```

You can run the above example task locally and test the agent with the following command:
You can run locally and test the agent with the following command:

```bash
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
pyflyte run example.py example --name world
Running Execution on local.
create called task_template=<FlyteLiteral(TaskTemplate) id { resource_type: TASK name: "armada_test_agent.example.say_hello" } type: "example" metadata { runtime { type: FLYTE_SDK version: "1.12.0" flavor: "python" } retries { } } interface { inputs { variables { key: "name" value { type { simple: STRING } description: "name" } } } outputs { variables { key: "o0" value { type { simple: STRING } description: "o0" } } } } custom { fields { key: "environment" value { string_value: "dev" } } } container { image: "cr.flyte.org/flyteorg/flytekit:py3.11-1.12.0" args: "pyflyte-execute" args: "--inputs" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/inputs.pb" args: "--output-prefix" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9" args: "--raw-output-data-prefix" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/raw_output" args: "--checkpoint-path" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/checkpoint_output" args: "--prev-checkpoint" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/prev_checkpoint" args: "--resolver" args: "flytekit.core.python_auto_container.default_task_resolver" args: "--" args: "task-module" args: "armada_test_agent.example" args: "task-name" args: "say_hello" resources { } }>
get called resource_meta=ExampleMetadata(job_id='temp')
```

You can also run a BigQuery task in your Python interpreter to test the agent locally.
If it doesn't appear that your agent is running you can debug what might be going on by passing the `-v` verbose flag to `pyflyte`.

## Existing Flyte Agent Tasks
Sovietaced marked this conversation as resolved.
Show resolved Hide resolved

You can also run a existing Flyte Agent tasks in your Python interpreter to test the agent locally.
Sovietaced marked this conversation as resolved.
Show resolved Hide resolved

![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png)


```{note}

In some cases, you will need to store credentials in your local environment when testing locally.
For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent.

```

## Databricks example
To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed.

Expand Down
Loading