Skip to content

Commit

Permalink
Merge pull request #279 from lf-lang/minor-cleanups
Browse files Browse the repository at this point in the history
Minor cleanups
  • Loading branch information
lhstrh authored Oct 2, 2023
2 parents eb96e25 + 7c009b1 commit fedd174
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 41 deletions.
57 changes: 35 additions & 22 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ void set_network_port_status(int portID, port_status_t status) {
*
* This assumes the caller holds the mutex.
*
* @param tag The tag on which the latest status of network input
* @param tag The tag on which the latest status of all network input
* ports is known.
*/
void update_last_known_status_on_input_ports(tag_t tag) {
Expand Down Expand Up @@ -1280,18 +1280,16 @@ void update_last_known_status_on_input_ports(tag_t tag) {
// FIXME: We could put a condition variable into the trigger_t
// struct for each network input port, in which case this won't
// be a broadcast but rather a targetted signal.
if (notify) {
update_max_level(tag, false);
if (notify && update_max_level(tag, false)) {
// Notify network input reactions
lf_cond_broadcast(&port_status_changed);
}
}


/**
* Update the last known status tag of a network input port
* to the value of "tag". This is the largest tag at which the status
* (present or absent) of the port was known.
* (present or absent) of the port is known.
*
* This function assumes the caller holds the mutex, and, if the tag
* actually increases, it broadcasts on `port_status_changed`.
Expand Down Expand Up @@ -1663,14 +1661,14 @@ void handle_message(int socket, int fed_id) {
_lf_schedule_value(action, 0, message_contents, length);
}

void stall_advance_level_federation(environment_t* env, size_t next_reaction_level) {
LF_PRINT_DEBUG("Trying to acquire the global mutex.");
void stall_advance_level_federation(environment_t* env, size_t level) {
LF_PRINT_DEBUG("Acquiring the environment mutex.");
lf_mutex_lock(&env->mutex);
LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", next_reaction_level, max_level_allowed_to_advance);
while (((int) next_reaction_level) >= max_level_allowed_to_advance) {
LF_PRINT_DEBUG("Waiting on MLAA with next_reaction_level %zu and MLAA %d.", level, max_level_allowed_to_advance);
while (((int) level) >= max_level_allowed_to_advance) {
lf_cond_wait(&port_status_changed);
};
LF_PRINT_DEBUG("Exiting wait with MLAA %d and next_reaction_level %zu.", max_level_allowed_to_advance, next_reaction_level);
LF_PRINT_DEBUG("Exiting wait with MLAA %d and next_reaction_level %zu.", max_level_allowed_to_advance, level);
lf_mutex_unlock(&env->mutex);
}

Expand Down Expand Up @@ -1723,7 +1721,7 @@ void handle_tagged_message(int socket, int fed_id) {
lf_action_base_t* action = _lf_action_for_port(port_id);

// Record the physical time of arrival of the message
action->trigger->physical_time_of_arrival = lf_time_physical();
instant_t time_of_arrival = lf_time_physical();

if (action->trigger->is_physical) {
// Messages sent on physical connections should be handled via handle_message().
Expand All @@ -1741,8 +1739,8 @@ void handle_tagged_message(int socket, int fed_id) {
// If something happens, make sure to release the barrier.
_lf_increment_tag_barrier(env, intended_tag);
#endif
LF_PRINT_LOG("Received message with tag: " PRINTF_TAG ", Current tag: " PRINTF_TAG ".",
intended_tag.time - start_time, intended_tag.microstep,
LF_PRINT_LOG("Received message on port %d with tag: " PRINTF_TAG ", Current tag: " PRINTF_TAG ".",
port_id, intended_tag.time - start_time, intended_tag.microstep,
lf_time_logical_elapsed(env), env->current_tag.microstep);

// Read the payload.
Expand All @@ -1756,6 +1754,8 @@ void handle_tagged_message(int socket, int fed_id) {

lf_mutex_lock(&env->mutex);

action->trigger->physical_time_of_arrival = time_of_arrival;

// Create a token for the message
lf_token_t* message_token = _lf_new_token((token_type_t*)action, message_contents, length);

Expand Down Expand Up @@ -1916,25 +1916,38 @@ void _lf_logical_tag_complete(tag_t tag_to_send) {
_fed.last_sent_LTC = tag_to_send;
}

void update_max_level(tag_t tag, bool is_provisional) {
bool update_max_level(tag_t tag, bool is_provisional) {
// This always needs the top-level environment, which will be env[0].
environment_t *env;
_lf_get_environments(&env);
int prev_max_level_allowed_to_advance = max_level_allowed_to_advance;
max_level_allowed_to_advance = INT_MAX;
LF_PRINT_DEBUG("last_TAG=" PRINTF_TIME, tag.time);
if ((lf_tag_compare(env->current_tag, tag) < 0) || (
lf_tag_compare(env->current_tag, tag) == 0 && !is_provisional
)) {
LF_PRINT_DEBUG("Updated MLAA to %d at time " PRINTF_TIME " with last_TAG=" PRINTF_TIME " and current time " PRINTF_TIME ".", max_level_allowed_to_advance, lf_time_logical_elapsed(env), tag.time, env->current_tag.time);
return; // Safe to complete the current tag
LF_PRINT_DEBUG("Updated MLAA to %d at time " PRINTF_TIME ".",
max_level_allowed_to_advance,
lf_time_logical_elapsed(env)
);
// Safe to complete the current tag
return (prev_max_level_allowed_to_advance != max_level_allowed_to_advance);
}
for (int i = 0; i < _lf_zero_delay_action_table_size; i++) {
lf_action_base_t* input_port_action = _lf_zero_delay_action_table[i];
if (lf_tag_compare(env->current_tag,
input_port_action->trigger->last_known_status_tag) > 0 && !input_port_action->trigger->is_physical) {
max_level_allowed_to_advance = LF_MIN(max_level_allowed_to_advance, ((int) LF_LEVEL(input_port_action->trigger->reactions[0]->index)));
input_port_action->trigger->last_known_status_tag) > 0
&& !input_port_action->trigger->is_physical) {
max_level_allowed_to_advance = LF_MIN(
max_level_allowed_to_advance,
((int) LF_LEVEL(input_port_action->trigger->reactions[0]->index))
);
}
}
LF_PRINT_DEBUG("Updated MLAA to %d at time " PRINTF_TIME " with %lld items in zero-delay action table.", max_level_allowed_to_advance, lf_time_logical_elapsed(env), (long long) _lf_zero_delay_action_table_size);
LF_PRINT_DEBUG("Updated MLAA to %d at time " PRINTF_TIME ".",
max_level_allowed_to_advance,
lf_time_logical_elapsed(env)
);
return (prev_max_level_allowed_to_advance != max_level_allowed_to_advance);
}

#ifdef FEDERATED_DECENTRALIZED
Expand Down Expand Up @@ -2095,7 +2108,7 @@ void handle_provisional_tag_advance_grant() {
// it is already treating the current tag as PTAG cycle (e.g. at the
// start time) or it will be completing the current cycle and sending
// a LTC message shortly. In either case, there is nothing more to do.
lf_mutex_unlock(&env->mutex);
lf_mutex_unlock(&env->mutex);
return;
} else if (lf_tag_compare(env->current_tag, PTAG) > 0) {
// Current tag is greater than the PTAG.
Expand All @@ -2109,7 +2122,7 @@ void handle_provisional_tag_advance_grant() {
// Send an LTC to indicate absent outputs.
_lf_logical_tag_complete(PTAG);
// Nothing more to do.
lf_mutex_unlock(&env->mutex);
lf_mutex_unlock(&env->mutex);
return;
} else if (PTAG.time == env->current_tag.time) {
// We now know env->current_tag < PTAG, but the times are equal.
Expand Down
6 changes: 3 additions & 3 deletions core/threaded/scheduler_GEDF_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ int _lf_sched_distribute_ready_reactions(lf_scheduler_t* scheduler) {
// a mutex.
for (; scheduler->next_reaction_level <=
scheduler->max_reaction_level;
try_advance_level(scheduler->env, &scheduler->next_reaction_level)) {
try_advance_level(scheduler->env, &scheduler->next_reaction_level)) {
tmp_queue = ((pqueue_t**)scheduler->triggered_reactions)
[scheduler->next_reaction_level];
size_t reactions_to_execute = pqueue_size(tmp_queue);
Expand Down Expand Up @@ -147,13 +147,13 @@ void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
*/
void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
environment_t* env = scheduler->env;

// Executing queue must be empty when this is called.
assert(pqueue_size((pqueue_t*)scheduler->executing_reactions) == 0);

// Loop until it's time to stop or work has been distributed
while (true) {
if (scheduler->next_reaction_level ==
(scheduler->max_reaction_level + 1)) {
if (scheduler->next_reaction_level == (scheduler->max_reaction_level + 1)) {
scheduler->next_reaction_level = 0;
lf_mutex_lock(&env->mutex);
// Nothing more happening at this tag.
Expand Down
28 changes: 17 additions & 11 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,23 +352,29 @@ void send_port_absent_to_federate(environment_t* env, interval_t, unsigned short
void enqueue_port_absent_reactions(environment_t* env);

/**
* @brief Prevent the advancement to the next level of the reaction queue until the
* level we try to advance to is known to be under the max level allowed to advance.
*
* @param next_reaction_level
* @brief Wait until inputs statuses are known up to and including the specified level.
* Specifically, wait until the specified level is less that the max level allowed to
* advance (MLAA).
* @param env The environment (which should always be the top-level environment).
* @param level The level to which we would like to advance.
*/
void stall_advance_level_federation(environment_t*, size_t);
void stall_advance_level_federation(environment_t* env, size_t level);

/**
* @brief Attempts to update the max level the reaction queue is allowed to advance to
* for the current logical timestep.
*
* @param tag The latest TAG received by this federate.
* @param is_provisional Whether the latest tag was provisional
* @brief Update the max level allowed to advance (MLAA).
* If the specified tag is greater than the current_tag of the top-level environment
* (or equal an is_provisional is false), then set the MLAA to MAX_INT and return.
* This removes any barriers on execution at the current tag due to network inputs.
* Otherwise, set the MLAA to the minimum level over all (non-physical) network input ports
* where the status of the input port is not known at that current_tag.
*
* This function assumes that the caller holds the mutex.
*
* @param tag The latest TAG or PTAG received by this federate.
* @param is_provisional Whether the tag was provisional.
* @return True if the MLAA changed.
*/
void update_max_level(tag_t, bool);
bool update_max_level(tag_t tag, bool is_provisional);

/**
* Send a message to another federate directly or via the RTI.
Expand Down
10 changes: 5 additions & 5 deletions include/core/threaded/reactor_threaded.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
#include "lf_types.h"

/**
* @brief Attempt to advance the current reaction level to the next level
* in the reaction queue. For federated runtimes, this function should
* @brief Advance to the next level.
* For federated runtimes, this function should
* stall the advance until we know that we can safely execute the next level
* given knowledge about upstream network port statuses.
*
* @param next_reaction_level
* @param env The environment.
* @param next_reaction_level The place to store the next reaction level.
*/
void try_advance_level(environment_t*, volatile size_t*);
void try_advance_level(environment_t* env, volatile size_t* next_reaction_level);

/**
* Enqueue port absent reactions that will send a PORT_ABSENT
Expand Down

0 comments on commit fedd174

Please sign in to comment.