Skip to content

Commit

Permalink
Followup on #2861 (#2923)
Browse files Browse the repository at this point in the history
Co-authored-by: Artem Shelkovnikov <[email protected]>
  • Loading branch information
jedrazb and artem-shelkovnikov authored Nov 7, 2024
1 parent 60e01ad commit cb785c0
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 99 deletions.
77 changes: 39 additions & 38 deletions connectors/agent/connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import secrets
import string

from connectors.agent.logger import get_logger
from connectors.es.index import DocumentNotFoundError
from connectors.protocol import ConnectorIndex
from connectors.utils import generate_random_id

logger = get_logger("agent_connector_record_manager")

Expand All @@ -29,26 +27,30 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None
If the connector record with a given ID doesn't exist, create a new one.
"""

if not self._agent_config_ready(agent_config):
config_ready, msg = self._check_agent_config_ready(agent_config)

if not config_ready:
logger.debug(
f"Agent configuration is not ready to create a connector record. Skipping. Reason: {msg} "
)
return

# Initialize the ES client if it's not already initialized
if not self.connector_index:
self.connector_index = ConnectorIndex(agent_config.get("elasticsearch"))

for connector_config in self._get_connectors(agent_config):
for connector_config in agent_config.get("connectors"):
connector_id, service_type = (
connector_config["connector_id"],
connector_config["service_type"],
)

if not connector_name:
random_connector_name_id = self._generate_random_connector_name_id(
length=4
)
logger.debug("Connector name not provided, generating a random one.")
random_connector_name_id = generate_random_id(length=4)
connector_name = f"[Elastic-managed] {service_type} connector {random_connector_name_id}"

if not await self._connector_exists(connector_id):
if not await self.connector_index.connector_exists(connector_id):
try:
await self.connector_index.connector_put(
connector_id=connector_id,
Expand All @@ -62,43 +64,42 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None
)
raise e

def _agent_config_ready(self, agent_config):
def _check_agent_config_ready(self, agent_config):
"""
Validates the agent configuration to check if all info is present to create a connector record.
Returns:
tuple: (bool, str or None) - True and None if valid, otherwise False and an error message.
"""

connectors = agent_config.get("connectors")
if connectors is None or len(connectors) == 0:
return False
if connectors is None:
return False, "No 'connectors' key found in the service configuration."

if len(connectors) == 0:
return False, "Empty 'connectors' array found in the service configuration."

for connector in connectors:
if "connector_id" not in connector or "service_type" not in connector:
return False
if "connector_id" not in connector:
return (
False,
"No 'connector_id' key found in the connector object.",
)

if "service_type" not in connector:
return (
False,
"No 'service_type' key found in the connector object.",
)

elasticsearch_config = agent_config.get("elasticsearch")
if not elasticsearch_config:
return False

if "host" not in elasticsearch_config or "api_key" not in elasticsearch_config:
return False

return True

async def _connector_exists(self, connector_id):
try:
doc = await self.connector_index.fetch_by_id(connector_id)
return doc is not None
except DocumentNotFoundError:
return False
except Exception as e:
logger.error(
f"Error while checking existence of connector '{connector_id}': {e}"
)
raise e
return False, "No 'elasticsearch' key found in the service configuration."

if "host" not in elasticsearch_config:
return False, "No 'host' key found in the elasticsearch configuration."

def _get_connectors(self, agent_config):
return agent_config.get("connectors")
if "api_key" not in elasticsearch_config:
return False, "No 'api_key' key found in the elasticsearch configuration."

def _generate_random_connector_name_id(self, length=4):
return "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(length)
)
return True, None
13 changes: 13 additions & 0 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from connectors.es import ESDocument, ESIndex
from connectors.es.client import with_concurrency_control
from connectors.es.index import DocumentNotFoundError
from connectors.filtering.validation import (
FilteringValidationState,
InvalidFilteringError,
Expand Down Expand Up @@ -172,6 +173,18 @@ async def connector_put(
index_name=index_name,
)

async def connector_exists(self, connector_id):
try:
doc = await self.fetch_by_id(connector_id)
return doc is not None
except DocumentNotFoundError:
return False
except Exception as e:
logger.error(
f"Error while checking existence of connector '{connector_id}': {e}"
)
raise e

async def connector_update_scheduling(
self, connector_id, full=None, incremental=None, access_control=None
):
Expand Down
14 changes: 11 additions & 3 deletions connectors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import os
import platform
import re
import secrets
import shutil
import ssl
import string
import subprocess # noqa S404
import time
import urllib.parse
Expand Down Expand Up @@ -754,10 +756,10 @@ def truncate_id(_id):

def has_duplicates(strings_list):
seen = set()
for string in strings_list:
if string in seen:
for s in strings_list:
if s in seen:
return True
seen.add(string)
seen.add(s)
return False


Expand Down Expand Up @@ -994,3 +996,9 @@ def get(self, key) -> int:

def to_dict(self):
return deepcopy(self._storage)


def generate_random_id(length=4):
return "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(length)
)
94 changes: 36 additions & 58 deletions tests/agent/test_connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
from unittest.mock import AsyncMock, Mock, patch
from unittest.mock import AsyncMock, patch

import pytest

from connectors.agent.connector_record_manager import (
ConnectorRecordManager,
)
from connectors.es.index import DocumentNotFoundError
from connectors.protocol import ConnectorIndex


Expand All @@ -35,49 +34,53 @@ def connector_record_manager(mock_connector_index):


@pytest.mark.asyncio
@patch("connectors.protocol.ConnectorIndex", new_callable=AsyncMock)
async def test_ensure_connector_records_exist_creates_connectors_if_not_exist(
mock_connector_index, mock_agent_config
connector_record_manager, mock_agent_config
):
manager = ConnectorRecordManager()
manager.connector_index = mock_connector_index
mock_connector_index.fetch_by_id.side_effect = DocumentNotFoundError
mock_connector_index.connector_put = AsyncMock()
connector_ui_id = "1234"
manager._generate_random_connector_name_id = Mock(return_value=connector_ui_id)

await manager.ensure_connector_records_exist(mock_agent_config)
assert mock_connector_index.connector_put.call_count == 1
mock_connector_index.connector_put.assert_any_await(
connector_id="1",
service_type="service1",
connector_name=f"[Elastic-managed] service1 connector {connector_ui_id}",
)
random_connector_name_id = "1234"

with patch(
"connectors.agent.connector_record_manager.generate_random_id",
return_value=random_connector_name_id,
):
connector_record_manager.connector_index.connector_exists = AsyncMock(
return_value=False
)
connector_record_manager.connector_index.connector_put = AsyncMock()

await connector_record_manager.ensure_connector_records_exist(mock_agent_config)
assert connector_record_manager.connector_index.connector_put.call_count == 1
connector_record_manager.connector_index.connector_put.assert_any_await(
connector_id="1",
service_type="service1",
connector_name=f"[Elastic-managed] service1 connector {random_connector_name_id}",
)


@pytest.mark.asyncio
async def test_ensure_connector_records_exist_connector_already_exists(
connector_record_manager, mock_agent_config
):
connector_record_manager._connector_exists = AsyncMock(return_value=True)
connector_record_manager.connector_index.connector_exists = AsyncMock(
return_value=True
)
await connector_record_manager.ensure_connector_records_exist(mock_agent_config)
assert connector_record_manager.connector_index.connector_put.call_count == 0


@pytest.mark.asyncio
@patch("connectors.protocol.ConnectorIndex", new_callable=AsyncMock)
async def test_ensure_connector_records_raises_on_non_404_error(
mock_connector_index, mock_agent_config
connector_record_manager, mock_agent_config
):
manager = ConnectorRecordManager()
manager.connector_index = mock_connector_index
mock_connector_index.fetch_by_id.side_effect = Exception("Unexpected error")
mock_connector_index.connector_put = AsyncMock()
connector_record_manager.connector_index.connector_exists = AsyncMock(
side_effect=Exception("Unexpected error")
)
connector_record_manager.connector_index.connector_put = AsyncMock()

with pytest.raises(Exception, match="Unexpected error"):
await manager.ensure_connector_records_exist(mock_agent_config)
await connector_record_manager.ensure_connector_records_exist(mock_agent_config)

assert mock_connector_index.connector_put.call_count == 0
assert connector_record_manager.connector_index.connector_put.call_count == 0


@pytest.mark.asyncio
Expand All @@ -93,45 +96,20 @@ async def test_ensure_connector_records_exist_agent_config_not_ready(
async def test_ensure_connector_records_exist_exception_on_create(
connector_record_manager, mock_agent_config
):
connector_record_manager._connector_exists = AsyncMock(return_value=False)
connector_record_manager.connector_index.connector_exists = AsyncMock(
return_value=False
)
connector_record_manager.connector_index.connector_put = AsyncMock(
side_effect=Exception("Failed to create")
)
with pytest.raises(Exception, match="Failed to create"):
await connector_record_manager.ensure_connector_records_exist(mock_agent_config)


@pytest.mark.asyncio
async def test_connector_exists_returns_true_when_found(connector_record_manager):
connector_record_manager.connector_index.fetch_by_id = AsyncMock(
return_value={"id": "1"}
)
exists = await connector_record_manager._connector_exists("1")
assert exists is True


@pytest.mark.asyncio
async def test_connector_exists_returns_false_when_not_found(connector_record_manager):
connector_record_manager.connector_index.fetch_by_id = AsyncMock(
side_effect=DocumentNotFoundError
)
exists = await connector_record_manager._connector_exists("1")
assert exists is False


@pytest.mark.asyncio
async def test_connector_exists_raises_non_404_exception(connector_record_manager):
connector_record_manager.connector_index.fetch_by_id = AsyncMock(
side_effect=Exception("Fetch error")
)
with pytest.raises(Exception, match="Fetch error"):
await connector_record_manager._connector_exists("1")


def test_agent_config_ready_with_valid_config(
connector_record_manager, mock_agent_config
):
ready = connector_record_manager._agent_config_ready(mock_agent_config)
ready, _ = connector_record_manager._check_agent_config_ready(mock_agent_config)
assert ready is True


Expand All @@ -141,13 +119,13 @@ def test_agent_config_ready_with_invalid_config_missing_connectors(
invalid_config = {
"elasticsearch": {"host": "http://localhost:9200", "api_key": "dummy_key"}
}
ready = connector_record_manager._agent_config_ready(invalid_config)
ready, _ = connector_record_manager._check_agent_config_ready(invalid_config)
assert ready is False


def test_agent_config_ready_with_invalid_config_missing_elasticsearch(
connector_record_manager,
):
invalid_config = {"connectors": [{"connector_id": "1", "service_type": "service1"}]}
ready = connector_record_manager._agent_config_ready(invalid_config)
ready, _ = connector_record_manager._check_agent_config_ready(invalid_config)
assert ready is False
Loading

0 comments on commit cb785c0

Please sign in to comment.