add PipesLambdaClient #17924
Conversation
What is the behavior here when unstructured logs are >4k? Does it still pick up all structured messages?
```python
@experimental
class PipesLambdaLogsMessageReader(PipesMessageReader):
```
Can you document its capabilities? In particular, the limitation on log length and what the product experience would be if you use this.
Is it possible to implement this using the PipesLogReader API used for Databricks? If I'm reading this correctly, this doesn't support continuous polling of the logs. Or does that not work / is it otherwise inappropriate for Lambda?
The Lambda sync invoke API gives you back the last 4 KiB of logs directly in the response, so this initial version is exploring that. While it does have its limitations, it's very simple, and as long as the meaningful messages are towards the end of the computation (which I believe would be common) it should work.
The full logs are available via CloudWatch, so we need to explore the exact constraints of using those APIs to tail the logs (likely via PipesLogReader).
```python
log_result = base64.b64decode(response["LogResult"]).decode("utf-8")

for log_line in log_result.splitlines():
    extract_message_or_forward_to_stdout(handler, log_line)
```
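For context, here is a runnable sketch of the tail-log decoding step above, using a hand-built response dict in place of a real `boto3` `invoke(..., LogType="Tail")` call (so no AWS credentials are needed):

```python
import base64


def extract_tail_logs(response: dict) -> list:
    # "LogResult" is the base64-encoded last 4 KiB of logs that a
    # synchronous Lambda invoke with LogType="Tail" returns.
    log_result = base64.b64decode(response["LogResult"]).decode("utf-8")
    return log_result.splitlines()


# Hand-built stand-in for the boto3 invoke response.
fake_response = {
    "LogResult": base64.b64encode(
        b'ordinary stdout line\n{"method": "closed", "params": null}\n'
    ).decode("utf-8")
}
print(extract_tail_logs(fake_response))
# → ['ordinary stdout line', '{"method": "closed", "params": null}']
```

Each decoded line is then either parsed as a structured pipes message or forwarded as plain log output.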
Hmmmm should this be stderr?
```diff
@@ -75,7 +75,7 @@ class _PipesDockerClient(PipesClient):
             the docker client.
         context_injector (Optional[PipesContextInjector]): A context injector to use to inject
             context into the docker container process. Defaults to :py:class:`PipesEnvContextInjector`.
-        message_reader (Optional[PipesContextInjector]): A message reader to use to read messages
+        message_reader (Optional[PipesMessageReader]): A message reader to use to read messages
```
thx
> The Lambda sync invoke API gives you back the last 4 KiB of logs directly in the response, so this initial version is exploring that. While it does have its limitations, it's very simple, and as long as the meaningful messages are towards the end of the computation it should work.
This seems very unreliable. I suggest doing something less rickety.
Ideas:
- Structured messages via s3
- Wait to serialize structured messages until the very end to guarantee availability.
Open to other suggestions.
> Wait to serialize structured messages until the very end to guarantee availability.

I did this with [1].

> Structured messages via s3

This is under test / demonstrated in [2].
A maybe-not-well-grounded motivation for keeping the log-tail-based default is avoiding the need to get the boto3 dep installed in your Lambda. Anecdotally, copy-pasting the single file for access to dagster-pipes is pretty low friction, whereas getting boto3 in leads to https://docs.aws.amazon.com/lambda/latest/dg/python-package.html#python-package-dependencies. I assume most mature users of Lambda have some tooling to manage and package deps, but it's not clear to me at this time how prevalent that maturity is.
```python
class PipesBufferedStreamMessageWriterChannel(PipesMessageWriterChannel):
    """Message writer channel that buffers messages and then writes them all out to a
    `TextIO` stream on close.
    """

    def __init__(self, stream: TextIO):
        self._buffer = []
        self._stream = stream

    def write_message(self, message: PipesMessage) -> None:
        self._buffer.append(message)

    def flush(self):
        for message in self._buffer:
            self._stream.writelines((json.dumps(message), "\n"))
        self._buffer = []
```
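A self-contained sketch of the buffer-then-flush behavior, using a simplified stand-in class (without the `PipesMessageWriterChannel` base class) so it runs without dagster-pipes installed:

```python
import io
import json


class BufferedStreamChannel:
    """Simplified stand-in: buffer messages, write them all out on flush()."""

    def __init__(self, stream):
        self._buffer = []
        self._stream = stream

    def write_message(self, message: dict) -> None:
        self._buffer.append(message)

    def flush(self) -> None:
        for message in self._buffer:
            self._stream.writelines((json.dumps(message), "\n"))
        self._buffer = []


stream = io.StringIO()
channel = BufferedStreamChannel(stream)
channel.write_message({"method": "opened"})
channel.write_message({"method": "closed"})
assert stream.getvalue() == ""  # nothing is written until flush
channel.flush()
print(stream.getvalue())
```

Deferring all writes to a single flush at exit is what keeps the structured messages inside the final 4 KiB window the Lambda response returns.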
[1]
```python
PipesLambdaClient(
    FakeLambdaClient(),
    message_reader=PipesS3MessageReader(
        client=s3_client, bucket=_S3_TEST_BUCKET, interval=0.001
    ),
).run(
    context=context,
    function_name=LambdaFunctions.pipes_s3_messages.__name__,
    event={},
).get_materialize_result()
```
[2]
```python
s3_client = boto3.client(
    "s3", region_name="us-east-1", endpoint_url="http://localhost:5193"
)
with open_dagster_pipes(
    params_loader=PipesMappingParamsLoader(event),
    message_writer=PipesS3MessageWriter(client=s3_client, interval=0.001),
```
[2]
```python
class PipesMappingParamsLoader(PipesParamsLoader):
    """Params loader that extracts params from a Mapping provided at init time."""

    def __init__(self, mapping: Mapping[str, str]):
        self._mapping = mapping

    def is_dagster_pipes_process(self) -> bool:
        # use the presence of DAGSTER_PIPES_CONTEXT to discern if we are in a pipes process
        return DAGSTER_PIPES_CONTEXT_ENV_VAR in self._mapping

    def load_context_params(self) -> PipesParams:
        raw_value = self._mapping[DAGSTER_PIPES_CONTEXT_ENV_VAR]
        return decode_env_var(raw_value)

    def load_messages_params(self) -> PipesParams:
        raw_value = self._mapping[DAGSTER_PIPES_MESSAGES_ENV_VAR]
        return decode_env_var(raw_value)


class PipesEnvVarParamsLoader(PipesMappingParamsLoader):
    """Params loader that extracts params from environment variables."""

    def __init__(self):
        super().__init__(mapping=os.environ)
```
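A runnable sketch of the mapping-vs-env-var pattern above, with simplified stand-in classes: the real dagster-pipes loaders decode base64/zlib-encoded params via `decode_env_var`, but plain JSON is used here for illustration (an assumption, not the actual wire format):

```python
import json
import os

# Stand-in constant matching the real env var name used by dagster-pipes.
DAGSTER_PIPES_CONTEXT_ENV_VAR = "DAGSTER_PIPES_CONTEXT"


class MappingParamsLoader:
    """Pull pipes params out of any Mapping, e.g. a Lambda event dict."""

    def __init__(self, mapping):
        self._mapping = mapping

    def is_dagster_pipes_process(self) -> bool:
        # presence of the context key signals a pipes-orchestrated process
        return DAGSTER_PIPES_CONTEXT_ENV_VAR in self._mapping

    def load_context_params(self) -> dict:
        # The real loader base64/zlib-decodes; plain JSON here for illustration.
        return json.loads(self._mapping[DAGSTER_PIPES_CONTEXT_ENV_VAR])


class EnvVarParamsLoader(MappingParamsLoader):
    """Env-var loading is just the mapping case with mapping=os.environ."""

    def __init__(self):
        super().__init__(mapping=os.environ)


# In a Lambda handler, the invoke event itself carries the params:
event = {DAGSTER_PIPES_CONTEXT_ENV_VAR: json.dumps({"run_id": "abc123"})}
loader = MappingParamsLoader(event)
print(loader.is_dagster_pipes_process())  # → True
print(loader.load_context_params())       # → {'run_id': 'abc123'}
```

Reframing the env-var loader as a special case of the mapping loader is what lets the Lambda client pass pipes params through the invoke event instead of the process environment.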
Potentially a behavior change if someone smashes `os.environ` by assigning a new dictionary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think any action is worth taking on this, but just wanted to call it out.
Ok great. Let's write some docs on this and post it to #dagster-pipes for this week's release.
```python
"""Message reader that consumes buffered pipes messages that were flushed on exit from the
final 4k of logs that are returned from issuing a sync lambda invocation.

Limitations: If the volume of pipes messages exceeds 4k, messages will be lost and it is
recommended to switch to PipesS3MessageWriter & PipesS3MessageReader.
"""
```
Also note explicitly that messages emitted during the computation will only appear in the Dagster event log after the Lambda completes.
## Summary & Motivation

Create a Pipes client for AWS Lambda. A companion PR adds a guide for using Pipes with AWS Lambda, referencing this PR (#17924).

## How I Tested These Changes

- Added unit tests that use a fake version of Lambda.
- Manually tested against a real Lambda instance.