Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create connector record in agent component #2861

Merged
merged 7 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion connectors/agent/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ async def run(self):
action_handler = ConnectorActionHandler()
self.connector_service_manager = ConnectorServiceManager(self.config_wrapper)
checkin_handler = ConnectorCheckinHandler(
client, self.config_wrapper, self.connector_service_manager
client,
self.config_wrapper,
self.connector_service_manager,
)

self.multi_service = MultiService(
Expand Down
86 changes: 50 additions & 36 deletions connectors/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,57 +24,30 @@ class ConnectorsAgentConfigurationWrapper:
def __init__(self):
"""Inits the class.

There's default config that allows us to run connectors natively (see _force_allow_native flag),
when final configuration is reported these defaults will be merged with defaults from Connectors
Service config and specific config coming from Agent.
There's default config that allows us to run connectors service. When final
configuration is reported these defaults will be merged with defaults from
Connectors Service config and specific config coming from Agent.
"""
self._default_config = {
"service": {
"log_level": "INFO",
"_use_native_connector_api_keys": False,
},
"_force_allow_native": True,
"native_service_types": [
"azure_blob_storage",
"box",
"confluence",
"dropbox",
"github",
"gmail",
"google_cloud_storage",
"google_drive",
"jira",
"mongodb",
"mssql",
"mysql",
"notion",
"onedrive",
"oracle",
"outlook",
"network_drive",
"postgresql",
"s3",
"salesforce",
"servicenow",
"sharepoint_online",
"slack",
"microsoft_teams",
"zoom",
],
"connectors": [],
}

self.specific_config = {}

def try_update(self, unit):
def try_update(self, connector_id, service_type, output_unit):
"""Try update the configuration and see if it changed.

This method takes the check-in event coming from Agent and checks if config needs an update.
This method takes the check-in event data (connector_id, service_type and output) coming
from Agent and checks if config needs an update.

If update is needed, configuration is updated and method returns True. If no update is needed
the method returns False.
"""

source = unit.config.source
source = output_unit.config.source

# TODO: find a good link to what this object is.
has_hosts = source.fields.get("hosts")
Expand All @@ -83,9 +56,17 @@ def try_update(self, unit):

assumed_configuration = {}

# Connector-related
assumed_configuration["connectors"] = [
{
"connector_id": connector_id,
"service_type": service_type,
}
]

# Log-related
assumed_configuration["service"] = {}
assumed_configuration["service"]["log_level"] = unit.log_level
assumed_configuration["service"]["log_level"] = output_unit.log_level

# Auth-related
if has_hosts and (has_api_key or has_basic_auth):
Expand Down Expand Up @@ -154,6 +135,32 @@ def _elasticsearch_config_changed():
"elasticsearch"
)

def _connectors_config_changes():
    """Tell whether the "connectors" list differs between the two configs.

    Reads ``current_config`` and ``new_config`` from the enclosing scope.
    Connectors are matched by their "connector_id"; a different number of
    entries, a different set of ids, or any differing entry for the same id
    counts as a change.
    """
    previous = current_config.get("connectors", [])
    incoming = new_config.get("connectors", [])

    # Cheap check first: a different number of entries is always a change.
    if len(previous) != len(incoming):
        return True

    previous_by_id = {entry["connector_id"]: entry for entry in previous}
    incoming_by_id = {entry["connector_id"]: entry for entry in incoming}

    # Key views compare like sets, so this detects added/removed ids.
    if previous_by_id.keys() != incoming_by_id.keys():
        return True

    # Same ids on both sides: look for any entry whose content differs.
    return any(
        previous_by_id[known_id] != incoming_by_id[known_id]
        for known_id in previous_by_id
    )

if _log_level_changed():
logger.debug("log_level changed")
return True
Expand All @@ -162,6 +169,10 @@ def _elasticsearch_config_changed():
logger.debug("elasticsearch changed")
return True

if _connectors_config_changes():
logger.debug("connectors changed")
return True

return False

def get(self):
Expand All @@ -182,3 +193,6 @@ def get(self):
configuration = dict(add_defaults(config))

return configuration

def get_specific_config(self):
    """Return the specific configuration stored on this wrapper.

    The stored object itself is handed back, not a copy.
    """
    specific_config = self.specific_config
    return specific_config
104 changes: 104 additions & 0 deletions connectors/agent/connector_record_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import secrets
import string

from connectors.agent.logger import get_logger
from connectors.es.index import DocumentNotFoundError
from connectors.protocol import ConnectorIndex

logger = get_logger("agent_connector_record_manager")


class ConnectorRecordManager:
"""
Manages connector records in Elasticsearch, ensuring that connectors tied to agent components
exist in the connector index. It creates the connector record if necessary.
"""

def __init__(self):
self.connector_index = None

async def ensure_connector_records_exist(self, agent_config, connector_name=None):
"""
Ensure that connector records exist for all connectors specified in the agent configuration.

If the connector record with a given ID doesn't exist, create a new one.
"""

if not self._agent_config_ready(agent_config):
jedrazb marked this conversation as resolved.
Show resolved Hide resolved
return

# Initialize the ES client if it's not already initialized
if not self.connector_index:
self.connector_index = ConnectorIndex(agent_config.get("elasticsearch"))
Comment on lines +36 to +37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it a cached property instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so: we don't have access to agent_config when instantiating ConnectorRecordManager. ensure_connector_records_exist is called from the check-in handler with the up-to-date config, so cached_property can't be used.


for connector_config in self._get_connectors(agent_config):
connector_id, service_type = (
connector_config["connector_id"],
connector_config["service_type"],
)

if not connector_name:
random_connector_name_id = self._generate_random_connector_name_id(
jedrazb marked this conversation as resolved.
Show resolved Hide resolved
length=4
)
connector_name = f"[Elastic-managed] {service_type} connector {random_connector_name_id}"

if not await self._connector_exists(connector_id):
try:
await self.connector_index.connector_put(
connector_id=connector_id,
service_type=service_type,
connector_name=connector_name,
)
logger.info(f"Created connector record for {connector_id}")
Comment on lines +53 to +58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about also filling in RCFs here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could be nice, especially on serverless, as AFAIK it would take a bit of time until this gets auto-populated. I will file an enhancement issue to track it 👍 We might want to test this thoroughly with the new connector API _configuration endpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

except Exception as e:
logger.error(
f"Failed to create connector record for {connector_id}: {e}"
)
raise e

def _agent_config_ready(self, agent_config):
"""
Validates the agent configuration to check if all info is present to create a connector record.
"""
connectors = agent_config.get("connectors")
if connectors is None or len(connectors) == 0:
return False

for connector in connectors:
if "connector_id" not in connector or "service_type" not in connector:
return False

elasticsearch_config = agent_config.get("elasticsearch")
if not elasticsearch_config:
return False

if "host" not in elasticsearch_config or "api_key" not in elasticsearch_config:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be checking for cloud_id instead of api_key?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or maybe in addition?
We must have either host or cloud_id, and we MUST have api_key. Right?

Copy link
Member Author

@jedrazb jedrazb Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are checking elasticsearch config as seen by the connector service.

cloud_id is a first for me, I don't see a single reference to it in the repo https://github.com/search?q=repo%3Aelastic%2Fconnectors%20cloud_id&type=code (also in the example config I don't see it)

Also, agent doesn't specify the cloud_id anywhere https://www.elastic.co/guide/en/fleet/current/elasticsearch-output.html

Can you explain what you mean by cloud ID? I don't think we need to check for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloud ID is present for all cloud deployments and serverless projects.

Screenshot 2024-10-04 at 2 47 59 PM

And it should be the de facto way to connect to Elasticsearch, see: https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/connecting.html#connect-ec

When connecting to Elastic Cloud with the Python Elasticsearch client you should always use the cloud_id parameter to connect.

I'm surprised we don't support an elasticsearch.cloud_id in our YAML config; IDK how I never noticed that was missing. We should fix that, but I suppose it's a problem for another day. We can disregard it for this PR and leave the logic as is. I understand now that you're checking to make sure that both host and API key are provided.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for explaining. I had no idea we could use cloud id for connection. I will file an issue to support the cloud id in the config YML!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return False

return True

async def _connector_exists(self, connector_id):
    """Check whether a connector document with ``connector_id`` can be fetched.

    Returns True when ``self.connector_index.fetch_by_id`` yields a non-None
    document, and False when it yields None or raises DocumentNotFoundError.
    Any other exception is logged and re-raised to the caller.
    """
    try:
        record = await self.connector_index.fetch_by_id(connector_id)
    except DocumentNotFoundError:
        # A missing document is an expected outcome, not an error.
        return False
    except Exception as exc:
        logger.error(
            f"Error while checking existence of connector '{connector_id}': {exc}"
        )
        raise exc
    else:
        return record is not None
jedrazb marked this conversation as resolved.
Show resolved Hide resolved

def _get_connectors(self, agent_config):
    """Return the value stored under "connectors" in ``agent_config``.

    Yields None when the key is absent.
    """
    connectors = agent_config.get("connectors")
    return connectors
jedrazb marked this conversation as resolved.
Show resolved Hide resolved

def _generate_random_connector_name_id(self, length=4):
    """Build a random string of ``length`` ASCII letters and digits.

    Each character is drawn independently with ``secrets.choice``.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
jedrazb marked this conversation as resolved.
Show resolved Hide resolved
100 changes: 85 additions & 15 deletions connectors/agent/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
from elastic_agent_client.handler.action import BaseActionHandler
from elastic_agent_client.handler.checkin import BaseCheckinHandler

from connectors.agent.connector_record_manager import ConnectorRecordManager
from connectors.agent.logger import get_logger

logger = get_logger("protocol")


CONNECTORS_INPUT_TYPE = "connectors-py"
ELASTICSEARCH_OUTPUT_TYPE = "elasticsearch"


class ConnectorActionHandler(BaseActionHandler):
"""Class handling Agent actions.

Expand All @@ -38,16 +43,24 @@ class ConnectorCheckinHandler(BaseCheckinHandler):

This class reads the events, sees if there's a reported change to connector-specific settings,
tries to update the configuration and, if the configuration is updated, restarts the Connectors Service.

If the connector document with given ID doesn't exist, it creates a new one.
"""

def __init__(self, client, agent_connectors_config_wrapper, service_manager):
def __init__(
self,
client,
agent_connectors_config_wrapper,
service_manager,
):
"""Inits the class.

Initing this class should not produce side-effects.
"""
super().__init__(client)
self.agent_connectors_config_wrapper = agent_connectors_config_wrapper
self.service_manager = service_manager
self.connector_record_manager = ConnectorRecordManager()

async def apply_from_client(self):
"""Implementation of BaseCheckinHandler.apply_from_client
Expand All @@ -73,26 +86,83 @@ async def apply_from_client(self):

# Filter Elasticsearch outputs from the available outputs
elasticsearch_outputs = [
output
for output in outputs
if output.config and output.config.type == "elasticsearch"
output_unit
for output_unit in outputs
if output_unit.config
and output_unit.config.type == ELASTICSEARCH_OUTPUT_TYPE
]

if elasticsearch_outputs:
if len(elasticsearch_outputs) > 1:
inputs = [
unit
for unit in self.client.units
if unit.unit_type == proto.UnitType.INPUT
]

# Ensure only the single valid connector input is selected from the inputs
connector_inputs = [
input_unit
for input_unit in inputs
if input_unit.config.type == CONNECTORS_INPUT_TYPE
]

if connector_inputs:
if len(connector_inputs) > 1:
logger.warning(
"Multiple Elasticsearch outputs detected. The first ES output defined in the agent policy will be used."
"Multiple connector inputs detected. The first connector input defined in the agent policy will be used."
)

logger.debug("Elasticsearch outputs found.")
logger.debug("Connector input found.")

connector_input = connector_inputs[0]

def _extract_unit_config_value(unit, field_name):
    """Read ``field_name`` from ``unit.config.source.fields`` as a string.

    Returns the field's ``string_value``, or None when the field is missing
    or falsy.
    """
    raw_field = unit.config.source.fields.get(field_name)
    if not raw_field:
        return None
    return raw_field.string_value

configuration_changed = self.agent_connectors_config_wrapper.try_update(
elasticsearch_outputs[0]
service_type = _extract_unit_config_value(
connector_input, "service_type"
)
if configuration_changed:
logger.info(
"Connector service manager config updated. Restarting service manager."
connector_name = _extract_unit_config_value(
connector_input, "connector_name"
)
connector_id = _extract_unit_config_value(connector_input, "id")

logger.info(
f"Connector input found. Service type: {service_type}, Connector ID: {connector_id}, Connector Name: {connector_name}"
)

if elasticsearch_outputs:
if len(elasticsearch_outputs) > 1:
logger.warning(
"Multiple Elasticsearch outputs detected. The first ES output defined in the agent policy will be used."
)

logger.debug("Elasticsearch outputs found.")

elasticsearch_output = elasticsearch_outputs[0]

configuration_changed = (
self.agent_connectors_config_wrapper.try_update(
connector_id=connector_id,
service_type=service_type,
output_unit=elasticsearch_output,
)
)
self.service_manager.restart()

# After updating the configuration, ensure all connector records exist in the connector index
await self.connector_record_manager.ensure_connector_records_exist(
agent_config=self.agent_connectors_config_wrapper.get_specific_config(),
connector_name=connector_name,
)

if configuration_changed:
logger.info(
"Connector service manager config updated. Restarting service manager."
)
self.service_manager.restart()
else:
logger.debug("No changes to connectors config")
else:
logger.debug("No changes to connectors config")
logger.warning("No Elasticsearch output found")
else:
logger.warning("No connector integration input found")
Loading