Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization of LTC signals #445

Merged
merged 17 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED_DECENTRALIZED
#if defined(FEDERATED_CENTRALIZED)
env->need_to_send_LTC = false;
(void)num_is_present_fields;
#elif defined(FEDERATED_DECENTRALIZED)
if (num_is_present_fields > 0) {
env->_lf_intended_tag_fields = (tag_t**)calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT_NON_NULL(env->_lf_intended_tag_fields);
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {

enclave->completed = completed;

LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag complete (LTC) " PRINTF_TAG ".", enclave->id,
LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag confirmed (LTC) " PRINTF_TAG ".", enclave->id,
enclave->completed.time - start_time, enclave->completed.microstep);

// Check downstream scheduling_nodes to see whether they should now be granted a TAG.
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag);
* If M is equal to the NET of the federate, then return PTAG(M).
*
* This should be called whenever an immediately upstream federate sends to
* the RTI an LTC (latest tag complete), or when a transitive upstream
* the RTI an LTC (latest tag confirmed), or when a transitive upstream
* federate sends a NET (Next Event Tag) message.
* It is also called when an upstream federate resigns from the federation.
*
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
LF_MUTEX_UNLOCK(&rti_mutex);
}

void handle_latest_tag_complete(federate_info_t* fed) {
void handle_latest_tag_confirmed(federate_info_t* fed) {
unsigned char buffer[sizeof(int64_t) + sizeof(uint32_t)];
read_from_socket_fail_on_error(&fed->socket, sizeof(int64_t) + sizeof(uint32_t), buffer, NULL,
"RTI failed to read the content of the logical tag complete from federate %d.",
Expand Down Expand Up @@ -1110,8 +1110,8 @@ void* federate_info_thread_TCP(void* fed) {
case MSG_TYPE_NEXT_EVENT_TAG:
handle_next_event_tag(my_fed);
break;
case MSG_TYPE_LATEST_TAG_COMPLETE:
handle_latest_tag_complete(my_fed);
case MSG_TYPE_LATEST_TAG_CONFIRMED:
handle_latest_tag_confirmed(my_fed);
byeonggiljun marked this conversation as resolved.
Show resolved Hide resolved
break;
case MSG_TYPE_STOP_REQUEST:
handle_stop_request_message(my_fed); // FIXME: Reviewed until here.
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char
void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer);

/**
* Handle a latest tag complete (LTC) message. @see
* MSG_TYPE_LATEST_TAG_COMPLETE in rti.h.
* Handle a latest tag confirmed (LTC) message. @see
* MSG_TYPE_LATEST_TAG_CONFIRMED in rti.h.
*
* This function assumes the caller does not hold the mutex.
*
* @param fed The federate that has completed a logical tag.
*/
void handle_latest_tag_complete(federate_info_t* fed);
void handle_latest_tag_confirmed(federate_info_t* fed);

/**
* Handle a next event tag (NET) message. @see MSG_TYPE_NEXT_EVENT_TAG in rti.h.
Expand Down
22 changes: 15 additions & 7 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static void send_time(unsigned char type, instant_t time) {
/**
* Send a tag to the RTI.
* This function acquires the lf_outbound_socket_mutex.
* @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_COMPLETE).
* @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_CONFIRMED).
* @param tag The tag.
*/
static void send_tag(unsigned char type, tag_t tag) {
Expand Down Expand Up @@ -1277,7 +1277,7 @@ static void handle_provisional_tag_advance_grant() {
// TAG. In either case, we know that at the PTAG tag, all outputs
// have either been sent or are absent, so we can send an LTC.
// Send an LTC to indicate absent outputs.
lf_latest_tag_complete(PTAG);
lf_latest_tag_confirmed(PTAG);
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
Expand Down Expand Up @@ -2202,14 +2202,22 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
return NULL;
}

void lf_latest_tag_complete(tag_t tag_to_send) {
int compare_with_last_tag = lf_tag_compare(_fed.last_sent_LTC, tag_to_send);
if (compare_with_last_tag >= 0) {
void lf_latest_tag_confirmed(tag_t tag_to_send) {
environment_t* env;
if (lf_tag_compare(_fed.last_sent_LTC, tag_to_send) >= 0) {
return; // Already sent this or later tag.
}
}
_lf_get_environments(&env);
if (!env->need_to_send_LTC) {
LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged message with the "
"tag " PRINTF_TAG " that this federate has received.",
tag_to_send.time - start_time, tag_to_send.microstep);
return;
}
LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
LF_PRINT_LOG("Sending Latest Tag Confirmed (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
send_tag(MSG_TYPE_LATEST_TAG_CONFIRMED, tag_to_send);
byeonggiljun marked this conversation as resolved.
Show resolved Hide resolved
byeonggiljun marked this conversation as resolved.
Show resolved Hide resolved
_fed.last_sent_LTC = tag_to_send;
}

Expand Down
4 changes: 4 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ void _lf_start_time_step(environment_t* env) {
}
#endif // FEDERATED_DECENTRALIZED

#ifdef FEDERATED_CENTRALIZED
env->need_to_send_LTC = false;
#endif // FEDERATED_CENTRALIZED

// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
Expand Down
9 changes: 9 additions & 0 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,15 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index),
current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->deadline);

#ifdef FEDERATED_CENTRALIZED
if (current_reaction_to_execute->is_an_input_reaction) {
// This federate has received a tagged message with the current tag and
// must send LTC at the current tag to confirm that the federate has successfully
// received and processed tagged messages with the current tag.
env->need_to_send_LTC = true;
}
#endif // FEDERATED_CENTRALIZED

bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute);

if (!violation) {
Expand Down
1 change: 1 addition & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ typedef struct environment_t {
#if defined(FEDERATED)
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
bool need_to_send_LTC;
#endif // FEDERATED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t* enclave_info;
Expand Down
8 changes: 4 additions & 4 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ typedef struct federate_instance_t {
bool received_stop_request_from_rti;

/**
* A record of the most recently sent LTC (latest tag complete) message.
* A record of the most recently sent LTC (latest tag confirmed) message.
* In some situations, federates can send logical_tag_complete for
* the same tag twice or more in-a-row to the RTI. For example, when
* _lf_next() returns without advancing tag. To prevent overwhelming
* the RTI with extra messages, record the last sent logical tag
* complete message and check against it in lf_latest_tag_complete().
* complete message and check against it in lf_latest_tag_confirmed().
*
* @note Here, the underlying assumption is that the TCP stack will
* deliver the Logical TAG Complete message to the RTI eventually
Expand Down Expand Up @@ -291,7 +291,7 @@ void lf_enqueue_port_absent_reactions(environment_t* env);
void* lf_handle_p2p_connections_from_federates(void*);

/**
* @brief Send a latest tag complete (LTC) signal to the RTI.
* @brief Send a latest tag confirmed (LTC) signal to the RTI.
*
* This avoids the send if an equal or later LTC has previously been sent.
*
Expand All @@ -300,7 +300,7 @@ void* lf_handle_p2p_connections_from_federates(void*);
*
* @param tag_to_send The tag to send.
*/
void lf_latest_tag_complete(tag_t);
void lf_latest_tag_confirmed(tag_t);

/**
* @brief Parse the address of the RTI and store them into the global federation_metadata struct.
Expand Down
6 changes: 3 additions & 3 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* each federate has a valid event at the start tag (start time, 0) and it will
* inform the RTI of this event.
* Subsequently, at the conclusion of each tag, each federate will send a
* `MSG_TYPE_LATEST_TAG_COMPLETE` followed by a `MSG_TYPE_NEXT_EVENT_TAG` (see
* `MSG_TYPE_LATEST_TAG_CONFIRMED` followed by a `MSG_TYPE_NEXT_EVENT_TAG` (see
* the comment for each message for further explanation). Each federate would
* have to wait for a `MSG_TYPE_TAG_ADVANCE_GRANT` or a
* `MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT` before it can advance to a
Expand Down Expand Up @@ -434,12 +434,12 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT 8

/**
* Byte identifying a latest tag complete (LTC) message sent by a federate
* Byte identifying a latest tag confirmed (LTC) message sent by a federate
* to the RTI.
* The next eight bytes will be the timestep of the completed tag.
* The next four bytes will be the microsteps of the completed tag.
*/
#define MSG_TYPE_LATEST_TAG_COMPLETE 9
#define MSG_TYPE_LATEST_TAG_CONFIRMED 9

/////////// Messages used in lf_request_stop() ///////////////
//// Overview of the algorithm:
Expand Down
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
master
LTC-optimization
Loading