Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Respect connector_id defined in agentless #2989

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 34 additions & 22 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
@@ -1,35 +1,47 @@
# This file is for internal experimental purposes only.
# Please do not use this file for any real-world workloads.

FROM docker.elastic.co/elastic-agent/elastic-agent:9.0.0-SNAPSHOT

USER root
# Install dependencies
RUN apt update
RUN apt install software-properties-common -y
RUN add-apt-repository ppa:deadsnakes/ppa
RUN apt install python3.11 python3.11-venv make -y

# TEMPORARY STUFF
# I need vim to edit some fields
# Git is needed to pull connectors repo
# yq is needed to append our input to elastic-agent.yml
RUN add-apt-repository ppa:rmescandon/yq
RUN apt install vim git yq -y

# Copy and install python agent client
# TODO: also package this with revision and everything

# Install apt-get dependencies
RUN apt-get update && apt-get install -y \
software-properties-common \
vim \
wget \
git \
make \
&& add-apt-repository ppa:deadsnakes/ppa \
&& apt-get update && apt-get install -y python3.11 python3.11-venv \
&& apt-get clean && rm -rf /var/lib/apt/lists/*

# Install Go-based yq separately
RUN wget https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 -O /usr/bin/yq && \
chmod +x /usr/bin/yq

# Copy project files
COPY ./ /usr/share/connectors

# Set working directory
WORKDIR /usr/share/connectors

# Install Python agent client
RUN PYTHON=python3.11 make clean install install-agent

# Add component
# Agent directory name is dynamic and based on build hash, so we need to move in two steps
# Copy and move the component files into the dynamic agent directory
COPY ./resources/agent/python-elastic-agent-client /tmp/python-elastic-agent-client
COPY ./resources/agent/python-elastic-agent-client.spec.yml /tmp/python-elastic-agent-client.spec.yml
RUN mv /tmp/python-elastic-agent-client /usr/share/elastic-agent/data/elastic-agent-$(cat /usr/share/elastic-agent/.build_hash.txt| cut -c 1-6)/components/python-elastic-agent-client
RUN mv /tmp/python-elastic-agent-client.spec.yml /usr/share/elastic-agent/data/elastic-agent-$(cat /usr/share/elastic-agent/.build_hash.txt| cut -c 1-6)/components/python-elastic-agent-client.spec.yml

# add input to the elastic-agent.yml
RUN yq eval --inplace '.inputs += { "type": "connectors-py", "id": "connectors-py", "use_output": "default"}' /usr/share/elastic-agent/elastic-agent.yml
RUN BUILD_DIR=$(cat /usr/share/elastic-agent/.build_hash.txt | cut -c 1-6) && \
mv /tmp/python-elastic-agent-client \
/usr/share/elastic-agent/data/elastic-agent-${BUILD_DIR}/components/python-elastic-agent-client && \
mv /tmp/python-elastic-agent-client.spec.yml \
/usr/share/elastic-agent/data/elastic-agent-${BUILD_DIR}/components/python-elastic-agent-client.spec.yml

WORKDIR /usr/share/elastic-agent
# Modify the elastic-agent.yml file
RUN yq eval --inplace '.inputs += { "type": "connectors-py", "id": "connectors-py", "use_output": "default"}' \
/usr/share/elastic-agent/elastic-agent.yml

# Set the final working directory
WORKDIR /usr/share/elastic-agent
1 change: 1 addition & 0 deletions connectors/agent/connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None
connector_id=connector_id,
service_type=service_type,
connector_name=connector_name,
is_native=True,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we've settled on is_native flag reusage in the end?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I understood from the "mega-thread" we've had last week, will DM you the link

)
logger.info(f"Created connector record for {connector_id}")
except Exception as e:
Expand Down
11 changes: 10 additions & 1 deletion connectors/agent/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,16 @@ def _extract_unit_config_value(unit, field_name):
connector_name = _extract_unit_config_value(
connector_input, "connector_name"
)
connector_id = _extract_unit_config_value(connector_input, "id")

connector_id = _extract_unit_config_value(
connector_input, "connector_id"
)
# If "connector_id" is not explicitly provided as a policy parameter,
# use the "id" from the fleet policy as a fallback for the connector ID.
# The connector ID must be encoded in the policy to associate the integration
# with the connector being managed by the policy.
if not connector_id:
connector_id = _extract_unit_config_value(connector_input, "id")

logger.info(
f"Connector input found. Service type: {service_type}, Connector ID: {connector_id}, Connector Name: {connector_name}"
Expand Down
3 changes: 2 additions & 1 deletion connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def connector_check_in(self, connector_id):
)

async def connector_put(
self, connector_id, service_type, connector_name, index_name
self, connector_id, service_type, connector_name, index_name, is_native
):
return await self._retrier.execute_with_retry(
partial(
Expand All @@ -108,6 +108,7 @@ async def connector_put(
service_type=service_type,
name=connector_name,
index_name=index_name,
is_native=is_native,
)
)

Expand Down
9 changes: 7 additions & 2 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,19 @@ async def heartbeat(self, doc_id):
await self.update(doc_id=doc_id, doc={"last_seen": iso_utc()})

async def connector_put(
self, connector_id, service_type, connector_name=None, index_name=None
self,
connector_id,
service_type,
connector_name=None,
index_name=None,
is_native=False,
):
await self.api.connector_put(
connector_id=connector_id,
service_type=service_type,
connector_name=connector_name,
index_name=index_name,
is_native=is_native,
)

async def connector_exists(self, connector_id):
Expand Down Expand Up @@ -231,7 +237,6 @@ async def supported_connectors(self, native_service_types=None, connector_ids=No
custom_connectors_query = {
"bool": {
"filter": [
{"term": {"is_native": False}},
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we treat all connectors the same (discover by ID), remove this filter

{"terms": {"_id": connector_ids}},
]
}
Expand Down
2 changes: 1 addition & 1 deletion connectors/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class BaseDataSource:
advanced_rules_enabled = False
dls_enabled = False
incremental_sync_enabled = False
native_connector_api_keys_enabled = True
native_connector_api_keys_enabled = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is old setting to enable native connector secrets for native connectors. Since we no longer rely on secrets for elastic managed connectors I just set this flag to false without thinking worrying about proper cleanup, I've filed followup ticket:
#2990

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it'll break connectors running in native mode in traditional setup in 9.x, but we don't care since it's something nobody's supposed to do, right?

Copy link
Member Author

@jedrazb jedrazb Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The native connectors would be broken anyway since only service account that manages the ent-search node has access to read the secrets https://github.com/elastic/elasticsearch/blob/8c20ac5884158b88fdd598e422db632e1734aabb/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/service/ElasticServiceAccounts.java#L48

After migration to 9.0 those secrets are useless because noone has permissions to read them


def __init__(self, configuration):
# Initialize to the global logger
Expand Down
37 changes: 0 additions & 37 deletions connectors/sync_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
from elasticsearch import (
AuthorizationException as ElasticAuthorizationException,
)
from elasticsearch import NotFoundError as ElasticNotFoundError

from connectors.config import DataSourceFrameworkConfig
from connectors.es.client import License, with_concurrency_control
from connectors.es.index import DocumentNotFoundError
from connectors.es.license import requires_platinum_license
from connectors.es.management_client import ESManagementClient
from connectors.es.sink import (
CREATES_QUEUED,
DELETES_QUEUED,
Expand Down Expand Up @@ -173,14 +171,6 @@ async def execute(self):
bulk_options = self.bulk_options.copy()
self.data_provider.tweak_bulk_options(bulk_options)

if (
self.connector.native
and self.connector.features.native_connector_api_keys_enabled()
and self.service_config.get("_use_native_connector_api_keys", True)
):
# Update the config so native connectors can use API key authentication during sync
await self._update_native_connector_authentication()

self.sync_orchestrator = SyncOrchestrator(
self.es_config, self.sync_job.logger
)
Expand Down Expand Up @@ -227,33 +217,6 @@ async def execute(self):
if self.data_provider is not None:
await self.data_provider.close()

async def _update_native_connector_authentication(self):
"""
The connector secrets API endpoint can only be accessed by the Enterprise Search system role,
so we need to use a client initialised with the config's username and password to first fetch
the API key for Elastic managed connectors.
After that, we can provide the API key to the sync orchestrator to initialise a new client
so that an API key can be used for the sync.
This function should not be run for connector clients.
"""
es_management_client = ESManagementClient(self.es_config)
try:
self.sync_job.log_debug(
f"Checking secrets storage for API key for index [{self.connector.index_name}]..."
)
api_key = await es_management_client.get_connector_secret(
self.connector.api_key_secret_id
)
self.sync_job.log_debug(
f"API key found in secrets storage for index [{self.connector.index_name}], will use this for authorization."
)
self.es_config["api_key"] = api_key
except ElasticNotFoundError as e:
msg = f"API key not found in secrets storage for index [{self.connector.index_name}]."
raise ApiKeyNotFoundError(msg) from e
finally:
await es_management_client.close()

def _data_source_framework_config(self):
builder = DataSourceFrameworkConfig.Builder().with_max_file_size(
self.service_config.get("max_file_download_size")
Expand Down
1 change: 1 addition & 0 deletions tests/agent/test_connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async def test_ensure_connector_records_exist_creates_connectors_if_not_exist(
connector_id="1",
service_type="service1",
connector_name=f"[Elastic-managed] service1 connector {random_connector_name_id}",
is_native=True,
)


Expand Down
Loading
Loading