Implement the dagster-openai integration library #19697
Conversation
my overall reaction is we can limit the MVP to only enable a subset of APIs and allow for more in a later stack
```python
FINE_TUNING = "fine_tuning"


API_RESOURCES_TO_ENDPOINT_METHODS_MAPPING = {
```
this looks a little scary to me in terms of maintenance cost. wonder if we could slim it down to an even smaller set to start with.
We could start with the 3 main ones, `Chat`, `Completions` and `Embeddings`, and add more from there?
```python
add_to_asset_metadata(context, "openai.calls", 1, output_name)
add_to_asset_metadata(context, "openai.total_tokens", usage.total_tokens, output_name)
add_to_asset_metadata(context, "openai.prompt_tokens", usage.prompt_tokens, output_name)
if hasattr(usage, "completion_tokens"):
    add_to_asset_metadata(
        context, "openai.completion_tokens", usage.completion_tokens, output_name
    )
```
nit: we can probably call `add_to_asset_metadata` once, since we will have all the dict values ready here [1]
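For illustration, the batching this comment proposes could look roughly like the sketch below. `usage_to_metadata` is a hypothetical helper (not from the PR) that collects the counters into one dict before a single metadata call:

```python
from types import SimpleNamespace

def usage_to_metadata(usage):
    """Collect all usage counters into one dict so the metadata
    helper only needs to be called once per API response."""
    metadata = {
        "openai.calls": 1,
        "openai.total_tokens": usage.total_tokens,
        "openai.prompt_tokens": usage.prompt_tokens,
    }
    # `completion_tokens` is absent on embeddings responses,
    # hence the hasattr guard.
    if hasattr(usage, "completion_tokens"):
        metadata["openai.completion_tokens"] = usage.completion_tokens
    return metadata

usage = SimpleNamespace(total_tokens=30, prompt_tokens=12, completion_tokens=18)
print(usage_to_metadata(usage))
```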
```python
context_to_counters = WeakKeyDictionary()


def add_to_asset_metadata(
```
re: [1], then you would probably change it so it updates a dict instead of adding a single value to a metadata key
```python
client.fine_tuning.jobs.create = with_usage_metadata(
    client.fine_tuning.jobs.create
)
client.fine_tuning.jobs.create(
    model="gpt-3.5-turbo",
    training_file="some_training_file"
)
```
worth adding a comment here to explain the different behaviors
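As a rough illustration of the wrapping pattern the snippet above relies on, here is a plain-Python sketch. `record` stands in for the Dagster metadata plumbing, and `fake_create` is a purely hypothetical endpoint, not the PR's code:

```python
import functools
from types import SimpleNamespace

def with_usage_metadata(method, record):
    # Wrap an endpoint method so the `usage` block of each response
    # is forwarded to a callback; responses without usage pass through.
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        response = method(*args, **kwargs)
        usage = getattr(response, "usage", None)
        if usage is not None:
            record(usage)
        return response
    return wrapper

seen = []

def fake_create(**kwargs):
    # Stand-in for an endpoint like client.fine_tuning.jobs.create.
    return SimpleNamespace(usage=SimpleNamespace(total_tokens=5))

wrapped = with_usage_metadata(fake_create, seen.append)
wrapped(model="gpt-3.5-turbo", training_file="some_training_file")
print(len(seen), seen[0].total_tokens)  # → 1 5
```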
```python
# If none of the OpenAI API resource types are specified,
# we wrap by default the methods of
# `openai.resources.Completions`, `openai.resources.Embeddings`
# and `openai.resources.chat.Completions`.
# This allows the usage metadata to be captured.
if not api_resources:
    api_resources = [
        ApiResourceEnum.COMPLETIONS,
        ApiResourceEnum.CHAT,
        ApiResourceEnum.EMBEDDINGS,
    ]
```
i was imagining this to be an internal implementation to start with.
[2] seems a bit heavy IMO - maybe we could stack a separate PR for this pattern. the MVP can just not allow `api_resources` - it's easier and better to restrict more and open it up later.
It makes sense, I can remove the option.
```python
api_resources = [
    ApiResourceEnum.CHAT,
    ApiResourceEnum.FINE_TUNING,
]
with openai.get_client(context, api_resources=api_resources) as client:
```
[2]
have you tested this in a toy dagster project, and what does that look like in 1) asset metadata, which is available across oss and cloud, 2) cloud insights?
```python
from pydantic import Field, PrivateAttr


class ApiResourceEnum(Enum):
```
Naming quibble - since this name overloads the Dagster "Resource" noun, it could be pretty confusing to users (they construct an OpenAI Resource & pass it API resources). Are these called resources in OpenAI docs?
Could potentially call this `ApiFeaturesEnum` or something similar?
> Naming quibble - since this name overloads the Dagster "Resource" noun, it could be pretty confusing to users (they construct an OpenAI Resource & pass it API resources). Are these called resources in OpenAI docs?

I agree, but indeed, they are called resources in the OpenAI library we are wrapping. I also thought of `ApiEndpointClassesEnum` to make it obvious that it is not related to a dagster resource. I can make the change.
`ApiEndpointClassesEnum` sounds not bad to me - i'd lean towards very descriptive to start with.
```python
@contextmanager
def get_client(
    self,
    context: AssetExecutionContext,
```
This can also be retrieved contextually via `AssetExecutionContext.get()`, if that makes your impl cleaner (e.g. this could be optional). Having the option to explicitly supply it might be nice for testing though.
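A sketch of what the optional-context signature could look like, with `FakeContext` standing in for `AssetExecutionContext` (whose `.get()` classmethod returns the ambient context inside an asset body); everything here is illustrative, not the PR's implementation:

```python
from contextlib import contextmanager

class FakeContext:
    """Stand-in for AssetExecutionContext in this sketch."""
    _current = None

    @classmethod
    def get(cls):
        return cls._current

@contextmanager
def get_client(context=None):
    # If no context is supplied explicitly, fall back to the
    # ambient one - mirroring AssetExecutionContext.get().
    if context is None:
        context = FakeContext.get()
    yield ("client", context)

FakeContext._current = FakeContext()
with get_client() as (client, ctx):
    print(ctx is FakeContext._current)  # → True
```

Explicit passing still works for tests, where no ambient context exists.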
```python
API_RESOURCES_TO_ENDPOINT_METHODS_MAPPING = {
    "completions": [["create"]],
    "chat": [["completions", "create"]],
```
nit: To avoid duplicating strings/promote type checking, could use an enum->list mapping:

```diff
-    "chat": [["completions", "create"]],
+    ApiResourceEnum.CHAT: [["completions", "create"]],
```
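Under that suggestion the full mapping might look like the sketch below; the `EMBEDDINGS` entry's method path is an assumption for illustration, and the enum values mirror the ones discussed earlier in the review:

```python
from enum import Enum

class ApiResourceEnum(Enum):
    COMPLETIONS = "completions"
    CHAT = "chat"
    EMBEDDINGS = "embeddings"

# Keying by enum members instead of raw strings avoids string
# duplication and lets a type checker catch typos at the call site.
API_RESOURCES_TO_ENDPOINT_METHODS_MAPPING = {
    ApiResourceEnum.COMPLETIONS: [["create"]],
    ApiResourceEnum.CHAT: [["completions", "create"]],
    ApiResourceEnum.EMBEDDINGS: [["create"]],
}

print(API_RESOURCES_TO_ENDPOINT_METHODS_MAPPING[ApiResourceEnum.CHAT])  # → [['completions', 'create']]
```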
It was fully tested locally with a toy dagster project - the metadata is displayed as expected. Next step was cloud; I wanted to make sure we were on the right track before completing all tests.
@yuhan @benpankow I updated the code in ebe9367 to match the first reviews. User can now use
The code was tested in a toy repo locally and in a local cloud deployment. Missing:
looking close! look forward to the complete docstring and test cases!
```python
def get_client(
    self,
    context: Union[AssetExecutionContext, OpExecutionContext],
    output_name: Optional[str] = None,
```
note on unit tests, since you have output_name here: worth testing the behavior in the following cases:
- multi_assets (https://docs.dagster.io/concepts/assets/multi-assets)
- assets with partitions (https://docs.dagster.io/concepts/partitions-schedules-sensors/partitioning-assets)
- graph-backed assets (https://docs.dagster.io/concepts/assets/graph-backed-assets) - i doubt this will behave any different than a regular asset but worth throwing a test case for complexity
```python
def get_client(
    self,
    context: Union[AssetExecutionContext, OpExecutionContext],
    asset_key: Optional[AssetKey] = None,
```
per internal convo, might be worth switching to `get_client` and `get_client_for_asset` to keep consistency with `InsightsSnowflakeResource`
Done in ec36d2c
```python
result = (
    Definitions(
        assets=[openai_asset],
        jobs=[
            define_asset_job(
                name="openai_asset_job", selection=AssetSelection.assets(openai_asset)
            )
        ],
        resources={
            "openai_resource": OpenAIResource(api_key="xoxp-1234123412341234-12341234-1234")
        },
    )
    .get_job_def("openai_asset_job")
    .execute_in_process()
)
```
[2] you can test an asset with resources by directly invoking it: https://docs.dagster.io/concepts/testing#testing-assets-with-resources
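As a rough illustration of the direct-invocation pattern linked above (everything here is a hypothetical stub, not the PR's code): the asset body is just a function, so a test can call it with a fake resource and skip job construction entirely.

```python
from contextlib import contextmanager

class FakeOpenAIResource:
    """Hypothetical stub standing in for OpenAIResource."""

    @contextmanager
    def get_client(self, context):
        # A real test would yield a mocked OpenAI client here.
        yield "fake-client"

def openai_asset(context, openai_resource):
    # The asset body under test, invoked as a plain function.
    with openai_resource.get_client(context) as client:
        return client

print(openai_asset(context=None, openai_resource=FakeOpenAIResource()))  # → fake-client
```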
```python
Definitions(
    assets=[openai_asset],
    jobs=[
        define_asset_job(
            name="openai_asset_job", selection=AssetSelection.assets(openai_asset)
        )
    ],
    resources={
        "openai_resource": OpenAIResource(api_key="xoxp-1234123412341234-12341234-1234")
    },
)
```
probably no need to call `Definitions`. you can use `materialize_to_memory` instead:

```python
materialize_to_memory(
    [openai_asset],
    resources={"openai_resource": OpenAIResource(api_key="xoxp-1234123412341234-12341234-1234")},
)
```

https://docs.dagster.io/concepts/testing#testing-multiple-software-defined-assets-together
Done in d3581d5
```python
outs={
    "status": AssetOut(),
    "result": AssetOut(),
},
```
let's update it to use specs, which is a more blessed pattern.
Done in d3581d5
```python
result = (
    Definitions(
        assets=[openai_multi_asset],
        jobs=[
            define_asset_job(
                name="openai_multi_asset_job",
                selection=AssetSelection.assets(openai_multi_asset),
            )
        ],
        resources={
            "openai_resource": OpenAIResource(api_key="xoxp-1234123412341234-12341234-1234")
        },
    )
    .get_job_def("openai_multi_asset_job")
    .execute_in_process()
)
```
same as [2]
Done in d3581d5
Deploy preview for dagster-university ready! ✅ Preview Built with commit ec36d2c.
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit ec36d2c.
```python
# Set up an OpenAI client based on the API key.
self._client = Client(api_key=self.api_key)


@contextmanager
```
probably need a `@public` decorator here to make it show up in the API docs. we can add and test that in the docs PR tho.
```python
    )
    """
    yield from self._get_client(context=context, asset_key=None)
```
`@public`
Let's merge this in! And iterate if anything else comes up as we build some examples using it.
Before landing, please make sure we aren't releasing it this week. See how to whitelist in release pipeline here: https://dagsterlabs.slack.com/archives/C03A0D72A6T/p1708645224684229
## Summary & Motivation

This PR adds a new `dagster-openai` library to our set of libraries. The main goal of this library is to log OpenAI API usage in the metadata. To do so, we need to wrap the methods called through the client, get the results, and update the metadata. The initial code snippets hardcoded 3 methods, but we want to give the user some flexibility.

Constraints:
- Results must be captured at the method level - the data we seek is included in the OpenAI API response. The results can't be captured at the client level, at teardown for instance.
- Not all the methods existing in the OpenAI library should be wrapped (private methods, etc.)
- Methods are overloaded in the API Resource classes, so wrapping the methods should be done on the instance.

**Solution**

Implement `OpenAIResource.get_client`, `OpenAIResource.get_client_for_asset` and the function wrapper `with_usage_metadata`. By default, for assets, the methods of the 3 main API endpoint classes, `Completions`, `Chat` and `Embeddings`, are wrapped when instantiating the client - wrapping the methods allows logging the usage metadata provided in an OpenAI Completion response. If another endpoint should be wrapped, a user can apply `with_usage_metadata` to it and log the metadata. `OpenAIResource.get_client` can be used for assets and ops, but the metadata will not be logged for ops. `OpenAIResource.get_client_for_asset` can only be used with assets, and the metadata will be logged.

## TO-DOs
- [x] implement the resource
- [x] add docstrings
- [x] implement tests

## How I Tested These Changes
- Local implementation
- BK
- Dogfood in Purina with a toy example
## Summary & Motivation

This PR adds the docs for the `dagster-openai` integration added in PR #19697

## How I Tested These Changes

BK