diff --git a/Dockerfile.agent b/Dockerfile.agent index 2284feb1c..e6732406a 100644 --- a/Dockerfile.agent +++ b/Dockerfile.agent @@ -1,35 +1,47 @@ # This file is for internal experimental purposes only. # Please do not use this file for any real-world workloads. + FROM docker.elastic.co/elastic-agent/elastic-agent:9.0.0-SNAPSHOT + USER root -# Install dependencies -RUN apt update -RUN apt install software-properties-common -y -RUN add-apt-repository ppa:deadsnakes/ppa -RUN apt install python3.11 python3.11-venv make -y - -# TEMPORARY STUFF -# I need vim to edit some fields -# Git is needed to pull connectors repo -# yq is needed to append our input to elastic-agent.yml -RUN add-apt-repository ppa:rmescandon/yq -RUN apt install vim git yq -y - -# Copy and install python agent client -# TODO: also package this with revision and everything + +# Install apt-get dependencies +RUN apt-get update && apt-get install -y \ + software-properties-common \ + vim \ + wget \ + git \ + make \ + && add-apt-repository ppa:deadsnakes/ppa \ + && apt-get update && apt-get install -y python3.11 python3.11-venv \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +# Install Go-based yq separately +RUN wget https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 -O /usr/bin/yq && \ + chmod +x /usr/bin/yq + +# Copy project files COPY ./ /usr/share/connectors + +# Set working directory WORKDIR /usr/share/connectors + +# Install Python agent client RUN PYTHON=python3.11 make clean install install-agent -# Add component -# Agent directory name is dynamic and based on build hash, so we need to move in two steps +# Copy and move the component files into the dynamic agent directory COPY ./resources/agent/python-elastic-agent-client /tmp/python-elastic-agent-client COPY ./resources/agent/python-elastic-agent-client.spec.yml /tmp/python-elastic-agent-client.spec.yml -RUN mv /tmp/python-elastic-agent-client /usr/share/elastic-agent/data/elastic-agent-$(cat /usr/share/elastic-agent/.build_hash.txt| cut -c 1-6)/components/python-elastic-agent-client -RUN mv /tmp/python-elastic-agent-client.spec.yml /usr/share/elastic-agent/data/elastic-agent-$(cat /usr/share/elastic-agent/.build_hash.txt| cut -c 1-6)/components/python-elastic-agent-client.spec.yml -# add input to the elastic-agent.yml -RUN yq eval --inplace '.inputs += { "type": "connectors-py", "id": "connectors-py", "use_output": "default"}' /usr/share/elastic-agent/elastic-agent.yml +RUN BUILD_DIR=$(cat /usr/share/elastic-agent/.build_hash.txt | cut -c 1-6) && \ + mv /tmp/python-elastic-agent-client \ + /usr/share/elastic-agent/data/elastic-agent-${BUILD_DIR}/components/python-elastic-agent-client && \ + mv /tmp/python-elastic-agent-client.spec.yml \ + /usr/share/elastic-agent/data/elastic-agent-${BUILD_DIR}/components/python-elastic-agent-client.spec.yml -WORKDIR /usr/share/elastic-agent +# Modify the elastic-agent.yml file +RUN yq eval --inplace '.inputs += { "type": "connectors-py", "id": "connectors-py", "use_output": "default"}' \ + /usr/share/elastic-agent/elastic-agent.yml +# Set the final working directory +WORKDIR /usr/share/elastic-agent diff --git a/connectors/agent/connector_record_manager.py b/connectors/agent/connector_record_manager.py index 331a95fba..686025ecc 100644 --- a/connectors/agent/connector_record_manager.py +++ b/connectors/agent/connector_record_manager.py @@ -56,6 +56,7 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None connector_id=connector_id, service_type=service_type, connector_name=connector_name, + is_native=True, ) logger.info(f"Created connector record for {connector_id}") except Exception as e: diff --git a/connectors/agent/protocol.py b/connectors/agent/protocol.py index 726f1c73a..b136691cc 100644 --- a/connectors/agent/protocol.py +++ b/connectors/agent/protocol.py @@ -125,7 +125,16 @@ def _extract_unit_config_value(unit, field_name): connector_name = _extract_unit_config_value( connector_input, "connector_name" ) - connector_id = _extract_unit_config_value(connector_input, "id") + + connector_id = _extract_unit_config_value( + connector_input, "connector_id" + ) + # If "connector_id" is not explicitly provided as a policy parameter, + # use the "id" from the fleet policy as a fallback for the connector ID. + # The connector ID must be encoded in the policy to associate the integration + # with the connector being managed by the policy. + if not connector_id: + connector_id = _extract_unit_config_value(connector_input, "id") logger.info( f"Connector input found. Service type: {service_type}, Connector ID: {connector_id}, Connector Name: {connector_name}" diff --git a/connectors/es/index.py b/connectors/es/index.py index 8ee5c7efb..4e6b753a2 100644 --- a/connectors/es/index.py +++ b/connectors/es/index.py @@ -99,7 +99,7 @@ async def connector_check_in(self, connector_id): ) async def connector_put( - self, connector_id, service_type, connector_name, index_name + self, connector_id, service_type, connector_name, index_name, is_native ): return await self._retrier.execute_with_retry( partial( @@ -108,6 +108,7 @@ async def connector_put( service_type=service_type, name=connector_name, index_name=index_name, + is_native=is_native, ) ) diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index 4bff9d39c..a71434a7d 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -164,13 +164,19 @@ async def heartbeat(self, doc_id): await self.update(doc_id=doc_id, doc={"last_seen": iso_utc()}) async def connector_put( - self, connector_id, service_type, connector_name=None, index_name=None + self, + connector_id, + service_type, + connector_name=None, + index_name=None, + is_native=False, ): await self.api.connector_put( connector_id=connector_id, service_type=service_type, connector_name=connector_name, index_name=index_name, + is_native=is_native, ) async def connector_exists(self, connector_id): @@ -231,7 +237,6 @@ async def supported_connectors(self, native_service_types=None, connector_ids=No custom_connectors_query = { "bool": { "filter": [ - {"term": {"is_native": False}}, {"terms": {"_id": connector_ids}}, ] } diff --git a/connectors/source.py b/connectors/source.py index 2dd1b204b..b714e6387 100644 --- a/connectors/source.py +++ b/connectors/source.py @@ -384,7 +384,7 @@ class BaseDataSource: advanced_rules_enabled = False dls_enabled = False incremental_sync_enabled = False - native_connector_api_keys_enabled = True + native_connector_api_keys_enabled = False def __init__(self, configuration): # Initialize to the global logger diff --git a/connectors/sync_job_runner.py b/connectors/sync_job_runner.py index cd0a19fb4..9e7015e5d 100644 --- a/connectors/sync_job_runner.py +++ b/connectors/sync_job_runner.py @@ -10,13 +10,11 @@ from elasticsearch import ( AuthorizationException as ElasticAuthorizationException, ) -from elasticsearch import NotFoundError as ElasticNotFoundError from connectors.config import DataSourceFrameworkConfig from connectors.es.client import License, with_concurrency_control from connectors.es.index import DocumentNotFoundError from connectors.es.license import requires_platinum_license -from connectors.es.management_client import ESManagementClient from connectors.es.sink import ( CREATES_QUEUED, DELETES_QUEUED, @@ -173,14 +171,6 @@ async def execute(self): bulk_options = self.bulk_options.copy() self.data_provider.tweak_bulk_options(bulk_options) - if ( - self.connector.native - and self.connector.features.native_connector_api_keys_enabled() - and self.service_config.get("_use_native_connector_api_keys", True) - ): - # Update the config so native connectors can use API key authentication during sync - await self._update_native_connector_authentication() - self.sync_orchestrator = SyncOrchestrator( self.es_config, self.sync_job.logger ) @@ -227,33 +217,6 @@ async def execute(self): if self.data_provider is not None: await self.data_provider.close() - async def _update_native_connector_authentication(self): - """ - The connector secrets API endpoint can only be accessed by the Enterprise Search system role, - so we need to use a client initialised with the config's username and password to first fetch - the API key for Elastic managed connectors. - After that, we can provide the API key to the sync orchestrator to initialise a new client - so that an API key can be used for the sync. - This function should not be run for connector clients. - """ - es_management_client = ESManagementClient(self.es_config) - try: - self.sync_job.log_debug( - f"Checking secrets storage for API key for index [{self.connector.index_name}]..." - ) - api_key = await es_management_client.get_connector_secret( - self.connector.api_key_secret_id - ) - self.sync_job.log_debug( - f"API key found in secrets storage for index [{self.connector.index_name}], will use this for authorization." - ) - self.es_config["api_key"] = api_key - except ElasticNotFoundError as e: - msg = f"API key not found in secrets storage for index [{self.connector.index_name}]." - raise ApiKeyNotFoundError(msg) from e - finally: - await es_management_client.close() - def _data_source_framework_config(self): builder = DataSourceFrameworkConfig.Builder().with_max_file_size( self.service_config.get("max_file_download_size") diff --git a/tests/agent/test_connector_record_manager.py b/tests/agent/test_connector_record_manager.py index 34818289d..9657f3943 100644 --- a/tests/agent/test_connector_record_manager.py +++ b/tests/agent/test_connector_record_manager.py @@ -54,6 +54,7 @@ async def test_ensure_connector_records_exist_creates_connectors_if_not_exist( connector_id="1", service_type="service1", connector_name=f"[Elastic-managed] service1 connector {random_connector_name_id}", + is_native=True, ) diff --git a/tests/test_sync_job_runner.py b/tests/test_sync_job_runner.py index 2948e34d2..5537966cd 100644 --- a/tests/test_sync_job_runner.py +++ b/tests/test_sync_job_runner.py @@ -7,15 +7,9 @@ from unittest.mock import ANY, AsyncMock, Mock, patch import pytest -from elasticsearch import ( - AuthorizationException as ElasticAuthorizationException, -) from elasticsearch import ( ConflictError, ) -from elasticsearch import ( - NotFoundError as ElasticNotFoundError, -) from connectors.es.client import License from connectors.es.index import DocumentNotFoundError @@ -24,7 +18,6 @@ from connectors.protocol.connectors import ProtocolError from connectors.source import BaseDataSource from connectors.sync_job_runner import ( - ApiKeyNotFoundError, SyncJobRunner, SyncJobStartError, ) @@ -44,7 +37,7 @@ def mock_connector(): connector.last_sync_status = JobStatus.COMPLETED connector.features.sync_rules_enabled.return_value = True connector.features.incremental_sync_enabled.return_value = True - connector.features.native_connector_api_keys_enabled.return_value = True + connector.features.native_connector_api_keys_enabled.return_value = False connector.sync_cursor = SYNC_CURSOR connector.document_count = AsyncMock(return_value=TOTAL_DOCUMENT_COUNT) connector.sync_starts = AsyncMock(return_value=True) @@ -147,21 +140,6 @@ def sync_orchestrator_mock(): yield sync_orchestrator_mock -@pytest.fixture(autouse=True) -def es_management_client_mock(): - with patch( - "connectors.sync_job_runner.ESManagementClient" - ) as es_management_client_klass_mock: - es_management_client_mock = Mock() - es_management_client_mock.get_connector_secret = AsyncMock( - return_value="my-secret" - ) - es_management_client_mock.close = AsyncMock() - es_management_client_klass_mock.return_value = es_management_client_mock - - yield es_management_client_mock - - def create_runner_yielding_docs(docs=None): if docs is None: docs = [] @@ -954,284 +932,6 @@ async def test_unsupported_job_type(): await sync_job_runner.execute() -@pytest.mark.parametrize( - "job_type, sync_cursor", - [ - (JobType.FULL, SYNC_CURSOR), - (JobType.INCREMENTAL, SYNC_CURSOR), - (JobType.ACCESS_CONTROL, None), - ], -) -@pytest.mark.asyncio -async def test_native_connector_sync_fails_when_api_key_secret_missing( - job_type, sync_cursor, sync_orchestrator_mock, es_management_client_mock -): - ingestion_stats = { - "indexed_document_count": 0, - "indexed_document_volume": 0, - "deleted_document_count": 0, - "total_document_count": TOTAL_DOCUMENT_COUNT, - } - sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats - es_management_client_mock.get_connector_secret.side_effect = ElasticNotFoundError( - message="not found", meta=None, body={} - ) - - sync_job_runner = create_runner(job_type=job_type, sync_cursor=sync_cursor) - - await sync_job_runner.execute() - - sync_job_runner.sync_job.claim.assert_awaited() - sync_job_runner.sync_job.fail.assert_awaited_with( - ANY, ingestion_stats=ingestion_stats - ) - sync_job_runner.sync_job.done.assert_not_awaited() - sync_job_runner.sync_job.cancel.assert_not_awaited() - sync_job_runner.sync_job.suspend.assert_not_awaited() - - assert sync_job_runner.sync_orchestrator is None - - sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) - sync_job_runner.connector.sync_done.assert_awaited_with( - sync_job_runner.sync_job, cursor=sync_cursor - ) - - -@patch( - "connectors.sync_job_runner.SyncJobRunner._update_native_connector_authentication", - AsyncMock(), -) -@pytest.mark.parametrize( - "job_type, sync_cursor", - [ - (JobType.FULL, SYNC_CURSOR), - (JobType.INCREMENTAL, SYNC_CURSOR), - (JobType.ACCESS_CONTROL, None), - ], -) -@pytest.mark.asyncio -async def test_native_sync_runs_with_secrets_disabled_when_no_permissions( - job_type, sync_cursor, sync_orchestrator_mock -): - ingestion_stats = { - "indexed_document_count": 25, - "indexed_document_volume": 30, - "deleted_document_count": 20, - } - sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats - - sync_job_runner = create_runner( - job_type=job_type, - sync_cursor=sync_cursor, - service_config={"_use_native_connector_api_keys": False}, - ) - - error_meta = Mock() - error_meta.status = 403 - sync_job_runner._update_native_connector_authentication.side_effect = ( - ElasticAuthorizationException(message=None, meta=error_meta, body={}) - ) - - await sync_job_runner.execute() - - ingestion_stats["total_document_count"] = TOTAL_DOCUMENT_COUNT - - sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) - sync_job_runner.sync_job.claim.assert_awaited() - sync_job_runner._update_native_connector_authentication.assert_not_awaited() - sync_job_runner.sync_orchestrator.async_bulk.assert_awaited() - sync_job_runner.sync_job.done.assert_awaited_with(ingestion_stats=ingestion_stats) - sync_job_runner.sync_job.fail.assert_not_awaited() - sync_job_runner.sync_job.cancel.assert_not_awaited() - sync_job_runner.sync_job.suspend.assert_not_awaited() - sync_job_runner.connector.sync_done.assert_awaited_with( - sync_job_runner.sync_job, cursor=sync_cursor - ) - sync_job_runner.sync_orchestrator.cancel.assert_called_once() - - -@patch( - "connectors.sync_job_runner.SyncJobRunner._update_native_connector_authentication", - AsyncMock(), -) -@pytest.mark.parametrize( - "job_type, sync_cursor", - [ - (JobType.FULL, SYNC_CURSOR), - (JobType.INCREMENTAL, SYNC_CURSOR), - (JobType.ACCESS_CONTROL, None), - ], -) -@pytest.mark.asyncio -async def test_native_sync_fails_with_secrets_enabled_when_no_permissions( - job_type, sync_cursor, sync_orchestrator_mock, es_management_client_mock -): - expected_error = f"Connector is not authorized to access index [{SEARCH_INDEX_NAME}]. API key may need to be regenerated. Status code: [403]." - ingestion_stats = { - "indexed_document_count": 0, - "indexed_document_volume": 0, - "deleted_document_count": 0, - "total_document_count": TOTAL_DOCUMENT_COUNT, - } - sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats - - sync_job_runner = create_runner( - job_type=job_type, - sync_cursor=sync_cursor, - ) - - error_meta = Mock() - error_meta.status = 403 - sync_job_runner._update_native_connector_authentication.side_effect = ( - ElasticAuthorizationException(message=None, meta=error_meta, body={}) - ) - - await sync_job_runner.execute() - - sync_job_runner.sync_job.claim.assert_awaited() - sync_job_runner._update_native_connector_authentication.assert_awaited() - sync_job_runner.sync_job.fail.assert_awaited_with( - expected_error, ingestion_stats=ingestion_stats - ) - sync_job_runner.sync_job.done.assert_not_awaited() - sync_job_runner.sync_job.cancel.assert_not_awaited() - sync_job_runner.sync_job.suspend.assert_not_awaited() - - sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) - sync_job_runner.connector.sync_done.assert_awaited_with( - sync_job_runner.sync_job, cursor=sync_cursor - ) - - -@pytest.mark.parametrize( - "job_type, sync_cursor", - [ - (JobType.FULL, SYNC_CURSOR), - (JobType.INCREMENTAL, SYNC_CURSOR), - (JobType.ACCESS_CONTROL, None), - ], -) -@pytest.mark.asyncio -async def test_connector_client_sync_succeeds_when_api_key_secret_missing( - job_type, sync_cursor, sync_orchestrator_mock -): - connector = mock_connector() - connector.native = False - - ingestion_stats = { - "indexed_document_count": 25, - "indexed_document_volume": 30, - "deleted_document_count": 20, - } - sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats - sync_orchestrator_mock.update_authorization = AsyncMock( - side_effect=ApiKeyNotFoundError() - ) - - sync_job_runner = create_runner( - job_type=job_type, connector=connector, sync_cursor=sync_cursor - ) - await sync_job_runner.execute() - - ingestion_stats["total_document_count"] = TOTAL_DOCUMENT_COUNT - - sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) - sync_job_runner.sync_job.claim.assert_awaited() - sync_job_runner.sync_orchestrator.async_bulk.assert_awaited() - sync_job_runner.sync_job.done.assert_awaited_with(ingestion_stats=ingestion_stats) - sync_job_runner.sync_job.fail.assert_not_awaited() - sync_job_runner.sync_job.cancel.assert_not_awaited() - sync_job_runner.sync_job.suspend.assert_not_awaited() - sync_job_runner.connector.sync_done.assert_awaited_with( - sync_job_runner.sync_job, cursor=sync_cursor - ) - - -@pytest.mark.parametrize( - "job_type, sync_cursor", - [ - (JobType.FULL, SYNC_CURSOR), - (JobType.INCREMENTAL, SYNC_CURSOR), - ], -) -@pytest.mark.asyncio -async def test_native_connector_content_sync_fails_when_api_key_invalid( - job_type, sync_cursor, sync_orchestrator_mock -): - expected_error = f"Connector is not authorized to access index [{SEARCH_INDEX_NAME}]. API key may need to be regenerated. Status code: [403]." - ingestion_stats = { - "indexed_document_count": 0, - "indexed_document_volume": 0, - "deleted_document_count": 0, - "total_document_count": TOTAL_DOCUMENT_COUNT, - } - sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats - - error_meta = Mock() - error_meta.status = 403 - sync_orchestrator_mock.prepare_content_index.side_effect = ( - ElasticAuthorizationException(message=None, meta=error_meta, body={}) - ) - - sync_job_runner = create_runner(job_type=job_type, sync_cursor=sync_cursor) - - await sync_job_runner.execute() - - sync_job_runner.sync_job.claim.assert_awaited() - sync_job_runner.sync_job.fail.assert_awaited_with( - expected_error, ingestion_stats=ingestion_stats - ) - sync_job_runner.sync_job.done.assert_not_awaited() - sync_job_runner.sync_job.cancel.assert_not_awaited() - sync_job_runner.sync_job.suspend.assert_not_awaited() - - sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) - sync_job_runner.connector.sync_done.assert_awaited_with( - sync_job_runner.sync_job, cursor=sync_cursor - ) - - -@pytest.mark.asyncio -async def test_native_acl_connector_sync_fails_when_api_key_invalid( - sync_orchestrator_mock, -): - job_type = JobType.ACCESS_CONTROL - expected_error = f"Connector is not authorized to access index [{ACCESS_CONTROL_INDEX_NAME}]. API key may need to be regenerated. Status code: [403]." - - ingestion_stats = { - "indexed_document_count": 0, - "indexed_document_volume": 0, - "deleted_document_count": 0, - "total_document_count": TOTAL_DOCUMENT_COUNT, - } - sync_orchestrator_mock.ingestion_stats.return_value = ingestion_stats - - error_meta = Mock() - error_meta.status = 403 - sync_orchestrator_mock.async_bulk.side_effect = ElasticAuthorizationException( - message=None, meta=error_meta, body={} - ) - - sync_job_runner = create_runner( - job_type=job_type, index_name=ACCESS_CONTROL_INDEX_NAME, sync_cursor=None - ) - - await sync_job_runner.execute() - - sync_job_runner.sync_job.claim.assert_awaited() - sync_job_runner.sync_job.fail.assert_awaited_with( - expected_error, ingestion_stats=ingestion_stats - ) - sync_job_runner.sync_job.done.assert_not_awaited() - sync_job_runner.sync_job.cancel.assert_not_awaited() - sync_job_runner.sync_job.suspend.assert_not_awaited() - - sync_job_runner.connector.sync_starts.assert_awaited_with(job_type) - sync_job_runner.connector.sync_done.assert_awaited_with( - sync_job_runner.sync_job, cursor=None - ) - - @pytest.mark.parametrize( "sync_job_config,pipeline_config,expected_enabled,expected_log", [