diff --git a/core/environment.c b/core/environment.c index d2d56a593..af192c09b 100644 --- a/core/environment.c +++ b/core/environment.c @@ -117,7 +117,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st * @brief Initialize the federation-specific parts of the environment struct. */ static void environment_init_federated(environment_t* env, int num_is_present_fields) { -#ifdef FEDERATED_DECENTRALIZED +#if defined(FEDERATED_CENTRALIZED) + env->need_to_send_LTC = false; + (void)num_is_present_fields; +#elif defined(FEDERATED_DECENTRALIZED) if (num_is_present_fields > 0) { env->_lf_intended_tag_fields = (tag_t**)calloc(num_is_present_fields, sizeof(tag_t*)); LF_ASSERT_NON_NULL(env->_lf_intended_tag_fields); diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index 157425811..3a1b16fab 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -64,7 +64,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) { enclave->completed = completed; - LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag complete (LTC) " PRINTF_TAG ".", enclave->id, + LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag confirmed (LTC) " PRINTF_TAG ".", enclave->id, enclave->completed.time - start_time, enclave->completed.microstep); // Check downstream scheduling_nodes to see whether they should now be granted a TAG. diff --git a/core/federated/RTI/rti_common.h b/core/federated/RTI/rti_common.h index d4e0c1236..615ca8a8a 100644 --- a/core/federated/RTI/rti_common.h +++ b/core/federated/RTI/rti_common.h @@ -199,7 +199,7 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag); * If M is equal to the NET of the federate, then return PTAG(M). * * This should be called whenever an immediately upstream federate sends to - * the RTI an LTC (latest tag complete), or when a transitive upstream + * the RTI an LTC (latest tag confirmed), or when a transitive upstream * federate sends a NET (Next Event Tag) message. * It is also called when an upstream federate resigns from the federation. * diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 4efe438e6..3daaeff7e 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -477,7 +477,7 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff LF_MUTEX_UNLOCK(&rti_mutex); } -void handle_latest_tag_complete(federate_info_t* fed) { +void handle_latest_tag_confirmed(federate_info_t* fed) { unsigned char buffer[sizeof(int64_t) + sizeof(uint32_t)]; read_from_socket_fail_on_error(&fed->socket, sizeof(int64_t) + sizeof(uint32_t), buffer, NULL, "RTI failed to read the content of the logical tag complete from federate %d.", @@ -1110,8 +1110,8 @@ void* federate_info_thread_TCP(void* fed) { case MSG_TYPE_NEXT_EVENT_TAG: handle_next_event_tag(my_fed); break; - case MSG_TYPE_LATEST_TAG_COMPLETE: - handle_latest_tag_complete(my_fed); + case MSG_TYPE_LATEST_TAG_CONFIRMED: + handle_latest_tag_confirmed(my_fed); break; case MSG_TYPE_STOP_REQUEST: handle_stop_request_message(my_fed); // FIXME: Reviewed until here. diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index a83179f62..8d3012b95 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -217,14 +217,14 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer); /** - * Handle a latest tag complete (LTC) message. @see - * MSG_TYPE_LATEST_TAG_COMPLETE in rti.h. + * Handle a latest tag confirmed (LTC) message. @see + * MSG_TYPE_LATEST_TAG_CONFIRMED in rti.h. * * This function assumes the caller does not hold the mutex. * * @param fed The federate that has completed a logical tag. */ -void handle_latest_tag_complete(federate_info_t* fed); +void handle_latest_tag_confirmed(federate_info_t* fed); /** * Handle a next event tag (NET) message. @see MSG_TYPE_NEXT_EVENT_TAG in rti.h. diff --git a/core/federated/federate.c b/core/federated/federate.c index ad1ec1741..c1bd4cbea 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -124,7 +124,7 @@ static void send_time(unsigned char type, instant_t time) { /** * Send a tag to the RTI. * This function acquires the lf_outbound_socket_mutex. - * @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_COMPLETE). + * @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_CONFIRMED). * @param tag The tag. */ static void send_tag(unsigned char type, tag_t tag) { @@ -1277,7 +1277,7 @@ static void handle_provisional_tag_advance_grant() { // TAG. In either case, we know that at the PTAG tag, all outputs // have either been sent or are absent, so we can send an LTC. // Send an LTC to indicate absent outputs. - lf_latest_tag_complete(PTAG); + lf_latest_tag_confirmed(PTAG); // Nothing more to do. LF_MUTEX_UNLOCK(&env->mutex); return; @@ -2202,14 +2202,21 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { return NULL; } -void lf_latest_tag_complete(tag_t tag_to_send) { - int compare_with_last_tag = lf_tag_compare(_fed.last_sent_LTC, tag_to_send); - if (compare_with_last_tag >= 0) { +void lf_latest_tag_confirmed(tag_t tag_to_send) { + environment_t* env; + if (lf_tag_compare(_fed.last_sent_LTC, tag_to_send) >= 0) { + return; // Already sent this or later tag. + } + _lf_get_environments(&env); + if (!env->need_to_send_LTC) { + LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged message with the " + "tag " PRINTF_TAG " that this federate has received.", + tag_to_send.time - start_time, tag_to_send.microstep); return; } - LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time, + LF_PRINT_LOG("Sending Latest Tag Confirmed (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time, tag_to_send.microstep); - send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send); + send_tag(MSG_TYPE_LATEST_TAG_CONFIRMED, tag_to_send); _fed.last_sent_LTC = tag_to_send; } diff --git a/core/reactor_common.c b/core/reactor_common.c index 55d3342ca..34f9d68eb 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -210,6 +210,10 @@ void _lf_start_time_step(environment_t* env) { } #endif // FEDERATED_DECENTRALIZED +#ifdef FEDERATED_CENTRALIZED + env->need_to_send_LTC = false; +#endif // FEDERATED_CENTRALIZED + // Reset absent fields on network ports because // their status is unknown lf_reset_status_fields_on_input_port_triggers(); diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 8023dd10d..ce4c463b5 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -881,6 +881,15 @@ void _lf_worker_do_work(environment_t* env, int worker_number) { worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index), current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->deadline); +#ifdef FEDERATED_CENTRALIZED + if (current_reaction_to_execute->is_an_input_reaction) { + // This federate has received a tagged message with the current tag and + // must send LTC at the current tag to confirm that the federate has successfully + // received and processed tagged messages with the current tag. + env->need_to_send_LTC = true; + } +#endif // FEDERATED_CENTRALIZED + bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); if (!violation) { diff --git a/include/core/environment.h b/include/core/environment.h index 038f97e4e..8099eed26 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -87,6 +87,7 @@ typedef struct environment_t { #if defined(FEDERATED) tag_t** _lf_intended_tag_fields; int _lf_intended_tag_fields_size; + bool need_to_send_LTC; #endif // FEDERATED #ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef enclave_info_t* enclave_info; diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 230f3e277..8ab5dab2e 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -150,12 +150,12 @@ typedef struct federate_instance_t { bool received_stop_request_from_rti; /** - * A record of the most recently sent LTC (latest tag complete) message. + * A record of the most recently sent LTC (latest tag confirmed) message. * In some situations, federates can send logical_tag_complete for * the same tag twice or more in-a-row to the RTI. For example, when * _lf_next() returns without advancing tag. To prevent overwhelming * the RTI with extra messages, record the last sent logical tag - * complete message and check against it in lf_latest_tag_complete(). + * complete message and check against it in lf_latest_tag_confirmed(). * * @note Here, the underlying assumption is that the TCP stack will * deliver the Logical TAG Complete message to the RTI eventually @@ -291,7 +291,7 @@ void lf_enqueue_port_absent_reactions(environment_t* env); void* lf_handle_p2p_connections_from_federates(void*); /** - * @brief Send a latest tag complete (LTC) signal to the RTI. + * @brief Send a latest tag confirmed (LTC) signal to the RTI. * * This avoids the send if an equal or later LTC has previously been sent. * @@ -300,7 +300,7 @@ void* lf_handle_p2p_connections_from_federates(void*); * * @param tag_to_send The tag to send. */ -void lf_latest_tag_complete(tag_t); +void lf_latest_tag_confirmed(tag_t); /** * @brief Parse the address of the RTI and store them into the global federation_metadata struct. diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index accd075bc..79ce19550 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -163,7 +163,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * each federate has a valid event at the start tag (start time, 0) and it will * inform the RTI of this event. * Subsequently, at the conclusion of each tag, each federate will send a - * `MSG_TYPE_LATEST_TAG_COMPLETE` followed by a `MSG_TYPE_NEXT_EVENT_TAG` (see + * `MSG_TYPE_LATEST_TAG_CONFIRMED` followed by a `MSG_TYPE_NEXT_EVENT_TAG` (see * the comment for each message for further explanation). Each federate would * have to wait for a `MSG_TYPE_TAG_ADVANCE_GRANT` or a * `MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT` before it can advance to a @@ -434,12 +434,12 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT 8 /** - * Byte identifying a latest tag complete (LTC) message sent by a federate + * Byte identifying a latest tag confirmed (LTC) message sent by a federate * to the RTI. * The next eight bytes will be the timestep of the completed tag. * The next four bytes will be the microsteps of the completed tag. */ -#define MSG_TYPE_LATEST_TAG_COMPLETE 9 +#define MSG_TYPE_LATEST_TAG_CONFIRMED 9 /////////// Messages used in lf_request_stop() /////////////// //// Overview of the algorithm: diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..32836b440 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +LTC-optimization