diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index 3a06ca981a14f..b74e784492556 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -473,6 +473,65 @@ async def test_bigquery_export_workflow( ) +@pytest.mark.parametrize("interval", ["hour"]) +@pytest.mark.parametrize("exclude_events", [None], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_SCHEMAS) +async def test_bigquery_export_workflow_without_events( + clickhouse_client, + bigquery_batch_export, + interval, + exclude_events, + ateam, + table_id, + use_json_type, + batch_export_schema, +): + """Test the BigQuery Export Workflow without any events to export. + + The workflow should update the batch export run status to completed and set 0 as `records_completed`. + """ + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = BigQueryBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(bigquery_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + batch_export_schema=batch_export_schema, + **bigquery_batch_export.destination.config, + ) + + with freeze_time(TEST_TIME): + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[BigQueryBatchExportWorkflow], + activities=[ + start_batch_export_run, + insert_into_bigquery_activity, + finish_batch_export_run, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + await activity_environment.client.execute_workflow( + BigQueryBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=bigquery_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + assert run.records_completed == 0 + + async def test_bigquery_export_workflow_handles_insert_activity_errors(ateam, bigquery_batch_export, interval): """Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data.""" data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index de25cff0366f7..5dedb8c8c0faf 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -401,8 +401,8 @@ async def test_postgres_export_workflow( ) -@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) -@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +@pytest.mark.parametrize("interval", ["hour"], indirect=True) +@pytest.mark.parametrize("exclude_events", [None], indirect=True) @pytest.mark.parametrize("batch_export_schema", TEST_SCHEMAS) async def test_postgres_export_workflow_without_events( clickhouse_client, @@ -415,10 +415,9 @@ async def test_postgres_export_workflow_without_events( table_name, batch_export_schema, ): - """Test Postgres Export Workflow end-to-end by using a local PG database. + """Test Postgres Export Workflow end-to-end without any events to export. - The workflow should update the batch export run status to completed and produce the expected - records to the local development PostgreSQL database. + The workflow should update the batch export run status to completed and set 0 as `records_completed`. """ data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index bc6728816e145..77d9e6a5486bb 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -544,14 +544,9 @@ async def test_s3_export_workflow_with_minio_bucket_without_events( s3_key_prefix, batch_export_schema, ): - """Test S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3. - - The workflow should update the batch export run status to completed and produce the expected - records to the MinIO bucket. + """Test S3BatchExport Workflow end-to-end without any events to export. - We use a BatchExport model to provide accurate inputs to the Workflow and because the Workflow - will require its prescense in the database when running. This model is indirectly parametrized - by several fixtures. Refer to them for more information. + The workflow should update the batch export run status to completed and set 0 as `records_completed`. """ data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")