Skip to content

Commit

Permalink
cloudwatch message reader docs
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Aug 6, 2024
1 parent cf82b96 commit a4f6654
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 16 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
13 changes: 6 additions & 7 deletions docs/content/concepts/dagster-pipes/aws-glue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,26 @@ Call `open_dagster_pipes` in the Glue job script to create a context that can be
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesDefaultMessageWriter,
PipesS3ContextLoader,
PipesS3MessageWriter,
open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
with open_dagster_pipes(
context_loader=context_loader,
message_writer=message_writer,
params_loader=params_loader,
) as pipes:
pipes.log.info("Hello from AWS Glue job!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
Expand Down Expand Up @@ -107,7 +109,7 @@ Next, add the `PipesGlueClient` resource to your project's <PyObject object="Def

```python file=/guides/dagster/dagster_pipes/glue/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader
from dagster_aws.pipes import PipesGlueContextInjector


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]
Expand All @@ -122,9 +124,6 @@ defs = Definitions(
client=boto3.client("s3"),
bucket=bucket,
),
message_reader=PipesS3MessageReader(
client=boto3.client("s3"), bucket=bucket
),
)
},
)
Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def glue_pipes_asset(
# start_definitions_marker

from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader
from dagster_aws.pipes import PipesGlueContextInjector


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]
Expand All @@ -39,9 +39,6 @@ def glue_pipes_asset(
client=boto3.client("s3"),
bucket=bucket,
),
message_reader=PipesS3MessageReader(
client=boto3.client("s3"), bucket=bucket
),
)
},
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesDefaultMessageWriter,
PipesS3ContextLoader,
PipesS3MessageWriter,
open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
with open_dagster_pipes(
context_loader=context_loader,
message_writer=message_writer,
params_loader=params_loader,
) as pipes:
pipes.log.info("Hello from AWS Glue job!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
Expand Down
17 changes: 15 additions & 2 deletions python_modules/libraries/dagster-aws/dagster_aws/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class PipesCloudWatchMessageReader(PipesMessageReader):

def __init__(self, client: Optional[boto3.client] = None):
"""Args:
client (boto3.client): boto3 CloudWatch client.
client (boto3.client): boto3 CloudWatch client.
"""
self.client = client or boto3.client("logs")

Expand All @@ -204,6 +204,19 @@ def consume_cloudwatch_logs(
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> None:
"""Reads logs from AWS CloudWatch and forwards them to Dagster for event extraction and logging.
Args:
log_group (str): CloudWatch log group name
log_stream (str): CloudWatch log stream name
start_time (Optional[int]): The start of the time range, expressed as the number of
milliseconds after Jan 1, 1970 00:00:00 UTC. Events with a timestamp equal to this
time or later than this time are included.
Events with a timestamp earlier than this time are not included.
end_time (Optional[int]): The end of the time range, expressed as the number of
milliseconds after Jan 1, 1970 00:00:00 UTC. Events with a timestamp equal to or
later than this time are not included.
"""
handler = check.not_none(
self._handler, "Can only consume logs within context manager scope."
)
Expand All @@ -216,7 +229,7 @@ def consume_cloudwatch_logs(
extract_message_or_forward_to_stdout(handler, log_line)

def no_messages_debug_text(self) -> str:
return "Attempted to read messages by extracting them from the tail of CloudWatch logs directly."
return "Attempted to read messages by extracting them from CloudWatch logs directly."

def _get_all_cloudwatch_events(
self,
Expand Down

0 comments on commit a4f6654

Please sign in to comment.