Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set connector status to CONNECTED after successful validation and ping #3059

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,10 @@ async def error(self, error):
}
await self.index.update(doc_id=self.id, doc=doc)

async def connected(self):
doc = {"status": Status.CONNECTED.value, "error": None}
await self.index.update(doc_id=self.id, doc=doc)

async def sync_done(self, job, cursor=None):
job_status = JobStatus.ERROR if job is None else job.status
job_error = JOB_NOT_FOUND_ERROR if job is None else job.error
Expand Down
5 changes: 5 additions & 0 deletions connectors/services/job_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ async def _schedule(self, connector):

if connector.features.sync_rules_enabled():
await connector.validate_filtering(validator=data_source)

self.logger.info(
"Connector is configured correctly and can reach the data source"
)
await connector.connected()
except Exception as e:
connector.log_error(e, exc_info=True)
await connector.error(e)
Expand Down
12 changes: 11 additions & 1 deletion tests/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,28 @@ class AsyncIterator:
Async documents generator fake class, which records the args and kwargs it was called with.
"""

def __init__(self, items):
def __init__(self, items, reusable=False):
"""
AsyncIterator is a test-only abstraction to mock async iterables.
By default it's usable only once: once iterated over, he iterator will not
iterate again any more.
If reusable is True, then iterator can be re-used, but only if it's used by a single coroutine.
If AsyncIterator is used in several coroutines, it'll not work correctly at all
"""
self.items = items
self.call_args = []
self.call_kwargs = []
self.i = 0
self.call_count = 0
self.reusable = reusable

def __aiter__(self):
return self

async def __anext__(self):
if self.i >= len(self.items):
if self.reusable:
self.i = 0
Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see it's only for tests so not a big deal but does this open a risk for infinite looping anywhere?

raise StopAsyncIteration

item = self.items[self.i]
Expand Down
40 changes: 40 additions & 0 deletions tests/services/test_job_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def mock_connector(
connector.heartbeat = AsyncMock()
connector.reload = AsyncMock()
connector.error = AsyncMock()
connector.connected = AsyncMock()
connector.update_last_sync_scheduled_at_by_job_type = AsyncMock()

return connector
Expand Down Expand Up @@ -377,6 +378,45 @@ def _source_klass(config):
connector.error.assert_awaited_with(actual_error)


@pytest.mark.asyncio
@patch("connectors.services.job_scheduling.get_source_klass")
async def test_run_when_connector_failed_validation_then_succeeded(
get_source_klass_mock, connector_index_mock, set_env
):
error_message = "Something invalid is in config!"
actual_error = Exception(error_message)

data_source_mock = Mock()

def _source_klass(config):
return data_source_mock

def _error_once():
data_source_mock.validate_config.reset_mock(side_effect=True)
raise actual_error

get_source_klass_mock.return_value = _source_klass

data_source_mock.validate_config_fields = Mock()
data_source_mock.validate_config = AsyncMock(side_effect=_error_once)
data_source_mock.ping = AsyncMock()
data_source_mock.close = AsyncMock()

connector = mock_connector(next_sync=datetime.now(timezone.utc))
connector_index_mock.supported_connectors.return_value = AsyncIterator(
[connector], reusable=True
)
await create_and_run_service(JobSchedulingService, stop_after=0.15)

data_source_mock.validate_config_fields.assert_called()
data_source_mock.validate_config.assert_awaited()
data_source_mock.ping.assert_awaited()
data_source_mock.close.assert_awaited()

connector.error.assert_awaited_with(actual_error)
connector.connected.assert_awaited()


@pytest.mark.asyncio
@patch("connectors.services.job_scheduling.get_source_klass")
async def test_run_when_connector_ping_fails(
Expand Down
Loading