Skip to content

Commit

Permalink
fix(ingestion): fix stateful ingestion for GCS source
Browse files Browse the repository at this point in the history
Remove pipeline name before passing context to equivalent s3 source to avoid error "Checkpointing provider DatahubIngestionCheckpointingProvider already registered."
  • Loading branch information
josges committed Nov 18, 2024
1 parent 37fa076 commit f2601d2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import logging
from typing import Dict, Iterable, List, Optional
from urllib.parse import unquote
Expand Down Expand Up @@ -88,7 +89,10 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.config = config
self.report = GCSSourceReport()
self.s3_source = self.create_equivalent_s3_source(ctx)
self.platform: str = "gcs"
s3_ctx = copy.deepcopy(ctx)
s3_ctx.pipeline_name = None
self.s3_source = self.create_equivalent_s3_source(s3_ctx)

@classmethod
def create(cls, config_dict, ctx):
Expand Down
7 changes: 6 additions & 1 deletion metadata-ingestion/tests/unit/test_gcs_source.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from unittest import mock

import pytest
from pydantic import ValidationError

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.data_lake_common.data_lake_utils import PLATFORM_GCS
from datahub.ingestion.source.gcs.gcs_source import GCSSource


def test_gcs_source_setup():
ctx = PipelineContext(run_id="test-gcs")
graph = mock.MagicMock(spec=DataHubGraph)
ctx = PipelineContext(run_id="test-gcs", graph=graph, pipeline_name="test-gcs")

# Baseline: valid config
source: dict = {
Expand All @@ -18,6 +22,7 @@ def test_gcs_source_setup():
}
],
"credential": {"hmac_access_id": "id", "hmac_access_secret": "secret"},
"stateful_ingestion": {"enabled": "true"},
}
gcs = GCSSource.create(source, ctx)
assert gcs.s3_source.source_config.platform == PLATFORM_GCS
Expand Down

0 comments on commit f2601d2

Please sign in to comment.