Skip to content

Commit

Permalink
Merge branch 'master' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeroendevr authored Jul 15, 2024
2 parents d676f4b + 08203c7 commit 6ae6d00
Show file tree
Hide file tree
Showing 1,062 changed files with 12,681 additions and 13,554 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.7
current_version = 0.63.8
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
runs-on: connector-publish-large
steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Publish modified connectors [On merge to master]
id: publish-modified-connectors
if: github.event_name == 'push'
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:
if: ${{ failure() && github.ref == 'refs/heads/master' }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Match GitHub User to Slack User
id: match-github-to-slack-user
uses: ./.github/actions/match-github-to-slack-user
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-mypy-on-modified-cdk-files.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Connector Extensibility - Run mypy on modified cdk files
name: Python CDK - Run mypy on modified cdk files

on:
pull_request:
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 3.4.1
resumable full refresh: Fix bug where checkpoint reader stops syncing too early if first partition is complete

## 3.4.0
file-based cdk: add config option to limit number of files for schema discovery

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class CheckpointMode(Enum):
FULL_REFRESH = "full_refresh"


FULL_REFRESH_COMPLETE_STATE: Mapping[str, Any] = {"__ab_full_refresh_sync_complete": True}


class CheckpointReader(ABC):
"""
CheckpointReader manages how to iterate over a stream's partitions and serves as the bridge for interpreting the current state
Expand Down Expand Up @@ -111,24 +114,43 @@ def next(self) -> Optional[Mapping[str, Any]]:
"""

try:
if self._current_slice is None:
self._current_slice = self._get_next_slice()
state_for_slice = self._cursor.select_state(self._current_slice)
if state_for_slice == {"__ab_full_refresh_sync_complete": True}:
return None
else:
return self._current_slice
if self._read_state_from_cursor:
state_for_slice = self._cursor.select_state(self._current_slice)
if state_for_slice == {"__ab_full_refresh_sync_complete": True}:
self._current_slice = self._get_next_slice()
# We need to check that `current_slice is None` as opposed to `not current_slice` because the current_slice
# could be the empty StreamSlice(), which evaluates to the falsy empty mapping {}. Such a slice still requires
# iterating over the cursor state in the else block until it hits the terminal value
if self.current_slice is None:
next_slice = self._get_next_slice()
state_for_slice = self._cursor.select_state(next_slice)
if state_for_slice == FULL_REFRESH_COMPLETE_STATE:
# This is a dummy initialization since we'll iterate at least once to get the next slice
next_candidate_slice = StreamSlice(cursor_slice={}, partition={})
has_more = True
while has_more:
next_candidate_slice = self._get_next_slice()
state_for_slice = self._cursor.select_state(next_candidate_slice)
has_more = state_for_slice == FULL_REFRESH_COMPLETE_STATE
self.current_slice = StreamSlice(cursor_slice=state_for_slice or {}, partition=next_candidate_slice.partition)
else:
self.current_slice = StreamSlice(cursor_slice=state_for_slice or {}, partition=next_slice.partition)
else:
self._current_slice = StreamSlice(cursor_slice=state_for_slice or {}, partition=self._current_slice.partition)
state_for_slice = self._cursor.select_state(self.current_slice)
if state_for_slice == FULL_REFRESH_COMPLETE_STATE:
# Skip every slice that already has the terminal complete value indicating that a previous attempt
# successfully synced the slice
next_candidate_slice = None
has_more = True
while has_more:
next_candidate_slice = self._get_next_slice()
state_for_slice = self._cursor.select_state(next_candidate_slice)
has_more = state_for_slice == FULL_REFRESH_COMPLETE_STATE
self.current_slice = StreamSlice(cursor_slice=state_for_slice or {}, partition=next_candidate_slice.partition)
else:
self.current_slice = StreamSlice(cursor_slice=state_for_slice or {}, partition=self.current_slice.partition)
else:
# Unlike RFR cursors that iterate dynamically based on how stream state is updated, most cursors operate on a
# fixed set of slices determined before reading records. They should just iterate to the next slice
self._current_slice = self._get_next_slice()
return self._current_slice
self.current_slice = self._get_next_slice()
return self.current_slice
except StopIteration:
self._finished_sync = True
return None
Expand All @@ -155,6 +177,14 @@ def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
else:
return None

@property
def current_slice(self) -> Optional[StreamSlice]:
    """Return the slice currently held in ``self._current_slice``.

    The value may be ``None`` (the return type is ``Optional``), so callers
    should compare with ``is None`` rather than rely on truthiness.
    """
    return self._current_slice

@current_slice.setter
def current_slice(self, value: StreamSlice) -> None:
    """Store ``value`` in ``self._current_slice`` as the reader's current slice."""
    self._current_slice = value


class ResumableFullRefreshCheckpointReader(CheckpointReader):
"""
Expand All @@ -173,7 +203,7 @@ def next(self) -> Optional[Mapping[str, Any]]:
if self._first_page:
self._first_page = False
return self._state
elif self._state == {"__ab_full_refresh_sync_complete": True}:
elif self._state == FULL_REFRESH_COMPLETE_STATE:
return None
else:
return self._state
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "airbyte-cdk"
version = "3.4.0"
version = "3.4.1"
description = "A framework for writing Airbyte Connectors."
authors = ["Airbyte <[email protected]>"]
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def test_cursor_based_checkpoint_reader_incremental():

incremental_cursor = Mock()
incremental_cursor.stream_slices.return_value = expected_slices
incremental_cursor.select_state.return_value = expected_stream_state
incremental_cursor.get_stream_state.return_value = expected_stream_state

checkpoint_reader = CursorBasedCheckpointReader(
Expand All @@ -126,7 +127,6 @@ def test_cursor_based_checkpoint_reader_incremental():
def test_cursor_based_checkpoint_reader_resumable_full_refresh():
expected_slices = [
StreamSlice(cursor_slice={}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}), # The reader calls select_state() on first stream slice retrieved
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 4}, partition={}),
Expand All @@ -137,7 +137,7 @@ def test_cursor_based_checkpoint_reader_resumable_full_refresh():

rfr_cursor = Mock()
rfr_cursor.stream_slices.return_value = [StreamSlice(cursor_slice={}, partition={})]
rfr_cursor.select_state.side_effect = expected_slices[1:]
rfr_cursor.select_state.side_effect = expected_slices
rfr_cursor.get_stream_state.return_value = expected_stream_state

checkpoint_reader = CursorBasedCheckpointReader(
Expand All @@ -147,9 +147,9 @@ def test_cursor_based_checkpoint_reader_resumable_full_refresh():
assert checkpoint_reader.next() == expected_slices[0]
actual_state = checkpoint_reader.get_checkpoint()
assert actual_state == expected_stream_state
assert checkpoint_reader.next() == expected_slices[1]
assert checkpoint_reader.next() == expected_slices[2]
assert checkpoint_reader.next() == expected_slices[3]
assert checkpoint_reader.next() == expected_slices[4]
finished = checkpoint_reader.next()
assert finished is None

Expand All @@ -159,29 +159,75 @@ def test_cursor_based_checkpoint_reader_resumable_full_refresh():

def test_cursor_based_checkpoint_reader_resumable_full_refresh_parents():
expected_slices = [
StreamSlice(cursor_slice={"start_date": "2024-01-01", "end_date": "2024-02-01"}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={}),
StreamSlice(cursor_slice={"start_date": "2024-02-01", "end_date": "2024-03-01"}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 2}, partition={"parent_id": "zaheer"}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={"parent_id": "zaheer"}),
StreamSlice(cursor_slice={"next_page_token": 2}, partition={"parent_id": "pli"}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={"parent_id": "pli"}),
]

expected_stream_state = {"next_page_token": 2}

rfr_cursor = Mock()
rfr_cursor.stream_slices.return_value = [
StreamSlice(cursor_slice={"start_date": "2024-01-01", "end_date": "2024-02-01"}, partition={}),
StreamSlice(cursor_slice={"start_date": "2024-02-01", "end_date": "2024-03-01"}, partition={}),
StreamSlice(cursor_slice={}, partition={"parent_id": "zaheer"}),
StreamSlice(cursor_slice={}, partition={"parent_id": "pli"}),
]
rfr_cursor.select_state.side_effect = [
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}), # Accounts for the first invocation when getting the first element
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={}),
StreamSlice(cursor_slice={"__ab_full_refresh_sync_complete": True}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 2}, partition={}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={}),
StreamSlice(cursor_slice={"__ab_full_refresh_sync_complete": True}, partition={}),
{"next_page_token": 2},
{"next_page_token": 3},
{"__ab_full_refresh_sync_complete": True},
{"next_page_token": 2},
{"next_page_token": 3},
{"__ab_full_refresh_sync_complete": True},
]
rfr_cursor.get_stream_state.return_value = expected_stream_state

checkpoint_reader = CursorBasedCheckpointReader(
cursor=rfr_cursor, stream_slices=rfr_cursor.stream_slices(), read_state_from_cursor=True
)

assert checkpoint_reader.next() == expected_slices[0]
actual_state = checkpoint_reader.get_checkpoint()
assert actual_state == expected_stream_state
assert checkpoint_reader.next() == expected_slices[1]
assert checkpoint_reader.next() == expected_slices[2]
assert checkpoint_reader.next() == expected_slices[3]
finished = checkpoint_reader.next()
assert finished is None

# A finished checkpoint_reader should return None for the final checkpoint to avoid emitting duplicate state
assert checkpoint_reader.get_checkpoint() is None


def test_cursor_based_checkpoint_reader_skip_completed_parent_slices():
expected_slices = [
StreamSlice(cursor_slice={"next_page_token": 2}, partition={"parent_id": "bolin"}),
StreamSlice(cursor_slice={"next_page_token": 3}, partition={"parent_id": "bolin"}),
StreamSlice(cursor_slice={"next_page_token": 7}, partition={"parent_id": "pabu"}),
StreamSlice(cursor_slice={"next_page_token": 8}, partition={"parent_id": "pabu"}),
]

expected_stream_state = {"next_page_token": 2}

rfr_cursor = Mock()
rfr_cursor.stream_slices.return_value = [
StreamSlice(cursor_slice={}, partition={"parent_id": "korra"}),
StreamSlice(cursor_slice={}, partition={"parent_id": "mako"}),
StreamSlice(cursor_slice={}, partition={"parent_id": "bolin"}),
StreamSlice(cursor_slice={}, partition={"parent_id": "asami"}),
StreamSlice(cursor_slice={}, partition={"parent_id": "naga"}),
StreamSlice(cursor_slice={}, partition={"parent_id": "pabu"}),
]
rfr_cursor.select_state.side_effect = [
{"__ab_full_refresh_sync_complete": True},
{"__ab_full_refresh_sync_complete": True},
{"next_page_token": 2},
{"next_page_token": 3},
{"__ab_full_refresh_sync_complete": True},
{"__ab_full_refresh_sync_complete": True},
{"__ab_full_refresh_sync_complete": True},
{"next_page_token": 7},
{"next_page_token": 8},
]
rfr_cursor.get_stream_state.return_value = expected_stream_state

Expand All @@ -195,8 +241,6 @@ def test_cursor_based_checkpoint_reader_resumable_full_refresh_parents():
assert checkpoint_reader.next() == expected_slices[1]
assert checkpoint_reader.next() == expected_slices[2]
assert checkpoint_reader.next() == expected_slices[3]
assert checkpoint_reader.next() == expected_slices[4]
assert checkpoint_reader.next() == expected_slices[5]
finished = checkpoint_reader.next()
assert finished is None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_secret_value(secret_manager_client: secretmanager.SecretManagerServiceCl
return response.payload.data.decode("UTF-8")
except PermissionDenied as e:
logging.exception(
f"Permission denied while trying to access secret {secret_id}. Please write to #dev-extensibility in Airbyte Slack for help.",
f"Permission denied while trying to access secret {secret_id}. Please write to #dev-tooling in Airbyte Slack for help.",
exc_info=e,
)
raise e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,5 @@ def create_slack_message(self) -> str:
assert self.report is not None, "Report should be set when state is successful"
message += f"⏲️ Run duration: {format_duration(self.report.run_duration)}\n"
if self.state is ContextState.FAILURE:
message += "\ncc. <!subteam^S03BQLNTFNC>"
message += "\ncc. <!subteam^S077R8636CV>"
return message
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: destination
definitionId: 0eeee7fb-518f-4045-bacc-9619e31c43ea
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
dockerRepository: airbyte/destination-amazon-sqs
githubIssueLabel: destination-amazon-sqs
icon: awssqs.svg
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.1.12"
version = "0.1.13"
name = "destination-amazon-sqs"
description = "Destination implementation for Amazon Sqs."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 042ce96f-1158-4662-9543-e2ff015be97a
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
dockerRepository: airbyte/destination-astra
githubIssueLabel: destination-astra
icon: astra.svg
Expand Down
12 changes: 6 additions & 6 deletions airbyte-integrations/connectors/destination-astra/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "airbyte-destination-astra"
version = "0.1.12"
version = "0.1.13"
description = "Airbyte destination implementation for Astra DB."
authors = ["Airbyte <[email protected]>"]
license = "MIT"
Expand Down
Loading

0 comments on commit 6ae6d00

Please sign in to comment.