Skip to content

Commit

Permalink
Merge pull request #321 from DalgoT4D/296-select-airbyte-cursor-field
Browse files Browse the repository at this point in the history
updating cursor field in edit and create connection apis
  • Loading branch information
fatchat authored Sep 6, 2023
2 parents da76c88 + b047f16 commit 7966bb9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
19 changes: 17 additions & 2 deletions ddpui/ddpairbyte/airbyte_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,13 +710,20 @@ def create_connection(
stream_name in selected_streams
and selected_streams[stream_name]["selected"]
):
# set schema_cat['config']['syncMode']
# from schema_cat['stream']['supportedSyncModes'] here
schema_cat["config"]["selected"] = True
schema_cat["config"]["syncMode"] = selected_streams[stream_name]["syncMode"]
schema_cat["config"]["destinationSyncMode"] = selected_streams[stream_name][
"destinationSyncMode"
]
# update the cursorField when the mode is incremental
# weirdhly the cursor field is an array of single element eg ["created_on"] or []
if schema_cat["config"]["syncMode"] == "incremental":
schema_cat["config"]["cursorField"] = [
selected_streams[stream_name]["cursorField"]
]
else:
schema_cat["config"]["cursorField"] = []

payload["syncCatalog"]["streams"].append(schema_cat)

res = abreq("connections/create", payload)
Expand Down Expand Up @@ -769,6 +776,14 @@ def update_connection(
schema_cat["config"]["destinationSyncMode"] = selected_streams[stream_name][
"destinationSyncMode"
]
# update the cursorField when the mode is incremental
# weirdhly the cursor field is an array of single element eg ["created_on"] or []
if schema_cat["config"]["syncMode"] == "incremental":
schema_cat["config"]["cursorField"] = [
selected_streams[stream_name]["cursorField"]
]
else:
schema_cat["config"]["cursorField"] = []
current_connection["syncCatalog"]["streams"].append(schema_cat)

res = abreq("connections/update", current_connection)
Expand Down
4 changes: 4 additions & 0 deletions ddpui/tests/integration_tests/test_airbyte_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ def test_a_create_connection(
"selected": True,
"syncMode": "full_refresh",
"destinationSyncMode": "overwrite",
"cursorField": "default",
}
],
normalize=False,
Expand All @@ -437,6 +438,7 @@ def test_a_create_connection(
for stream in conn["syncCatalog"]["streams"]:
assert "config" in stream
assert stream["config"]["selected"] is True
assert stream["config"]["cursorField"] == []

except ValidationError as error:
raise ValueError(f"Response validation failed: {error.errors()}") from error
Expand Down Expand Up @@ -466,6 +468,7 @@ def test_update_connection(
"selected": True,
"syncMode": "full_refresh",
"destinationSyncMode": "append",
"cursorField": ["default"],
}
],
name="New Connection Name",
Expand All @@ -485,6 +488,7 @@ def test_update_connection(
for stream in conn["syncCatalog"]["streams"]:
assert "config" in stream
assert stream["config"]["selected"] is True
assert stream["config"]["cursorField"] == []

except ValidationError as error:
raise ValueError(f"Response validation failed: {error.errors()}") from error
Expand Down

0 comments on commit 7966bb9

Please sign in to comment.