diff --git a/src/agent/agent.c b/src/agent/agent.c index 5c6725c06c..51ee5e0f92 100644 --- a/src/agent/agent.c +++ b/src/agent/agent.c @@ -169,6 +169,13 @@ static int agent_heartbeat_timer_callback(sd_event_source *event_source, UNUSED Agent *agent = (Agent *) userdata; int r = 0; + if (agent->connection_state == AGENT_CONNECTION_STATE_CONNECTING) { + bc_log_error("Agent connection attempt failed, retrying"); + if (agent->register_call_slot != NULL) { + sd_bus_slot_unrefp(&agent->register_call_slot); + } + agent->connection_state = AGENT_CONNECTION_STATE_RETRY; + } if (agent->connection_state == AGENT_CONNECTION_STATE_CONNECTED && agent_check_controller_liveness(agent)) { r = sd_bus_emit_signal( @@ -532,6 +539,9 @@ void agent_unref(Agent *agent) { sd_event_unrefp(&agent->event); } + if (agent->register_call_slot != NULL) { + sd_bus_slot_unrefp(&agent->register_call_slot); + } if (agent->metrics_slot != NULL) { sd_bus_slot_unrefp(&agent->metrics_slot); } @@ -2684,53 +2694,21 @@ void agent_stop(Agent *agent) { } } -static bool agent_connect(Agent *agent) { - peer_bus_close(agent->peer_dbus); - - bc_log_infof("Connecting to controller on %s", agent->orch_addr); - - agent->peer_dbus = peer_bus_open(agent->event, "peer-bus-to-controller", agent->orch_addr); - if (agent->peer_dbus == NULL) { - bc_log_error("Failed to open peer dbus"); - return false; - } - - bus_socket_set_options(agent->peer_dbus, agent->peer_socket_options); - - int r = sd_bus_add_object_vtable( - agent->peer_dbus, - NULL, - INTERNAL_AGENT_OBJECT_PATH, - INTERNAL_AGENT_INTERFACE, - internal_agent_vtable, - agent); - if (r < 0) { - bc_log_errorf("Failed to add agent vtable: %s", strerror(-r)); - return false; - } +static int agent_process_register_callback(sd_bus_message *m, Agent *agent) { + int r = 0; - _cleanup_sd_bus_message_ sd_bus_message *bus_msg = NULL; - sd_bus_error error = SD_BUS_ERROR_NULL; - r = sd_bus_call_method( - agent->peer_dbus, - BC_DBUS_NAME, - INTERNAL_CONTROLLER_OBJECT_PATH, - INTERNAL_CONTROLLER_INTERFACE, - "Register", - &error, - &bus_msg, - "s", - agent->name); - if (r < 0) { - bc_log_errorf("Registering as '%s' failed: %s", agent->name, error.message); - sd_bus_error_free(&error); - return false; + if (sd_bus_message_is_method_error(m, NULL)) { + bc_log_errorf("Registering as '%s' failed: %s", + agent->name, + sd_bus_message_get_error(m)->message); + return -EPERM; } - r = sd_bus_message_read(bus_msg, ""); + bc_log_info("Register call response received"); + r = sd_bus_message_read(m, ""); if (r < 0) { bc_log_errorf("Failed to parse response message: %s", strerror(-r)); - return false; + return r; } bc_log_infof("Connected to controller as '%s'", agent->name); @@ -2759,7 +2737,7 @@ static bool agent_connect(Agent *agent) { agent); if (r < 0) { bc_log_errorf("Failed to add heartbeat signal match: %s", strerror(-r)); - return false; + return r; } r = sd_bus_match_signal_async( @@ -2774,7 +2752,7 @@ static bool agent_connect(Agent *agent) { agent); if (r < 0) { bc_log_errorf("Failed to request match for Disconnected signal: %s", strerror(-r)); - return false; + return r; } /* re-emit ProxyNew signals */ @@ -2791,6 +2769,73 @@ static bool agent_connect(Agent *agent) { } } + return 0; +} + +static int agent_register_callback(sd_bus_message *m, void *userdata, UNUSED sd_bus_error *ret_error) { + Agent *agent = (Agent *) userdata; + + if (agent->connection_state != AGENT_CONNECTION_STATE_CONNECTING) { + bc_log_error("Agent is not in CONNECTING state, dropping Register callback"); + if (agent->register_call_slot != NULL) { + sd_bus_slot_unrefp(&agent->register_call_slot); + } + return -EPERM; + } + int r = 0; + r = agent_process_register_callback(m, agent); + if (r < 0) { + agent->connection_state = AGENT_CONNECTION_STATE_RETRY; + return r; + } + return 0; +} + +static bool agent_connect(Agent *agent) { + peer_bus_close(agent->peer_dbus); + + bc_log_infof("Connecting to controller on %s", agent->orch_addr); + agent->connection_state = AGENT_CONNECTION_STATE_CONNECTING; + + agent->peer_dbus = peer_bus_open(agent->event, "peer-bus-to-controller", agent->orch_addr); + if (agent->peer_dbus == NULL) { + bc_log_error("Failed to open peer dbus"); + return false; + } + + bus_socket_set_options(agent->peer_dbus, agent->peer_socket_options); + + int r = sd_bus_add_object_vtable( + agent->peer_dbus, + NULL, + INTERNAL_AGENT_OBJECT_PATH, + INTERNAL_AGENT_INTERFACE, + internal_agent_vtable, + agent); + if (r < 0) { + bc_log_errorf("Failed to add agent vtable: %s", strerror(-r)); + return false; + } + + _cleanup_sd_bus_message_ sd_bus_message *bus_msg = NULL; + sd_bus_error error = SD_BUS_ERROR_NULL; + r = sd_bus_call_method_async( + agent->peer_dbus, + NULL, // No slot needed since peer_dbus is closed on reconnect + BC_DBUS_NAME, + INTERNAL_CONTROLLER_OBJECT_PATH, + INTERNAL_CONTROLLER_INTERFACE, + "Register", + agent_register_callback, + agent, + "s", + agent->name); + if (r < 0) { + bc_log_errorf("Registering as '%s' failed: %s", agent->name, error.message); + sd_bus_error_free(&error); + return false; + } + return true; } diff --git a/src/agent/agent.h b/src/agent/agent.h index 9a41b9c821..6a8d7ba3a8 100644 --- a/src/agent/agent.h +++ b/src/agent/agent.h @@ -48,7 +48,8 @@ typedef struct JobTracker JobTracker; typedef enum { AGENT_CONNECTION_STATE_DISCONNECTED, AGENT_CONNECTION_STATE_CONNECTED, - AGENT_CONNECTION_STATE_RETRY + AGENT_CONNECTION_STATE_RETRY, + AGENT_CONNECTION_STATE_CONNECTING } AgentConnectionState; struct Agent { @@ -82,6 +83,7 @@ struct Agent { sd_bus *systemd_dbus; sd_bus *peer_dbus; + sd_bus_slot *register_call_slot; sd_bus_slot *metrics_slot; LIST_HEAD(SystemdRequest, outstanding_requests); diff --git a/tests/bluechi_test/config.py b/tests/bluechi_test/config.py index 4c1aeca24d..0f77ff1c00 100644 --- a/tests/bluechi_test/config.py +++ b/tests/bluechi_test/config.py @@ -117,6 +117,7 @@ def serialize(self) -> str: ControllerAddress={self.controller_address} HeartbeatInterval={self.heartbeat_interval} ControllerHeartbeatThreshold={self.controller_heartbeat_threshold} +RegisterCallTimeout={self.register_call_timeout} LogLevel={self.log_level} LogTarget={self.log_target} LogIsQuiet={self.log_is_quiet}