test: Docstring updates and more tests
tomasfarias committed Jun 10, 2024
1 parent c418deb commit ffba41d
Showing 3 changed files with 65 additions and 12 deletions.
@@ -473,6 +473,65 @@ async def test_bigquery_export_workflow(
     )
 
 
+@pytest.mark.parametrize("interval", ["hour"])
+@pytest.mark.parametrize("exclude_events", [None], indirect=True)
+@pytest.mark.parametrize("batch_export_schema", TEST_SCHEMAS)
+async def test_bigquery_export_workflow_without_events(
+    clickhouse_client,
+    bigquery_batch_export,
+    interval,
+    exclude_events,
+    ateam,
+    table_id,
+    use_json_type,
+    batch_export_schema,
+):
+    """Test the BigQuery Export Workflow without any events to export.
+
+    The workflow should update the batch export run status to completed and set 0 as `records_completed`.
+    """
+    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
+
+    workflow_id = str(uuid4())
+    inputs = BigQueryBatchExportInputs(
+        team_id=ateam.pk,
+        batch_export_id=str(bigquery_batch_export.id),
+        data_interval_end=data_interval_end.isoformat(),
+        interval=interval,
+        batch_export_schema=batch_export_schema,
+        **bigquery_batch_export.destination.config,
+    )
+
+    with freeze_time(TEST_TIME):
+        async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+            async with Worker(
+                activity_environment.client,
+                task_queue=settings.TEMPORAL_TASK_QUEUE,
+                workflows=[BigQueryBatchExportWorkflow],
+                activities=[
+                    start_batch_export_run,
+                    insert_into_bigquery_activity,
+                    finish_batch_export_run,
+                ],
+                workflow_runner=UnsandboxedWorkflowRunner(),
+            ):
+                await activity_environment.client.execute_workflow(
+                    BigQueryBatchExportWorkflow.run,
+                    inputs,
+                    id=workflow_id,
+                    task_queue=settings.TEMPORAL_TASK_QUEUE,
+                    retry_policy=RetryPolicy(maximum_attempts=1),
+                    execution_timeout=dt.timedelta(seconds=10),
+                )
+
+    runs = await afetch_batch_export_runs(batch_export_id=bigquery_batch_export.id)
+    assert len(runs) == 1
+
+    run = runs[0]
+    assert run.status == "Completed"
+    assert run.records_completed == 0
+
+
 async def test_bigquery_export_workflow_handles_insert_activity_errors(ateam, bigquery_batch_export, interval):
     """Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data."""
     data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
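Note that the BigQuery test above and the Postgres test below both parametrize `exclude_events` with `indirect=True`, which routes each parameter value through a fixture of the same name instead of handing it to the test directly. A minimal, self-contained sketch of that pytest mechanism, with illustrative names that are not from this repository:

```python
import pytest


@pytest.fixture
def exclude_events(request):
    # With indirect=True, the parametrized value arrives as request.param,
    # so the fixture can validate or transform it before the test sees it.
    return request.param


@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
def test_exclusion_list(exclude_events):
    # The test receives the fixture's return value, not the raw parameter.
    assert exclude_events is None or exclude_events == ["test-exclude"]
```

Without `indirect=True`, pytest injects the raw value and bypasses the fixture, which is why the BigQuery `interval` parametrization (no `indirect`) and the Postgres one (`indirect=True`) are wired differently.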
@@ -401,8 +401,8 @@ async def test_postgres_export_workflow(
     )
 
 
-@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True)
-@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
+@pytest.mark.parametrize("interval", ["hour"], indirect=True)
+@pytest.mark.parametrize("exclude_events", [None], indirect=True)
 @pytest.mark.parametrize("batch_export_schema", TEST_SCHEMAS)
 async def test_postgres_export_workflow_without_events(
     clickhouse_client,
@@ -415,10 +415,9 @@ async def test_postgres_export_workflow_without_events(
     table_name,
     batch_export_schema,
 ):
-    """Test Postgres Export Workflow end-to-end by using a local PG database.
+    """Test Postgres Export Workflow end-to-end without any events to export.
 
-    The workflow should update the batch export run status to completed and produce the expected
-    records to the local development PostgreSQL database.
+    The workflow should update the batch export run status to completed and set 0 as `records_completed`.
     """
     data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
 
@@ -544,14 +544,9 @@ async def test_s3_export_workflow_with_minio_bucket_without_events(
     s3_key_prefix,
     batch_export_schema,
 ):
-    """Test S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3.
-
-    The workflow should update the batch export run status to completed and produce the expected
-    records to the MinIO bucket.
+    """Test S3BatchExport Workflow end-to-end without any events to export.
 
-    We use a BatchExport model to provide accurate inputs to the Workflow and because the Workflow
-    will require its prescense in the database when running. This model is indirectly parametrized
-    by several fixtures. Refer to them for more information.
+    The workflow should update the batch export run status to completed and set 0 as `records_completed`.
     """
     data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
 
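All three new "without events" tests end with the same three assertions, shown in full in the BigQuery hunk above. As a sketch only, the shared pattern could be factored out along these lines; the helper name is hypothetical and not part of this commit:

```python
# Hypothetical helper, not part of this commit: captures the assertion pattern
# shared by the three new "*_without_events" tests. Assumes the
# afetch_batch_export_runs helper used in the diffs above is importable from
# the repository's batch export test utilities.


async def assert_single_empty_completed_run(batch_export_id) -> None:
    runs = await afetch_batch_export_runs(batch_export_id=batch_export_id)

    # Even with no events in the interval, exactly one run should be recorded.
    assert len(runs) == 1

    run = runs[0]
    # The run completes (rather than failing) and exports zero records.
    assert run.status == "Completed"
    assert run.records_completed == 0
```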
