Skip to content

Commit

Permalink
Merge pull request #311 from lf-lang/enclaves3
Browse files Browse the repository at this point in the history
Detect ZDC in RTI and issue PTAG only for nodes in ZDC
  • Loading branch information
edwardalee authored Dec 2, 2023
2 parents 16af56a + caeae15 commit 2fb2878
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 287 deletions.
3 changes: 2 additions & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ int process_args(int argc, const char* argv[]) {
return 0;
}
rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines
lf_print("RTI: Number of federates: %d\n", rti.base.number_of_scheduling_nodes);
lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes);
} else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) {
if (argc < i + 2) {
lf_print_error(
Expand Down Expand Up @@ -284,6 +284,7 @@ int main(int argc, const char* argv[]) {

int socket_descriptor = start_rti_server(rti.user_specified_port);
wait_for_federates(socket_descriptor);
free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes);
lf_print("RTI is exiting.");
return 0;
}
Expand Down
278 changes: 169 additions & 109 deletions core/federated/RTI/rti_common.c

Large diffs are not rendered by default.

78 changes: 53 additions & 25 deletions core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ typedef enum scheduling_node_state_t {
PENDING // Waiting for upstream scheduling nodes.
} scheduling_node_state_t;

/** Struct for minimum delays from upstream nodes. */
typedef struct minimum_delay_t {
int id; // ID of the upstream node.
tag_t min_delay; // Minimum delay from upstream.
} minimum_delay_t;

/**
* Information about the scheduling nodes coordinated by the RTI.
* The abstract scheduling node could either be an enclave or a federate.
Expand All @@ -59,6 +65,9 @@ typedef struct scheduling_node_t {
int* downstream; // Array of downstream scheduling node ids.
int num_downstream; // Size of the array of downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node.
size_t num_min_delays; // Size of min_delays array.
int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE
} scheduling_node_t;

/**
Expand Down Expand Up @@ -162,7 +171,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag);
void notify_advance_grant_if_safe(scheduling_node_t* e);

/**
* Nontify a provisional tag advance grant (PTAG) message to the specified scheduling node.
* Notify a provisional tag advance grant (PTAG) message to the specified scheduling node.
* Do not notify it if a previously sent PTAG or TAG was greater or equal.
*
* This function will keep a record of this PTAG in the node's last_provisionally_granted
Expand Down Expand Up @@ -207,7 +216,6 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag);
*/
tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e);


/**
* @brief Update the next event tag of an scheduling node.
*
Expand All @@ -221,29 +229,49 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e);
void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t next_event_tag);

/**
* Find the earliest tag at which the specified federate may
* experience its next event. This is the least next event tag (NET)
* of the specified federate and (transitively) upstream federates
* (with delays of the connections added). For upstream federates,
* we assume (conservatively) that federate upstream of those
* may also send an event. The result will never be less than
* the completion time of the federate (which may be NEVER,
* if the federate has not yet completed a logical time).
*
* FIXME: This could be made less conservative by building
* at code generation time a causality interface table indicating
* which outputs can be triggered by which inputs. For now, we
* assume any output can be triggered by any input.
*
* @param e The scheduling node.
* @param candidate A candidate tag (for the first invocation,
* this should be fed->next_event).
* @param visited An array of booleans indicating which federates
* have been visited (for the first invocation, this should be
* an array of falses of size _RTI.number_of_federates).
* @return The earliest next event tag of the scheduling node e.
*/
tag_t transitive_next_event(scheduling_node_t *e, tag_t candidate, bool visited[]);
* Given a node (enclave or federate), find the tag of the earliest possible incoming
* message from upstream enclaves or federates, which will be the smallest upstream NET
* plus the least delay. This could be NEVER_TAG if the RTI has not seen a NET from some
* upstream node.
* @param e The target node.
* @return The earliest possible incoming message tag.
*/
tag_t earliest_future_incoming_message_tag(scheduling_node_t* e);

/**
* Return true if the node is in a zero-delay cycle.
* @param node The node.
*/
bool is_in_zero_delay_cycle(scheduling_node_t* node);

/**
* Return true if the node is in a cycle (possibly a zero-delay cycle).
* @param node The node.
*/
bool is_in_cycle(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), if necessary, update the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles. These fields will be
* updated only if they have not been previously updated or if invalidate_min_delays_upstream
* has been called since they were last updated.
* @param node The node.
*/
void update_min_delays_upstream(scheduling_node_t* node);

/**
* For the given scheduling node (enclave or federate), invalidate the `min_delays`,
* `num_min_delays`, and the fields that indicate cycles.
* This should be called whenever the structure of the connections upstream of the
* given node have changed.
* @param node The node.
*/
void invalidate_min_delays_upstream(scheduling_node_t* node);

/**
* Free dynamically allocated memory on the scheduling nodes and the scheduling node array itself.
*/
void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes);

#endif // RTI_COMMON_H
#endif // STANDALONE_RTI || LF_ENCLAVES
12 changes: 11 additions & 1 deletion core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static rti_local_t * rti_local;
lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = malloc(sizeof(rti_local_t));
rti_local = (rti_local_t*)malloc(sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");

initialize_rti_common(&rti_local->base);
Expand All @@ -60,6 +60,11 @@ void initialize_local_rti(environment_t *envs, int num_envs) {
}
}

void free_local_rti() {
free_scheduling_nodes(rti_local->base.scheduling_nodes, rti_local->base.number_of_scheduling_nodes);
free(rti_local);
}

void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t * env) {
initialize_scheduling_node(&enclave->base, idx);

Expand Down Expand Up @@ -186,4 +191,9 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
LF_PRINT_LOG("RTI: enclave %u callback with PTAG " PRINTF_TAG " ",
e->id, tag.time - lf_time_start(), tag.microstep);
}

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
// Nothing to do here.
}

#endif //LF_ENCLAVES
9 changes: 7 additions & 2 deletions core/federated/RTI/rti_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ typedef struct {

/**
* @brief Dynamically create and initialize the local RTI.
*
*/
void initialize_local_rti(environment_t* envs, int num_envs);

/**
* @brief Free memory associated with the local the RTI and the local RTI iself.
*/
void free_local_rti();

/**
* @brief Initialize the enclave object.
*
Expand All @@ -41,7 +45,8 @@ void initialize_local_rti(environment_t* envs, int num_envs);
void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *env);

/**
* @brief This function call may block. A call to this function serves two purposes.
* @brief Notify the local RTI of a next event tag (NET).
* This function call may block. A call to this function serves two purposes.
* 1) It is a promise that, unless receiving events from other enclaves, this
* enclave will not produce any event until the next_event_tag (NET) argument.
* 2) It is a request for permission to advance the logical tag of the enclave
Expand Down
64 changes: 39 additions & 25 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,30 +250,25 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// Send PTAG to all upstream federates, if they have not had
// a later or equal PTAG or TAG sent previously and if their transitive
// NET is greater than or equal to the tag.
// This is needed to stimulate absent messages from upstream and break deadlocks.
// NOTE: This could later be replaced with a TNET mechanism once
// we have an available encoding of causality interfaces.
// That might be more efficient.
// NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
// It's only needed for federates, which is why this is implemented here.
for (int j = 0; j < e->num_upstream; j++) {
federate_info_t* upstream = GET_FED_INFO(e->upstream[j]);
scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]];

// Ignore this federate if it has resigned.
if (upstream->enclave.state == NOT_CONNECTED) continue;
// To handle cycles, need to create a boolean array to keep
// track of which upstream federates have been visited.
bool* visited = (bool*)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); // Initializes to 0.

// Find the (transitive) next event tag upstream.
tag_t upstream_next_event = transitive_next_event(
&(upstream->enclave), upstream->enclave.next_event, visited);
free(visited);
// If these tags are equal, then
// a TAG or PTAG should have already been granted,
// in which case, another will not be sent. But it
// may not have been already granted.
if (lf_tag_compare(upstream_next_event, tag) >= 0) {
notify_provisional_tag_advance_grant(&(upstream->enclave), tag);
}
if (upstream->state == NOT_CONNECTED) continue;

tag_t earliest = earliest_future_incoming_message_tag(upstream);

// If these tags are equal, then a TAG or PTAG should have already been granted,
// in which case, another will not be sent. But it may not have been already granted.
if (lf_tag_compare(earliest, tag) >= 0) {
notify_provisional_tag_advance_grant(upstream, tag);
}
}
}
}
Expand Down Expand Up @@ -1265,14 +1260,22 @@ int receive_connection_information(int socket_id, uint16_t fed_id) {
fed_id);

// Allocate memory for the upstream and downstream pointers
fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream);
fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream);

// Allocate memory for the upstream delay pointers
fed->enclave.upstream_delay =
(interval_t*)malloc(
sizeof(interval_t) * fed->enclave.num_upstream
);
if (fed->enclave.num_upstream > 0) {
fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream);
// Allocate memory for the upstream delay pointers
fed->enclave.upstream_delay =
(interval_t*)malloc(
sizeof(interval_t) * fed->enclave.num_upstream
);
} else {
fed->enclave.upstream = (int*)NULL;
fed->enclave.upstream_delay = (interval_t*)NULL;
}
if (fed->enclave.num_downstream > 0) {
fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream);
} else {
fed->enclave.downstream = (int*)NULL;
}

size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) *
fed->enclave.num_upstream) + (sizeof(uint16_t) * fed->enclave.num_downstream);
Expand Down Expand Up @@ -1646,4 +1649,15 @@ void initialize_RTI(rti_remote_t *rti){
rti_remote->base.tracing_enabled = false;
rti_remote->stop_in_progress = false;
}

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
// FIXME: Gives error freeing memory not allocated!!!!
scheduling_node_t* node = scheduling_nodes[i];
if (node->upstream != NULL) free(node->upstream);
if (node->downstream != NULL) free(node->downstream);
}
free(scheduling_nodes);
}

#endif // STANDALONE_RTI
4 changes: 2 additions & 2 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2067,8 +2067,8 @@ void spawn_staa_thread(){
* If current_time is less than the specified PTAG, then this will
* also insert into the event_q a dummy event with the specified tag.
* This will ensure that the federate advances time to the specified
* tag and, for centralized coordination, inserts blocking reactions
* and null-message-sending output reactions at that tag.
* tag and, for centralized coordination, stimulates null-message-sending
* output reactions at that tag.
*
* @note This function is similar to handle_tag_advance_grant() except that
* it sets last_TAG_was_provisional to true and also it does not update the
Expand Down
5 changes: 5 additions & 0 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
initialize_global();
// Set start time
start_time = lf_time_physical();

LF_PRINT_DEBUG("NOTE: FOREVER is displayed as " PRINTF_TAG " and NEVER as " PRINTF_TAG,
FOREVER_TAG.time - start_time, FOREVER_TAG.microstep,
NEVER_TAG.time - start_time, 0);

environment_init_tags(env, start_time, duration);
// Start tracing if enalbed
start_trace(env->trace);
Expand Down
2 changes: 1 addition & 1 deletion core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@ void termination(void) {
continue;
}
// Stop any tracing, if it is running.
stop_trace(env->trace);
stop_trace_locked(env->trace);

_lf_start_time_step(env);

Expand Down
23 changes: 13 additions & 10 deletions core/tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,22 @@ tag_t lf_tag(void *env) {
return ((environment_t *)env)->current_tag;
}

tag_t lf_tag_add(tag_t a, tag_t b) {
if (a.time == NEVER || b.time == NEVER) return NEVER_TAG;
if (a.time == FOREVER || b.time == FOREVER) return FOREVER_TAG;
tag_t result = {.time = a.time + b.time, .microstep = a.microstep + b.microstep};
if (result.microstep < a.microstep) return FOREVER_TAG;
if (result.time < a.time && b.time > 0) return FOREVER_TAG;
if (result.time > a.time && b.time < 0) return NEVER_TAG;
return result;
}

int lf_tag_compare(tag_t tag1, tag_t tag2) {
if (tag1.time < tag2.time) {
LF_PRINT_DEBUG(PRINTF_TIME " < " PRINTF_TIME, tag1.time, tag2.time);
return -1;
} else if (tag1.time > tag2.time) {
return 1;
} else if (tag1.microstep < tag2.microstep) {
LF_PRINT_DEBUG(PRINTF_TIME " and microstep < " PRINTF_TIME, tag1.time, tag2.time);
return -1;
} else if (tag1.microstep > tag2.microstep) {
return 1;
Expand All @@ -134,20 +142,16 @@ int lf_tag_compare(tag_t tag1, tag_t tag2) {

tag_t lf_delay_tag(tag_t tag, interval_t interval) {
if (tag.time == NEVER || interval < 0LL) return tag;
if (tag.time >= FOREVER - interval) return tag;
// Note that overflow in C is undefined for signed variables.
if (tag.time >= FOREVER - interval) return FOREVER_TAG; // Overflow.
tag_t result = tag;
if (interval == 0LL) {
// Note that unsigned variables will wrap on overflow.
// This is probably the only reasonable thing to do with overflowing
// microsteps.
result.microstep++;
} else {
// Note that overflow in C is undefined for signed variables.
if (FOREVER - interval < result.time) {
result.time = FOREVER;
} else {
result.time += interval;
}
result.time += interval;
result.microstep = 0;
}
return result;
Expand All @@ -156,7 +160,6 @@ tag_t lf_delay_tag(tag_t tag, interval_t interval) {
tag_t lf_delay_strict(tag_t tag, interval_t interval) {
tag_t result = lf_delay_tag(tag, interval);
if (interval != 0 && interval != NEVER && interval != FOREVER && result.time != NEVER && result.time != FOREVER) {
LF_PRINT_DEBUG("interval=%lld, result time=%lld", (long long) interval, (long long) result.time);
result.time -= 1;
result.microstep = UINT_MAX;
}
Expand Down
6 changes: 5 additions & 1 deletion core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,9 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level)
#ifdef FEDERATED
stall_advance_level_federation(env, *next_reaction_level);
#endif
*next_reaction_level += 1;
if (*next_reaction_level < SIZE_MAX) *next_reaction_level += 1;
}

/**
* The main looping logic of each LF worker thread.
* This function assumes the caller holds the mutex lock.
Expand Down Expand Up @@ -1216,6 +1217,9 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
LF_PRINT_LOG("---- All worker threads exited successfully.");
}
}
#if defined LF_ENCLAVES
free_local_rti();
#endif
return 0;
}

Expand Down
Loading

0 comments on commit 2fb2878

Please sign in to comment.