From 52c78304feb121edbc28e65f9974ade5af124a53 Mon Sep 17 00:00:00 2001
From: Sean Mackesey
Date: Mon, 25 Mar 2024 07:02:43 -0400
Subject: [PATCH] [backfills] Add batch inserts for ranged materialization/observation backfills (#19862)

## Summary & Motivation

Internal companion PR: https://github.com/dagster-io/internal/pull/8634

See this comment for an up-to-date summary: https://github.com/dagster-io/dagster/pull/19862#issuecomment-1962397047

Add batch insert support for asset materializations and observations emitted in a single step. Previously, a ranged backfill that generated N `AssetMaterialization` events would perform N DB inserts. After this PR, it performs only a few inserts (more than one, because several tables are written).

## How it works

- Logged dagster events take the following pathway:
  - `DagsterEvent.<event-specific static method>` (e.g. `DagsterEvent.asset_materialization`)
  - `DagsterEvent.from_step`
  - `log_dagster_event`
  - `DagsterLogManager.log`
  - `DagsterLogManager._log`
  - `DagsterLogHandler.filter`
  - `DagsterLogHandler.emit`
  - `_EventListenerLogHandler` (in `_core/instance/__init__.py`)
  - `DagsterInstance.handle_new_event`
- This pathway has been modified to pass batch metadata, represented in a new `DagsterEventBatchMetadata` class, along the entire path. `DagsterEventBatchMetadata` has two fields: `id` and `is_end`.
- `DagsterEventBatchMetadata` must be passed in to the static method that creates the event (e.g. `DagsterEvent.asset_materialization`). It is then packaged side by side with the event and passed up the call chain. `DagsterEvent` itself is not modified.
- Batching management is done in `DagsterInstance.handle_new_event`, which receives optional `DagsterEventBatchMetadata` (see the sketch below). If a call receives batch metadata, the event is buffered under the batch id. The buffer is cleared and a write is performed if (a) the buffer has hit the `EVENT_BATCH_SIZE` threshold (configured via the `DAGSTER_EVENT_BATCH_SIZE` env var; it currently defaults to 0, which disables batching entirely and makes batching opt-in); or (b) `DagsterEventBatchMetadata.is_end` is set.
- The write is implemented via a new method, `EventLogStorage.store_event_batch`. The default implementation just loops over the events in the batch and calls `store_event`, i.e. it is equivalent to the old behavior. The `EventLogStorage` implementations in cloud and `dagster-postgres` override this method to perform a batch insert instead.

## How I Tested These Changes

- Add a new storage test that performs a single-run partition-range materialization. The organization of the storage tests is unfamiliar to me, so I'm not sure I put it in the appropriate place.
- Add a simple single-run backfill test that actually executes the full backfill.
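For reference, here is a minimal sketch of the buffering logic described above. It is illustrative only: `_StubStorage` is a hypothetical stand-in for `EventLogStorage`, and the real `handle_new_event` also updates run storage and notifies subscribers.

```python
from collections import defaultdict
from typing import Any, Dict, List, NamedTuple, Optional


class DagsterEventBatchMetadata(NamedTuple):
    id: str
    is_end: bool


class _StubStorage:
    """Hypothetical stand-in for EventLogStorage, used only for this sketch."""

    def store_event(self, event: Any) -> None:
        print(f"single write: {event}")

    def store_event_batch(self, events: List[Any]) -> None:
        print(f"batch write of {len(events)} events")


EVENT_BATCH_SIZE = 3  # the real value comes from DAGSTER_EVENT_BATCH_SIZE (default 0 = batching off)

_storage = _StubStorage()
_event_buffer: Dict[str, List[Any]] = defaultdict(list)


def handle_new_event(event: Any, batch_metadata: Optional[DagsterEventBatchMetadata] = None) -> None:
    if batch_metadata is None or EVENT_BATCH_SIZE <= 0:
        events = [event]  # unbatched path: write through immediately
    else:
        _event_buffer[batch_metadata.id].append(event)
        if batch_metadata.is_end or len(_event_buffer[batch_metadata.id]) >= EVENT_BATCH_SIZE:
            events = _event_buffer.pop(batch_metadata.id)  # flush this batch
        else:
            return  # keep buffering until the batch fills or ends

    if len(events) == 1:
        _storage.store_event(events[0])
    else:
        try:
            _storage.store_event_batch(events)
        except Exception:
            # fall back to one-by-one writes if the batch write fails
            for e in events:
                _storage.store_event(e)
```

Three calls with `DagsterEventBatchMetadata("b1", is_end=False)` buffer and then flush a single three-event batch; a later call with `is_end=True` flushes whatever remains for that id.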
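And the storage-side contrast, again as a hedged sketch rather than the real implementation: the base `store_event_batch` loops over `store_event`, while a SQL-backed override can build one multi-row `INSERT ... VALUES`, analogous to what `prepare_insert_event_batch` does in the diff below. `FakeEvent`, the `event_logs` table, and `SqlBatchStorage` are illustrative stand-ins, not the real Dagster schema or classes.

```python
import json
from typing import NamedTuple, Sequence

import sqlalchemy as db


class FakeEvent(NamedTuple):  # stand-in for EventLogEntry
    run_id: str
    message: str


metadata = db.MetaData()
event_logs = db.Table(  # stand-in for SqlEventLogStorageTable
    "event_logs",
    metadata,
    db.Column("id", db.Integer, primary_key=True, autoincrement=True),
    db.Column("run_id", db.String(255)),
    db.Column("event", db.Text),
)


class SqlBatchStorage:
    def __init__(self, engine):
        self._engine = engine
        metadata.create_all(engine)

    def _event_to_row(self, event: FakeEvent) -> dict:
        return {"run_id": event.run_id, "event": json.dumps(event._asdict())}

    def store_event(self, event: FakeEvent) -> None:
        with self._engine.begin() as conn:
            conn.execute(event_logs.insert().values(**self._event_to_row(event)))

    def store_event_batch(self, events: Sequence[FakeEvent]) -> None:
        # one multi-row INSERT instead of N single-row inserts
        with self._engine.begin() as conn:
            conn.execute(event_logs.insert().values([self._event_to_row(e) for e in events]))


# usage: one INSERT carrying two rows
storage = SqlBatchStorage(db.create_engine("sqlite://"))
storage.store_event_batch([FakeEvent("r1", "a"), FakeEvent("r1", "b")])
```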
--- .../dagster/dagster/_core/events/__init__.py | 33 +++++- .../_core/execution/plan/execute_step.py | 40 ++++--- .../dagster/_core/instance/__init__.py | 83 +++++++++++-- .../dagster/dagster/_core/log_manager.py | 67 +++++++++-- .../dagster/_core/storage/event_log/base.py | 4 + .../_core/storage/event_log/sql_event_log.py | 109 ++++++++++-------- .../event_log/sqlite/sqlite_event_log.py | 2 +- ...t_asset_backfill_with_backfill_policies.py | 74 +++++++++++- .../logging_tests/test_python_logging.py | 2 +- .../storage_tests/utils/event_log_storage.py | 70 +++++++++-- .../dagster_postgres/event_log/event_log.py | 28 ++++- 11 files changed, 408 insertions(+), 104 deletions(-) diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index a89f5b8ae3de4..f3649b92fe7a3 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -3,6 +3,7 @@ import logging import os import sys +import uuid from enum import Enum from typing import ( TYPE_CHECKING, @@ -243,6 +244,12 @@ class DagsterEventType(str, Enum): PIPELINE_RUN_STATUS_TO_EVENT_TYPE = {v: k for k, v in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.items()} +# These are the only events currently supported in `EventLogStorage.store_event_batch` +BATCH_WRITABLE_EVENTS = { + DagsterEventType.ASSET_MATERIALIZATION, + DagsterEventType.ASSET_OBSERVATION, +} + ASSET_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, @@ -327,7 +334,15 @@ def _validate_event_specific_data( return event_specific_data -def log_step_event(step_context: IStepContext, event: "DagsterEvent") -> None: +def generate_event_batch_id(): + return str(uuid.uuid4()) + + +def log_step_event( + step_context: IStepContext, + event: "DagsterEvent", + batch_metadata: Optional["DagsterEventBatchMetadata"], +) -> None: event_type = DagsterEventType(event.event_type_value) log_level = logging.ERROR if event_type in FAILURE_EVENTS else logging.DEBUG @@ -335,6 +350,7 @@ def log_step_event(step_context: IStepContext, event: "DagsterEvent") -> None: level=log_level, msg=event.message or f"{event_type} for step {step_context.step.key}", dagster_event=event, + batch_metadata=batch_metadata, ) @@ -393,6 +409,11 @@ def handle_unpack_error( ) +class DagsterEventBatchMetadata(NamedTuple): + id: str + is_end: bool + + @whitelist_for_serdes( serializer=DagsterEventSerializer, storage_field_names={ @@ -439,6 +460,7 @@ def from_step( step_context: IStepContext, event_specific_data: Optional["EventSpecificData"] = None, message: Optional[str] = None, + batch_metadata: Optional["DagsterEventBatchMetadata"] = None, ) -> "DagsterEvent": event = DagsterEvent( event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, @@ -452,7 +474,7 @@ def from_step( pid=os.getpid(), ) - log_step_event(step_context, event) + log_step_event(step_context, event, batch_metadata) return event @@ -982,6 +1004,7 @@ def step_skipped_event(step_context: IStepContext) -> "DagsterEvent": def asset_materialization( step_context: IStepContext, materialization: AssetMaterialization, + batch_metadata: Optional[DagsterEventBatchMetadata] = None, ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.ASSET_MATERIALIZATION, @@ -994,16 +1017,20 @@ def asset_materialization( label_clause=f" {materialization.label}" if materialization.label else "" ) ), + batch_metadata=batch_metadata, ) @staticmethod def asset_observation( - 
step_context: IStepContext, observation: AssetObservation + step_context: IStepContext, + observation: AssetObservation, + batch_metadata: Optional[DagsterEventBatchMetadata] = None, ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.ASSET_OBSERVATION, step_context=step_context, event_specific_data=AssetObservationData(observation), + batch_metadata=batch_metadata, ) @staticmethod diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index d28c41e6bc5e5..ceb5f81747dd0 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -61,7 +61,7 @@ DagsterTypeCheckError, user_code_error_boundary, ) -from dagster._core.events import DagsterEvent +from dagster._core.events import DagsterEvent, DagsterEventBatchMetadata, generate_event_batch_id from dagster._core.execution.context.compute import enter_execution_context from dagster._core.execution.context.output import OutputContext from dagster._core.execution.context.system import StepExecutionContext, TypeCheckContext @@ -922,26 +922,33 @@ def _log_materialization_or_observation_events_for_asset( f"Unexpected asset execution type {execution_type}", ) - yield from ( - ( - _dagster_event_for_asset_event(step_context, event) - for event in _get_output_asset_events( - asset_key, - partitions, - output, - output_def, - manager_metadata, - step_context, - execution_type, - ) + asset_events = list( + _get_output_asset_events( + asset_key, + partitions, + output, + output_def, + manager_metadata, + step_context, + execution_type, ) ) + batch_id = generate_event_batch_id() + last_index = len(asset_events) - 1 + for i, asset_event in enumerate(asset_events): + batch_metadata = ( + DagsterEventBatchMetadata(batch_id, i == last_index) if partitions else None + ) + yield _dagster_event_for_asset_event(step_context, asset_event, batch_metadata) + def _dagster_event_for_asset_event( - step_context: StepExecutionContext, asset_event: Union[AssetMaterialization, AssetObservation] + step_context: StepExecutionContext, + asset_event: Union[AssetMaterialization, AssetObservation], + batch_metadata: Optional[DagsterEventBatchMetadata], ): if isinstance(asset_event, AssetMaterialization): - return DagsterEvent.asset_materialization(step_context, asset_event) + return DagsterEvent.asset_materialization(step_context, asset_event, batch_metadata) else: # observation - return DagsterEvent.asset_observation(step_context, asset_event) + return DagsterEvent.asset_observation(step_context, asset_event, batch_metadata) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 02d95a25ba4a1..9b21ba90d2faa 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -16,6 +16,7 @@ Any, Callable, Dict, + Final, Generic, Iterable, List, @@ -103,6 +104,13 @@ RUNLESS_RUN_ID = "" RUNLESS_JOB_NAME = "" +# Sets the number of events that will be buffered before being written to the event log. Only +# applies to explicitly batched events.
Currently this defaults to 0, which turns off batching +# entirely (multiple store_event calls are made instead of store_event_batch). This makes batching +# opt-in. +EVENT_BATCH_SIZE: Final = int(os.getenv("DAGSTER_EVENT_BATCH_SIZE", "0")) + + if TYPE_CHECKING: from dagster._core.debug import DebugRunPayload from dagster._core.definitions.asset_check_spec import AssetCheckKey @@ -122,6 +130,7 @@ from dagster._core.events import ( AssetMaterialization, DagsterEvent, + DagsterEventBatchMetadata, DagsterEventType, EngineEventData, ) @@ -182,6 +191,10 @@ DagsterInstanceOverrides: TypeAlias = Mapping[str, Any] +def _is_batch_writing_enabled() -> bool: + return EVENT_BATCH_SIZE > 0 + + def _check_run_equality( pipeline_run: DagsterRun, candidate_run: DagsterRun ) -> Mapping[str, Tuple[Any, Any]]: @@ -224,18 +237,21 @@ def emit(self, record: logging.LogRecord) -> None: from dagster._core.events import EngineEventData from dagster._core.events.log import StructuredLoggerMessage, construct_event_record + record_metadata = get_log_record_metadata(record) event = construct_event_record( StructuredLoggerMessage( name=record.name, message=record.msg, level=record.levelno, - meta=get_log_record_metadata(record), + meta=record_metadata, record=record, ) ) try: - self._instance.handle_new_event(event) + self._instance.handle_new_event( + event, batch_metadata=record_metadata["dagster_event_batch_metadata"] + ) except Exception as e: sys.stderr.write(f"Exception while writing logger call to event log: {e}\n") if event.dagster_event: @@ -479,6 +495,9 @@ def __init__( " them. Consider switching to Postgres or Mysql.", ) + # Used for batched event handling + self._event_buffer: Dict[str, List[EventLogEntry]] = defaultdict(list) + # ctors @public @@ -2355,16 +2374,62 @@ def get_handlers(self) -> Sequence[logging.Handler]: def store_event(self, event: "EventLogEntry") -> None: self._event_storage.store_event(event) - def handle_new_event(self, event: "EventLogEntry") -> None: - run_id = event.run_id + def handle_new_event( + self, + event: "EventLogEntry", + *, + batch_metadata: Optional["DagsterEventBatchMetadata"] = None, + ) -> None: + """Handle a new event by storing it and notifying subscribers. + + Events may optionally be sent with `batch_metadata`. If batch writing is enabled, then + events sent with `batch_metadata` will not trigger an immediate write. Instead, they will be + kept in a batch-specific buffer (identified by `batch_metadata.id`) until either the buffer + reaches the EVENT_BATCH_SIZE or the end of the batch is reached (signaled by + `batch_metadata.is_end`). When this point is reached, all events in the buffer will be sent + to the storage layer in a single batch. If an error occurs during batch writing, then we + fall back to iterative individual event writes. - self._event_storage.store_event(event) + Args: + event (EventLogEntry): The event to handle. + batch_metadata (Optional[DagsterEventBatchMetadata]): Metadata for batch writing.
+ """ + if batch_metadata is None or not _is_batch_writing_enabled(): + events = [event] + else: + batch_id, is_batch_end = batch_metadata.id, batch_metadata.is_end + self._event_buffer[batch_id].append(event) + if is_batch_end or len(self._event_buffer[batch_id]) == EVENT_BATCH_SIZE: + events = self._event_buffer[batch_id] + del self._event_buffer[batch_id] + else: + return + + if len(events) == 1: + self._event_storage.store_event(events[0]) + else: + try: + self._event_storage.store_event_batch(events) + + # Fall back to storing events one by one if writing a batch fails. We catch a generic + # Exception because that is the parent class of the actually received error, + # dagster_cloud_cli.core.errors.GraphQLStorageError, which we cannot import here due to + # it living in a cloud package. + except Exception as e: + sys.stderr.write(f"Exception while storing event batch: {e}\n") + sys.stderr.write( + "Falling back to storing multiple single-event storage requests...\n" + ) + for event in events: + self._event_storage.store_event(event) - if event.is_dagster_event and event.get_dagster_event().is_job_event: - self._run_storage.handle_run_event(run_id, event.get_dagster_event()) + for event in events: + run_id = event.run_id + if event.is_dagster_event and event.get_dagster_event().is_job_event: + self._run_storage.handle_run_event(run_id, event.get_dagster_event()) - for sub in self._subscribers[run_id]: - sub(event) + for sub in self._subscribers[run_id]: + sub(event) def add_event_listener(self, run_id: str, cb) -> None: self._subscribers[run_id].append(cb) diff --git a/python_modules/dagster/dagster/_core/log_manager.py b/python_modules/dagster/dagster/_core/log_manager.py index fcc6c1e68e2b6..a32aac22c12c5 100644 --- a/python_modules/dagster/dagster/_core/log_manager.py +++ b/python_modules/dagster/dagster/_core/log_manager.py @@ -20,7 +20,7 @@ if TYPE_CHECKING: from dagster import DagsterInstance - from dagster._core.events import DagsterEvent + from dagster._core.events import DagsterEvent, DagsterEventBatchMetadata from dagster._core.storage.dagster_run import DagsterRun # Python's logging system allows you to attach arbitrary values to a log message/record by passing a @@ -46,6 +46,23 @@ def has_log_record_event(record: logging.LogRecord) -> bool: return hasattr(record, LOG_RECORD_EVENT_ATTR) +LOG_RECORD_EVENT_BATCH_METADATA_ATTR: Final = "dagster_event_batch_metadata" + + +def get_log_record_event_batch_metadata(record: logging.LogRecord) -> "DagsterEventBatchMetadata": + return cast("DagsterEventBatchMetadata", getattr(record, LOG_RECORD_EVENT_BATCH_METADATA_ATTR)) + + +def set_log_record_event_batch_metadata( + record: logging.LogRecord, event: "DagsterEventBatchMetadata" +) -> None: + setattr(record, LOG_RECORD_EVENT_BATCH_METADATA_ATTR, event) + + +def has_log_record_event_batch_metadata(record: logging.LogRecord) -> bool: + return hasattr(record, LOG_RECORD_EVENT_BATCH_METADATA_ATTR) + + LOG_RECORD_METADATA_ATTR: Final = "dagster_meta" @@ -93,6 +110,7 @@ class DagsterLogRecordMetadata(TypedDict): resource_name: Optional[str] resource_fn_name: Optional[str] dagster_event: Optional["DagsterEvent"] + dagster_event_batch_metadata: Optional["DagsterEventBatchMetadata"] orig_message: str log_message_id: str log_timestamp: str @@ -141,7 +159,10 @@ def _error_str_for_event(event: "DagsterEvent") -> Optional[str]: def construct_log_record_metadata( - handler_metadata: DagsterLogHandlerMetadata, orig_message: str, event: Optional["DagsterEvent"] + handler_metadata: 
DagsterLogHandlerMetadata, + orig_message: str, + event: Optional["DagsterEvent"], + event_batch_metadata: Optional["DagsterEventBatchMetadata"], ) -> DagsterLogRecordMetadata: step_key = handler_metadata["step_key"] or (event.step_key if event else None) timestamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() @@ -156,18 +177,19 @@ log_message_id=make_new_run_id(), log_timestamp=timestamp, dagster_event=event, + dagster_event_batch_metadata=event_batch_metadata, step_key=step_key, ) class DagsterLogHandler(logging.Handler): """Internal class used to turn regular logs into Dagster logs by adding Dagster-specific - metadata (such as pipeline_name or step_key), as well as reformatting the underlying message. + metadata (such as job_name or step_key), as well as reformatting the underlying message. Note: The `loggers` argument will be populated with the set of @loggers supplied to the current - pipeline run. These essentially work as handlers (they do not create their own log messages, - they simply re-log messages that are created from context.log.x() calls), which is why they are - referenced from within this handler class. + run. These essentially work as handlers (they do not create their own log messages, they simply + re-log messages that are created from context.log.x() calls), which is why they are referenced + from within this handler class. """ def __init__( @@ -212,7 +234,14 @@ def _extract_extra(self, record: logging.LogRecord) -> Mapping[str, Any]: def _convert_record(self, record: logging.LogRecord) -> logging.LogRecord: # If this was a logged DagsterEvent, the event will be stored on the record event = get_log_record_event(record) if has_log_record_event(record) else None - metadata = construct_log_record_metadata(self._metadata, record.getMessage(), event) + event_batch_metadata = ( + get_log_record_event_batch_metadata(record) + if has_log_record_event_batch_metadata(record) + else None + ) + metadata = construct_log_record_metadata( + self._metadata, record.getMessage(), event, event_batch_metadata + ) message = construct_log_record_message(metadata) # update the message to be formatted like other dagster logs @@ -356,7 +385,11 @@ def end_python_log_capture(self) -> None: logger.removeHandler(self._dagster_handler) def log_dagster_event( - self, level: Union[str, int], msg: str, dagster_event: "DagsterEvent" + self, + level: Union[str, int], + msg: str, + dagster_event: "DagsterEvent", + batch_metadata: Optional["DagsterEventBatchMetadata"] = None, ) -> None: """Log a DagsterEvent at the given level. Attributes about the context it was logged in (such as the asset or job name) will be automatically attached to the created record. @@ -366,10 +399,24 @@ or an integer level such as logging.INFO or logging.DEBUG. msg (str): message describing the event dagster_event (DagsterEvent): DagsterEvent that will be logged + batch_metadata (Optional[DagsterEventBatchMetadata]): Metadata about the batch that the event is a part of. """ - self.log(level=level, msg=msg, extra={LOG_RECORD_EVENT_ATTR: dagster_event}) + self.log( + level=level, + msg=msg, + extra={ + LOG_RECORD_EVENT_ATTR: dagster_event, + LOG_RECORD_EVENT_BATCH_METADATA_ATTR: batch_metadata, + }, + ) - def log(self, level: Union[str, int], msg: object, *args: Any, **kwargs: Any) -> None: + def log( + self, + level: Union[str, int], + msg: object, + *args: Any, + **kwargs: Any, + ) -> None: """Log a message at the given level.
Attributes about the context it was logged in (such as the asset or job name) will be automatically attached to the created record. diff --git a/python_modules/dagster/dagster/_core/storage/event_log/base.py b/python_modules/dagster/dagster/_core/storage/event_log/base.py index 03c20b4ed29cf..774d92eedd597 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/base.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/base.py @@ -196,6 +196,10 @@ def store_event(self, event: "EventLogEntry") -> None: event (EventLogEntry): The event to store. """ + def store_event_batch(self, events: Sequence["EventLogEntry"]) -> None: + for event in events: + self.store_event(event) + @abstractmethod def delete_events(self, run_id: str) -> None: """Remove events for a given run id.""" diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index f27922c94db4b..91e845857a60e 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -183,36 +183,46 @@ def upgrade(self) -> None: def has_table(self, table_name: str) -> bool: """This method checks if a table exists in the database.""" - def prepare_insert_event(self, event): + def prepare_insert_event(self, event: EventLogEntry) -> Any: """Helper method for preparing the event log SQL insertion statement. Abstracted away to have a single place for the logical table representation of the event, while having a way for SQL backends to implement different execution implementations for `store_event`. See the `dagster-postgres` implementation which overrides the generic SQL implementation of `store_event`. 
""" + # https://stackoverflow.com/a/54386260/324449 + return SqlEventLogStorageTable.insert().values(**self._event_to_row(event)) + + def prepare_insert_event_batch(self, events: Sequence[EventLogEntry]) -> Any: + # https://stackoverflow.com/a/54386260/324449 + return SqlEventLogStorageTable.insert().values( + [self._event_to_row(event) for event in events] + ) + + def _event_to_row(self, event: EventLogEntry) -> Dict[str, Any]: dagster_event_type = None asset_key_str = None partition = None step_key = event.step_key if event.is_dagster_event: - dagster_event_type = event.dagster_event.event_type_value - step_key = event.dagster_event.step_key - if event.dagster_event.asset_key: - check.inst_param(event.dagster_event.asset_key, "asset_key", AssetKey) - asset_key_str = event.dagster_event.asset_key.to_string() - if event.dagster_event.partition: - partition = event.dagster_event.partition + dagster_event = event.get_dagster_event() + dagster_event_type = dagster_event.event_type_value + step_key = dagster_event.step_key + if dagster_event.asset_key: + check.inst_param(dagster_event.asset_key, "asset_key", AssetKey) + asset_key_str = dagster_event.asset_key.to_string() + if dagster_event.partition: + partition = dagster_event.partition - # https://stackoverflow.com/a/54386260/324449 - return SqlEventLogStorageTable.insert().values( - run_id=event.run_id, - event=serialize_value(event), - dagster_event_type=dagster_event_type, - timestamp=self._event_insert_timestamp(event), - step_key=step_key, - asset_key=asset_key_str, - partition=partition, - ) + return { + "run_id": event.run_id, + "event": serialize_value(event), + "dagster_event_type": dagster_event_type, + "timestamp": self._event_insert_timestamp(event), + "step_key": step_key, + "asset_key": asset_key_str, + "partition": partition, + } def has_asset_key_col(self, column_name: str) -> bool: with self.index_connection() as conn: @@ -389,41 +399,40 @@ def add_asset_event_tags( ], ) - def store_asset_event_tags(self, event: EventLogEntry, event_id: int) -> None: - check.inst_param(event, "event", EventLogEntry) - check.int_param(event_id, "event_id") + def store_asset_event_tags( + self, events: Sequence[EventLogEntry], event_ids: Sequence[int] + ) -> None: + check.sequence_param(events, "events", EventLogEntry) + check.sequence_param(event_ids, "event_ids", int) + + all_values = [ + dict( + event_id=event_id, + asset_key=check.not_none(event.get_dagster_event().asset_key).to_string(), + key=key, + value=value, + event_timestamp=self._event_insert_timestamp(event), + ) + for event_id, event in zip(event_ids, events) + for key, value in self._tags_for_asset_event(event).items() + ] + + # Only execute if tags table exists. This is to support OSS users who have not yet run the + # migration to create the table. On read, we will throw an error if the table does not + # exist. 
+ if len(all_values) > 0 and self.has_table(AssetEventTagsTable.name): + with self.index_connection() as conn: + conn.execute(AssetEventTagsTable.insert(), all_values) + def _tags_for_asset_event(self, event: EventLogEntry) -> Mapping[str, str]: if event.dagster_event and event.dagster_event.asset_key: if event.dagster_event.is_step_materialization: - tags = event.dagster_event.step_materialization_data.materialization.tags - elif event.dagster_event.is_asset_observation: - tags = event.dagster_event.asset_observation_data.asset_observation.tags - else: - tags = None - - if not tags or not self.has_table(AssetEventTagsTable.name): - # If tags table does not exist, silently exit. This is to support OSS - # users who have not yet run the migration to create the table. - # On read, we will throw an error if the table does not exist. - return - - check.inst_param(event.dagster_event.asset_key, "asset_key", AssetKey) - asset_key_str = event.dagster_event.asset_key.to_string() - - with self.index_connection() as conn: - conn.execute( - AssetEventTagsTable.insert(), - [ - dict( - event_id=event_id, - asset_key=asset_key_str, - key=key, - value=value, - event_timestamp=self._event_insert_timestamp(event), - ) - for key, value in tags.items() - ], + return ( + event.get_dagster_event().step_materialization_data.materialization.tags or {} ) + elif event.dagster_event.is_asset_observation: + return event.get_dagster_event().asset_observation_data.asset_observation.tags + return {} def store_event(self, event: EventLogEntry) -> None: """Store an event corresponding to a pipeline run. @@ -453,7 +462,7 @@ def store_event(self, event: EventLogEntry) -> None: "Cannot store asset event tags for null event id." ) - self.store_asset_event_tags(event, event_id) + self.store_asset_event_tags([event], [event_id]) if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS: self.store_asset_check_event(event, event_id) diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py index cb6df489ca0b9..e8773a5391eb8 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py @@ -263,7 +263,7 @@ def store_event(self, event: EventLogEntry) -> None: "Cannot store asset event tags for null event id." 
) - self.store_asset_event_tags(event, event_id) + self.store_asset_event_tags([event], [event_id]) if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS: self.store_asset_check_event(event, None) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py index 9abdc32988b7d..8a148a4924a76 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill_with_backfill_policies.py @@ -1,5 +1,6 @@ import math -from unittest.mock import MagicMock +from contextlib import ExitStack +from unittest.mock import MagicMock, patch import pendulum import pytest @@ -14,8 +15,12 @@ WeeklyPartitionsDefinition, asset, ) +from dagster._core.definitions.partition import StaticPartitionsDefinition from dagster._core.errors import DagsterBackfillFailedError +from dagster._core.event_api import EventRecordsFilter +from dagster._core.events import DagsterEventType from dagster._core.execution.asset_backfill import AssetBackfillData, AssetBackfillStatus +from dagster._core.instance_for_test import instance_for_test from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, ) @@ -1002,3 +1007,70 @@ def foo_child(): PartitionKeyRange("2023-10-03", "2023-10-04"), PartitionKeyRange("2023-10-05", "2023-10-06"), ] + + +# 0 turns off batching +# 2 will require multiple batches to fulfill the backfill +# 10 will require a single batch to fulfill the backfill +@pytest.mark.parametrize("batch_size", [0, 2, 10]) +@pytest.mark.parametrize("throw_store_event_batch_error", [False, True]) +def test_single_run_backfill_full_execution( + batch_size: int, throw_store_event_batch_error: bool, capsys ): + time_now = pendulum.now("UTC") + + partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"]) + + @asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.single_run()) + def partitioned_asset(): + return {"a": 1, "b": 2} + + assets_by_repo_name = { "repo": [ partitioned_asset, ] } + asset_graph = get_asset_graph(assets_by_repo_name) + + backfill_data = AssetBackfillData.from_asset_partitions( + partition_names=None, + asset_graph=asset_graph, + asset_selection=[ partitioned_asset.key, ], + dynamic_partitions_store=MagicMock(), + all_partitions=True, + backfill_start_time=time_now, + ) + + with instance_for_test() as instance: + with ExitStack() as stack: + stack.enter_context(patch("dagster._core.instance.EVENT_BATCH_SIZE", new=batch_size)) + if throw_store_event_batch_error: + stack.enter_context( + patch( + "dagster._core.storage.event_log.base.EventLogStorage.store_event_batch", + side_effect=Exception("failed"), + ) + ) + run_backfill_to_completion( + asset_graph, + assets_by_repo_name, + backfill_data, + [], + instance, + ) + events = instance.get_event_records( + EventRecordsFilter( + asset_key=partitioned_asset.key, + event_type=DagsterEventType.ASSET_MATERIALIZATION, + ) + ) + assert len(events) == 4 + + if batch_size > 0 and throw_store_event_batch_error: + stderr = capsys.readouterr().err + assert "Exception while storing event batch" in stderr + assert ( + "Falling back to storing multiple single-event storage requests...\n" in stderr + ) diff --git
a/python_modules/dagster/dagster_tests/logging_tests/test_python_logging.py b/python_modules/dagster/dagster_tests/logging_tests/test_python_logging.py index fd1d028d5a2e4..a39139094b45c 100644 --- a/python_modules/dagster/dagster_tests/logging_tests/test_python_logging.py +++ b/python_modules/dagster/dagster_tests/logging_tests/test_python_logging.py @@ -403,7 +403,7 @@ def test_failure_logging(managed_loggers, reset_logging): orig_handle_new_event = instance.handle_new_event # run still succeeds even if user-generated logger writes fail - def _fake_handle_new_event(event): + def _fake_handle_new_event(event, *, batch_metadata=None): # fail all user-generated log calls if not event.dagster_event: raise Exception("failed writing user-generated event") diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index a807f23d77c49..5434ebf27bdf9 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -48,7 +48,7 @@ from dagster._core.definitions.dependency import NodeHandle from dagster._core.definitions.job_base import InMemoryJob from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionKey -from dagster._core.definitions.partition import PartitionKeyRange +from dagster._core.definitions.partition import PartitionKeyRange, StaticPartitionsDefinition from dagster._core.definitions.time_window_partitions import ( DailyPartitionsDefinition, PartitionKeysTimeWindowPartitionsSubset, @@ -92,8 +92,13 @@ ) from dagster._core.storage.event_log.schema import SqlEventLogStorageTable from dagster._core.storage.event_log.sqlite.sqlite_event_log import SqliteEventLogStorage +from dagster._core.storage.io_manager import IOManager from dagster._core.storage.partition_status_cache import AssetStatusCacheValue from dagster._core.storage.sqlalchemy_compat import db_select +from dagster._core.storage.tags import ( + ASSET_PARTITION_RANGE_END_TAG, + ASSET_PARTITION_RANGE_START_TAG, +) from dagster._core.test_utils import create_run_for_test, instance_for_test from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.utils import make_new_run_id @@ -257,20 +262,31 @@ def _default_loggers(event_callback): # This exists to create synthetic events to test the store def _synthesize_events( - ops_fn, run_id=None, check_success=True, instance=None, run_config=None + ops_fn_or_assets, run_id=None, check_success=True, instance=None, run_config=None, tags=None ) -> Tuple[List[EventLogEntry], JobExecutionResult]: events = [] def _append_event(event): events.append(event) - @job( - resource_defs=_default_resources(), - logger_defs=_default_loggers(_append_event), - executor_def=in_process_executor, - ) - def a_job(): - ops_fn() + if isinstance(ops_fn_or_assets, list): # assets + job_def = Definitions( + assets=ops_fn_or_assets, + loggers=_default_loggers(_append_event), + resources=_default_resources(), + executor=in_process_executor, + ).get_implicit_job_def_for_assets([k for a in ops_fn_or_assets for k in a.keys]) + assert job_def + a_job = job_def + else: # op_fn + + @job( + resource_defs=_default_resources(), + logger_defs=_default_loggers(_append_event), + executor_def=in_process_executor, + ) + def a_job(): + ops_fn_or_assets() result = None @@ -283,7 +299,9 @@ def a_job(): **(run_config if run_config else {}), } - dagster_run 
= instance.create_run_for_job(a_job, run_id=run_id, run_config=run_config) + dagster_run = instance.create_run_for_job( + a_job, run_id=run_id, run_config=run_config, tags=tags + ) result = execute_run(InMemoryJob(a_job), dagster_run, instance) if check_success: @@ -1107,6 +1125,38 @@ def _ops(): assert record.event_log_entry.dagster_event.asset_key == asset_key assert result.cursor == EventLogCursor.from_storage_id(record.storage_id).to_string() + def test_asset_materialization_range(self, storage, test_run_id): + partitions_def = StaticPartitionsDefinition(["a", "b"]) + + class DummyIOManager(IOManager): + def handle_output(self, context, obj): + pass + + def load_input(self, context): + return 1 + + @asset(partitions_def=partitions_def, io_manager_def=DummyIOManager()) + def foo(): + return {"a": 1, "b": 2} + + with instance_for_test() as instance: + if not storage.has_instance: + storage.register_instance(instance) + + events, _ = _synthesize_events( + [foo], + instance=instance, + run_id=test_run_id, + tags={ASSET_PARTITION_RANGE_START_TAG: "a", ASSET_PARTITION_RANGE_END_TAG: "b"}, + ) + materializations = [ + e for e in events if e.dagster_event.event_type == "ASSET_MATERIALIZATION" + ] + storage.store_event_batch(materializations) + + result = storage.fetch_materializations(foo.key, limit=100) + assert len(result.records) == 2 + def test_asset_materialization_null_key_fails(self): with pytest.raises(check.CheckError): AssetMaterialization(asset_key=None) diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py index ff9fb78b9d756..3f2603851b5b1 100644 --- a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py +++ b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py @@ -1,5 +1,5 @@ from contextlib import contextmanager -from typing import Any, ContextManager, Iterator, Mapping, Optional, Sequence +from typing import Any, ContextManager, Iterator, Mapping, Optional, Sequence, cast import dagster._check as check import sqlalchemy as db @@ -8,7 +8,7 @@ from dagster._config.config_schema import UserConfigSchema from dagster._core.errors import DagsterInvariantViolationError from dagster._core.event_api import EventHandlerFn -from dagster._core.events import ASSET_CHECK_EVENTS, ASSET_EVENTS +from dagster._core.events import ASSET_CHECK_EVENTS, ASSET_EVENTS, BATCH_WRITABLE_EVENTS from dagster._core.events.log import EventLogEntry from dagster._core.storage.config import pg_config from dagster._core.storage.event_log import ( @@ -173,6 +173,7 @@ def store_event(self, event: EventLogEntry) -> None: event (EventLogEntry): The event to store. """ check.inst_param(event, "event", EventLogEntry) + insert_event_statement = self.prepare_insert_event(event) # from SqlEventLogStorage.py with self._connect() as conn: result = conn.execute( @@ -202,11 +203,32 @@ def store_event(self, event: EventLogEntry) -> None: "Cannot store asset event tags for null event id." 
) - self.store_asset_event_tags(event, event_id) + self.store_asset_event_tags([event], [event_id]) if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS: self.store_asset_check_event(event, event_id) + def store_event_batch(self, events: Sequence[EventLogEntry]) -> None: + check.sequence_param(events, "events", of_type=EventLogEntry) + + check.invariant( + all(event.get_dagster_event().event_type in BATCH_WRITABLE_EVENTS for event in events), + f"{BATCH_WRITABLE_EVENTS} are the only currently supported events for batch writes.", + ) + + insert_event_statement = self.prepare_insert_event_batch(events) + with self._connect() as conn: + result = conn.execute(insert_event_statement.returning(SqlEventLogStorageTable.c.id)) + event_ids = [cast(int, row[0]) for row in result.fetchall()] + + if any(event_id is None for event_id in event_ids): + raise DagsterInvariantViolationError("Cannot store asset event tags for null event id.") + + # We only update the asset table with the last event + self.store_asset_event(events[-1], event_ids[-1]) + + self.store_asset_event_tags(events, event_ids) + def store_asset_event(self, event: EventLogEntry, event_id: int) -> None: check.inst_param(event, "event", EventLogEntry) if not (event.dagster_event and event.dagster_event.asset_key):