Check liveness #12

Closed
wants to merge 2 commits into from
5 changes: 5 additions & 0 deletions config/agent/agent.conf
@@ -29,6 +29,11 @@
# Defines the interval between two heartbeat signals sent to bluechi in milliseconds. A value of 0 disables it.
#HeartbeatInterval=2000

#
# Defines the threshold to actively disconnect bluechi-agent based on the time difference from the last received heartbeat
# signal from bluechi-controller. In milliseconds. A value of 0 disables it.
#ControllerHeartbeatThreshold=0

#
# The level used for logging. Supported values are: DEBUG, INFO, WARN and ERROR.
#LogLevel=INFO
4 changes: 2 additions & 2 deletions config/controller/controller.conf
@@ -18,12 +18,12 @@

#
# Defines the interval for which the bluechi-controller periodically checks the connectivity of all nodes based on
-# the received heartbeat signals sent to bluechi-agent to bluechi-controller, in milliseconds.
+# the received heartbeat signals sent to bluechi-agent to bluechi-controller, in milliseconds. A value of 0 disables it.
#HeartbeatInterval=0

#
# Defines the threshold to actively disconnect nodes based on the time difference from the last received heartbeat
-# signal from the respective bluechi-agent. In milliseconds.
+# signal from the respective bluechi-agent. In milliseconds. A value of 0 disables it.
#NodeHeartbeatThreshold=6000

#
9 changes: 9 additions & 0 deletions data/org.eclipse.bluechi.Agent.xml
@@ -96,6 +96,15 @@
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="false" />
</property>

<!--
LastSeenTimestamp:

A timestamp indicating when the last connection test (e.g. via heartbeat) was successful.
-->
<property name="LastSeenTimestamp" type="t" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="false" />
</property>

<!--
ControllerAddress:

1 change: 1 addition & 0 deletions data/org.eclipse.bluechi.internal.Controller.xml
@@ -9,5 +9,6 @@
<method name="Register">
<arg name="name" type="s" direction="in" />
</method>
<signal name="Heartbeat" />
</interface>
</node>
6 changes: 6 additions & 0 deletions doc/docs/api/description.md
@@ -314,6 +314,12 @@ Object path: `/org/eclipse/bluechi/internal`
Before anything else can happen the node must call this method to register with the controller, giving its unique name.
If this succeeds, then the controller will consider the node online and start forwarding operations to it.

#### Signals

* `Heartbeat()`

This is a periodic signal from the controller to the agent.
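
As a rough illustration of where this signal sits in the controller's timer tick, the sketch below simulates one tick over plain structs instead of real D-Bus connections. `SimNode` and `controller_tick` are hypothetical names for this note only, not BlueChi APIs, and incrementing `heartbeats_sent` merely stands in for emitting `Heartbeat()` to a node.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for a registered node. */
typedef struct {
        const char *name;
        bool online;
        uint64_t last_seen_usec;
        unsigned heartbeats_sent;
} SimNode;

/* Simulates one controller timer tick and returns how many nodes were
 * disconnected. Offline nodes are skipped, stale nodes are disconnected,
 * and every remaining node gets one (simulated) Heartbeat signal. */
size_t controller_tick(SimNode *nodes, size_t n, uint64_t now_usec, long threshold_msec) {
        size_t disconnected = 0;

        for (size_t i = 0; i < n; i++) {
                SimNode *node = &nodes[i];
                if (!node->online) {
                        continue;
                }

                /* A threshold <= 0 disables the check; a clock that went
                 * backwards is never treated as a timeout. */
                bool expired = threshold_msec > 0 && now_usec >= node->last_seen_usec &&
                                now_usec - node->last_seen_usec > (uint64_t) threshold_msec * 1000ULL;
                if (expired) {
                        node->online = false;
                        disconnected++;
                        continue;
                }

                node->heartbeats_sent++; /* stands in for emitting Heartbeat() */
        }

        return disconnected;
}
```

In this simulation a node that was just disconnected does not receive a heartbeat in the same tick, which matches the order of checks in the controller callback of this change.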

### interface org.eclipse.bluechi.internal.Agent

This is the main interface that the node implements and that is used by the controller to effect change on the node.
4 changes: 4 additions & 0 deletions doc/man/bluechi-agent.conf.5.md
@@ -41,6 +41,10 @@ The port on which `bluechi` is listening for connection request and the `bluechi

The interval between two heartbeat signals sent to bluechi in milliseconds. If an agent is not connected, it retries connecting on each heartbeat. Setting this option to a value smaller than or equal to 0 disables it. This option will overwrite the heartbeat interval defined in the configuration file.

#### **ControllerHeartbeatThreshold** (long)

The threshold in milliseconds used to determine whether the controller is still reachable. If the last heartbeat signal from the controller was received longer ago than this threshold, the bluechi agent assumes that the controller is down or the connection was cut off and disconnects. A value of 0 disables the check.
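
The decision described above can be sketched as a small pure function. This is only an illustration under assumptions: `heartbeat_expired` is a hypothetical helper, `USEC_PER_MSEC` is defined locally for the example, and timestamps are taken to be microseconds as in the agent code of this change.

```c
#include <stdbool.h>
#include <stdint.h>

#define USEC_PER_MSEC 1000ULL /* defined locally for this sketch */

/* Hypothetical helper, not part of BlueChi: returns true when the peer should
 * be treated as lost because its last heartbeat is older than the threshold. */
bool heartbeat_expired(uint64_t now_usec, uint64_t last_seen_usec, long threshold_msec) {
        if (threshold_msec <= 0) {
                return false; /* liveness check disabled */
        }
        if (now_usec == 0 || now_usec < last_seen_usec) {
                return false; /* no usable clock reading or clock skew: stay connected */
        }
        return (now_usec - last_seen_usec) > (uint64_t) threshold_msec * USEC_PER_MSEC;
}
```

With a threshold of 6000 ms, a heartbeat seen exactly 6 seconds ago is still accepted; only a strictly larger gap counts as expired.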

#### **LogLevel** (string)

The level used for logging. Supported values are:
87 changes: 85 additions & 2 deletions src/agent/agent.c
@@ -135,11 +135,43 @@ static int agent_disconnected(UNUSED sd_bus_message *message, void *userdata, UN

static int agent_reset_heartbeat_timer(Agent *agent, sd_event_source **event_source);

static bool agent_check_controller_liveness(Agent *agent) {
uint64_t diff = 0;
uint64_t now = 0;

if (agent->controller_heartbeat_threshold_msec <= 0) {
/* checking liveness by heartbeat disabled since configured threshold is <=0 */
return true;
}

now = get_time_micros();
if (now == 0) {
bc_log_error("Failed to get the time");
return true;
}

if (now < agent->controller_last_seen) {
bc_log_error("Clock skew detected");
return true;
}

diff = now - agent->controller_last_seen;
if (diff > (uint64_t) agent->controller_heartbeat_threshold_msec * USEC_PER_MSEC) {
bc_log_infof("Did not receive heartbeat from controller since '%ld'ms. Disconnecting it...",
agent->controller_heartbeat_threshold_msec);
agent_disconnected(NULL, agent, NULL);
return false;
}

return true;
}

static int agent_heartbeat_timer_callback(sd_event_source *event_source, UNUSED uint64_t usec, void *userdata) {
Agent *agent = (Agent *) userdata;

int r = 0;
-if (agent->connection_state == AGENT_CONNECTION_STATE_CONNECTED) {
+if (agent->connection_state == AGENT_CONNECTION_STATE_CONNECTED &&
+agent_check_controller_liveness(agent)) {
r = sd_bus_emit_signal(
agent->peer_dbus,
INTERNAL_AGENT_OBJECT_PATH,
@@ -415,6 +447,7 @@ Agent *agent_new(void) {
agent->unit_infos = unit_infos;
agent->connection_state = AGENT_CONNECTION_STATE_DISCONNECTED;
agent->connection_retry_count = 0;
agent->controller_last_seen = 0;
agent->wildcard_subscription_active = false;
agent->metrics_enabled = false;
agent->disconnect_timestamp = 0;
@@ -557,6 +590,16 @@ bool agent_set_heartbeat_interval(Agent *agent, const char *interval_msec) {
return true;
}

bool agent_set_controller_heartbeat_threshold(Agent *agent, const char *threshold_msec) {
long threshold = 0;

if (!parse_long(threshold_msec, &threshold)) {
bc_log_errorf("Invalid heartbeat threshold format '%s'", threshold_msec);
return false;
}
agent->controller_heartbeat_threshold_msec = threshold;
return true;
}
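
The setter above relies on `parse_long`, which is not shown in this diff. As a minimal sketch of what strict parsing could look like, assuming the helper rejects empty input, trailing characters, and out-of-range values (the real implementation may differ), `parse_long_strict` below is a hypothetical stand-in built on `strtol`:

```c
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for parse_long(); the real helper may behave differently.
 * Accepts only a complete base-10 integer that fits into a long. */
bool parse_long_strict(const char *in, long *out) {
        char *end = NULL;
        long value = 0;

        if (in == NULL || out == NULL || *in == '\0') {
                return false;
        }

        errno = 0;
        value = strtol(in, &end, 10);
        if (errno != 0 || end == in || *end != '\0') {
                return false; /* overflow, no digits, or trailing garbage */
        }

        *out = value;
        return true;
}
```

Under this assumption a value such as `6000` is accepted, `-1` is accepted and later treated as "disabled" by the `<= 0` check, and `6s` is rejected.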

void agent_set_systemd_user(Agent *agent, bool systemd_user) {
agent->systemd_user = systemd_user;
@@ -628,6 +671,13 @@ bool agent_apply_config(Agent *agent) {
}
}

value = cfg_get_value(agent->config, CFG_CONTROLLER_HEARTBEAT_THRESHOLD);
if (value) {
if (!agent_set_controller_heartbeat_threshold(agent, value)) {
return false;
}
}

/* Set socket options used for peer connections with the agents */
const char *keepidle = cfg_get_value(agent->config, CFG_TCP_KEEPALIVE_TIME);
if (keepidle) {
@@ -1914,7 +1964,7 @@ static int agent_property_get_status(
}

/*************************************************************************
-**** org.eclipse.bluechi.Agent.LastSuccessfulHeartbeat ****************
+**** org.eclipse.bluechi.Agent.DisconnectTimestamp ****************
*************************************************************************/

static int agent_property_get_disconnect_timestamp(
@@ -1977,6 +2027,11 @@ static const sd_bus_vtable agent_vtable[] = {
agent_property_get_disconnect_timestamp,
0,
SD_BUS_VTABLE_PROPERTY_EXPLICIT),
SD_BUS_PROPERTY("LastSeenTimestamp",
"t",
NULL,
offsetof(Agent, controller_last_seen),
SD_BUS_VTABLE_PROPERTY_EXPLICIT),
SD_BUS_PROPERTY("ControllerAddress",
"s",
NULL,
@@ -2251,6 +2306,19 @@ static int agent_match_job_removed(sd_bus_message *m, void *userdata, UNUSED sd_
return 0;
}

static int agent_match_heartbeat(UNUSED sd_bus_message *m, void *userdata, UNUSED sd_bus_error *error) {
Agent *agent = userdata;

uint64_t now = get_time_micros();
if (now == 0) {
bc_log_error("Failed to get current time on heartbeat");
return 0;
}

agent->controller_last_seen = now;
return 1;
}
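
The handler above depends on `get_time_micros()` returning 0 on failure. The sketch below shows one way such a timestamp could be produced and recorded. It is an assumption-laden illustration: `now_micros` and `record_heartbeat` are hypothetical names, and the use of C11 `timespec_get` with `TIME_UTC` is a choice made for this example, not necessarily what BlueChi does.

```c
#include <stdint.h>
#include <time.h>

/* Hypothetical stand-in for get_time_micros(): wall-clock time in
 * microseconds, or 0 if the clock could not be read. */
uint64_t now_micros(void) {
        struct timespec ts;

        if (timespec_get(&ts, TIME_UTC) != TIME_UTC) {
                return 0;
        }
        return (uint64_t) ts.tv_sec * 1000000ULL + (uint64_t) ts.tv_nsec / 1000ULL;
}

/* Mirrors the handler's behaviour: the stored timestamp is only overwritten
 * when a valid reading is available. Returns 1 if it was updated, else 0. */
int record_heartbeat(uint64_t *last_seen_usec, uint64_t now_usec) {
        if (now_usec == 0) {
                return 0;
        }
        *last_seen_usec = now_usec;
        return 1;
}
```

Keeping the old timestamp on a failed clock read means a transient failure cannot make the controller look more recent than it really is.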

static int debug_systemd_message_handler(
sd_bus_message *m, UNUSED void *userdata, UNUSED sd_bus_error *ret_error) {
bc_log_infof("Incoming message from systemd: path: %s, iface: %s, member: %s, signature: '%s'",
@@ -2661,6 +2729,7 @@ static bool agent_connect(Agent *agent) {

agent->connection_state = AGENT_CONNECTION_STATE_CONNECTED;
agent->connection_retry_count = 0;
agent->controller_last_seen = get_time_micros();
agent->disconnect_timestamp = 0;

r = sd_bus_emit_properties_changed(
@@ -2669,6 +2738,20 @@ static bool agent_connect(Agent *agent) {
bc_log_errorf("Failed to emit status property changed: %s", strerror(-r));
}

r = sd_bus_match_signal(
agent->peer_dbus,
NULL,
NULL,
INTERNAL_CONTROLLER_OBJECT_PATH,
INTERNAL_CONTROLLER_INTERFACE,
CONTROLLER_HEARTBEAT_SIGNAL_NAME,
agent_match_heartbeat,
agent);
if (r < 0) {
bc_log_errorf("Failed to add heartbeat signal match: %s", strerror(-r));
return false;
}

r = sd_bus_match_signal_async(
agent->peer_dbus,
NULL,
2 changes: 2 additions & 0 deletions src/agent/agent.h
@@ -60,9 +60,11 @@ struct Agent {
int port;
char *controller_address;
long heartbeat_interval_msec;
long controller_heartbeat_threshold_msec;

AgentConnectionState connection_state;
uint64_t connection_retry_count;
uint64_t controller_last_seen;
time_t disconnect_timestamp;

char *orch_addr;
9 changes: 9 additions & 0 deletions src/bindings/python/bluechi/api.py
@@ -186,6 +186,15 @@ def disconnect_timestamp(self) -> UInt64:
"""
return self.get_proxy().DisconnectTimestamp

@property
def last_seen_timestamp(self) -> UInt64:
"""
LastSeenTimestamp:

A timestamp indicating when the last connection test (e.g. via heartbeat) was successful.
"""
return self.get_proxy().LastSeenTimestamp

@property
def log_level(self) -> str:
"""
61 changes: 43 additions & 18 deletions src/controller/controller.c
@@ -360,7 +360,7 @@ bool controller_apply_config(Controller *controller) {
}
}

-const char *threshold_msec = cfg_get_value(controller->config, CFG_HEARTBEAT_THRESHOLD);
+const char *threshold_msec = cfg_get_value(controller->config, CFG_NODE_HEARTBEAT_THRESHOLD);
if (threshold_msec) {
if (!controller_set_heartbeat_threshold(controller, threshold_msec)) {
return false;
@@ -483,37 +483,62 @@ static bool controller_setup_node_connection_handler(Controller *controller) {

static int controller_reset_heartbeat_timer(Controller *controller, sd_event_source **event_source);

static bool controller_check_node_liveness(Controller *controller, Node *node, uint64_t now) {
uint64_t diff = 0;

if (controller->heartbeat_threshold_msec <= 0) {
/* checking liveness of node by heartbeat disabled since configured threshold is <=0 */
return true;
}

if (now == 0) {
bc_log_error("Current time is wrong");
return true;
}

if (now < node->last_seen) {
bc_log_error("Clock skew detected");
return true;
}

diff = now - node->last_seen;
if (diff > (uint64_t) controller->heartbeat_threshold_msec * USEC_PER_MSEC) {
bc_log_infof("Did not receive heartbeat from node '%s' since '%ld'ms. Disconnecting it...",
node->name,
controller->heartbeat_threshold_msec);
node_disconnect(node);
return false;
}

return true;
}

static int controller_heartbeat_timer_callback(
sd_event_source *event_source, UNUSED uint64_t usec, void *userdata) {
Controller *controller = (Controller *) userdata;
Node *node = NULL;
+uint64_t now = get_time_micros();
int r = 0;

LIST_FOREACH(nodes, node, controller->nodes) {
-uint64_t diff = 0;
-uint64_t now = 0;
-
if (!node_is_online(node)) {
continue;
}

-now = get_time_micros();
-if (now == 0) {
-bc_log_error("Failed to get the time");
+if (!controller_check_node_liveness(controller, node, now)) {
continue;
}

-if (now < node->last_seen) {
-bc_log_error("Clock skew detected");
-continue;
-}
-
-diff = now - node->last_seen;
-if (diff > (uint64_t) controller->heartbeat_threshold_msec * USEC_PER_MSEC) {
-bc_log_infof("Did not receive heartbeat from node '%s' since '%d'ms. Disconnecting it...",
-node->name,
-controller->heartbeat_threshold_msec);
-node_disconnect(node);
+r = sd_bus_emit_signal(
+node->agent_bus,
+INTERNAL_CONTROLLER_OBJECT_PATH,
+INTERNAL_CONTROLLER_INTERFACE,
+"Heartbeat",
+"");
+if (r < 0) {
+bc_log_errorf("Failed to emit heartbeat signal to node '%s': %s",
+node->name,
+strerror(-r));
}
}

5 changes: 4 additions & 1 deletion src/controller/node.c
@@ -60,7 +60,10 @@ static int node_property_get_last_seen(
sd_bus_error *ret_error);

static const sd_bus_vtable internal_controller_controller_vtable[] = {
-SD_BUS_VTABLE_START(0), SD_BUS_METHOD("Register", "s", "", node_method_register, 0), SD_BUS_VTABLE_END
+SD_BUS_VTABLE_START(0),
+SD_BUS_METHOD("Register", "s", "", node_method_register, 0),
+SD_BUS_SIGNAL("Heartbeat", "", 0),
+SD_BUS_VTABLE_END
};

static const sd_bus_vtable node_vtable[] = {
9 changes: 8 additions & 1 deletion src/libbluechi/common/cfg.c
@@ -480,6 +480,13 @@ int cfg_agent_def_conf(struct config *config) {
return result;
}

if ((result = cfg_set_value(
config,
CFG_CONTROLLER_HEARTBEAT_THRESHOLD,
AGENT_DEFAULT_CONTROLLER_HEARTBEAT_THRESHOLD_MSEC)) != 0) {
return result;
}

if ((result = cfg_set_value(config, CFG_CONTROLLER_PORT, BC_DEFAULT_PORT)) != 0) {
return result;
}
@@ -515,7 +522,7 @@ int cfg_controller_def_conf(struct config *config) {

if ((result = cfg_set_value(
config,
-CFG_HEARTBEAT_THRESHOLD,
+CFG_NODE_HEARTBEAT_THRESHOLD,
CONTROLLER_DEFAULT_NODE_HEARTBEAT_THRESHOLD_MSEC)) != 0) {
return result;
}
3 changes: 2 additions & 1 deletion src/libbluechi/common/cfg.h
@@ -20,7 +20,8 @@
#define CFG_NODE_NAME "NodeName"
#define CFG_ALLOWED_NODE_NAMES "AllowedNodeNames"
#define CFG_HEARTBEAT_INTERVAL "HeartbeatInterval"
-#define CFG_HEARTBEAT_THRESHOLD "NodeHeartbeatThreshold"
+#define CFG_NODE_HEARTBEAT_THRESHOLD "NodeHeartbeatThreshold"
+#define CFG_CONTROLLER_HEARTBEAT_THRESHOLD "ControllerHeartbeatThreshold"
#define CFG_IP_RECEIVE_ERRORS "IPReceiveErrors"
#define CFG_TCP_KEEPALIVE_TIME "TCPKeepAliveTime"
#define CFG_TCP_KEEPALIVE_INTERVAL "TCPKeepAliveInterval"