Skip to content

Commit

Permalink
Merge pull request #463 from lf-lang/fix-adaptive-scheduler
Browse files Browse the repository at this point in the history
Fix adaptive scheduler
  • Loading branch information
cmnrd authored Jul 2, 2024
2 parents 4f8ea7d + d208dda commit 38294a3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
3 changes: 1 addition & 2 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ void termination(void) {
}
}
}
lf_tracing_global_shutdown();
// Skip most cleanup on abnormal termination.
if (_lf_normal_termination) {
_lf_free_all_tokens(); // Must be done before freeing reactors.
Expand All @@ -1189,8 +1190,6 @@ void termination(void) {
#endif
lf_free_all_reactors();

lf_tracing_global_shutdown();

// Free up memory associated with environment.
// Do this last so that printed warnings don't access freed memory.
for (int i = 0; i < num_envs; i++) {
Expand Down
21 changes: 13 additions & 8 deletions core/threaded/scheduler_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ static size_t cond_of(size_t worker) {
static void set_level(lf_scheduler_t* scheduler, size_t level) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
assert(level < worker_assignments->num_levels);
assert(0 <= level);
assert(0 <= (long long)level);
data_collection_end_level(scheduler, worker_assignments->current_level, worker_assignments->num_workers);
worker_assignments->current_level = level;
worker_assignments->num_reactions_by_worker = worker_assignments->num_reactions_by_worker_by_level[level];
Expand Down Expand Up @@ -224,7 +224,7 @@ static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) {
if (old_num_reactions <= 0)
return NULL;
} while ((current_num_reactions = lf_atomic_val_compare_and_swap32(
((int32_t*)worker_assignments->num_reactions_by_worker + worker), old_num_reactions,
(int32_t*)(worker_assignments->num_reactions_by_worker + worker), old_num_reactions,
(index = old_num_reactions - 1))) != old_num_reactions);
return worker_assignments->reactions_by_worker[worker][index];
#endif
Expand All @@ -238,9 +238,9 @@ static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) {
*/
static reaction_t* worker_assignments_get_or_lock(lf_scheduler_t* scheduler, size_t worker) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
assert(worker >= 0);
assert((long long)worker >= 0);
// assert(worker < num_workers); // There are edge cases where this doesn't hold.
assert(worker_assignments->num_reactions_by_worker[worker] >= 0);
assert((long long)worker_assignments->num_reactions_by_worker[worker] >= 0);
reaction_t* ret;
while (true) {
if ((ret = get_reaction(scheduler, worker)))
Expand Down Expand Up @@ -425,6 +425,7 @@ static void worker_states_sleep_and_unlock(lf_scheduler_t* scheduler, size_t wor
static void advance_level_and_unlock(lf_scheduler_t* scheduler, size_t worker) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
size_t max_level = worker_assignments->num_levels - 1;
size_t total_num_reactions;
while (true) {
if (worker_assignments->current_level == max_level) {
data_collection_end_tag(scheduler, worker_assignments->num_workers_by_level,
Expand All @@ -438,12 +439,15 @@ static void advance_level_and_unlock(lf_scheduler_t* scheduler, size_t worker) {
}
} else {
#ifdef FEDERATED
lf_stall_advance_level_federation(scheduler->env, worker_assignments->current_level);
lf_stall_advance_level_federation_locked(worker_assignments->current_level);
#endif
worker_assignments->current_level++;
set_level(scheduler, worker_assignments->current_level);
total_num_reactions = get_num_reactions(scheduler);
if (!total_num_reactions) {
worker_assignments->current_level++;
set_level(scheduler, worker_assignments->current_level);
}
}
size_t total_num_reactions = get_num_reactions(scheduler);
total_num_reactions = get_num_reactions(scheduler);
if (total_num_reactions) {
size_t num_workers_to_awaken = LF_MIN(total_num_reactions, worker_assignments->num_workers);
LF_ASSERT(num_workers_to_awaken > 0, "");
Expand Down Expand Up @@ -598,6 +602,7 @@ static size_t restrict_to_range(size_t start_inclusive, size_t end_inclusive, si
*/
static void compute_number_of_workers(lf_scheduler_t* scheduler, size_t* num_workers_by_level,
size_t* max_num_workers_by_level, bool jitter) {

data_collection_t* data_collection = scheduler->custom_data->data_collection;
for (size_t level = 0; level < data_collection->num_levels; level++) {
interval_t this_execution_time =
Expand Down

0 comments on commit 38294a3

Please sign in to comment.