Skip to content

Commit

Permalink
Merge branch 'For_Each_Td_Private_Scope' of https://github.com/horsha…
Browse files Browse the repository at this point in the history
…ck-dpreview/fio

* 'For_Each_Td_Private_Scope' of https://github.com/horshack-dpreview/fio:
  Refactor for_each_td() to catch inappropriate td ptr reuse

Signed-off-by: Jens Axboe <[email protected]>
  • Loading branch information
axboe committed Mar 3, 2023
2 parents edf6624 + da8f124 commit 051b578
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 151 deletions.
44 changes: 18 additions & 26 deletions backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,16 @@ static void sig_int(int sig)
#ifdef WIN32
static void sig_break(int sig)
{
struct thread_data *td;
int i;

sig_int(sig);

/**
* Windows terminates all job processes on SIGBREAK after the handler
* returns, so give them time to wrap-up and give stats
*/
for_each_td(td, i) {
for_each_td(td) {
while (td->runstate < TD_EXITED)
sleep(1);
}
} end_for_each();
}
#endif

Expand Down Expand Up @@ -2056,15 +2053,14 @@ static void *thread_main(void *data)
static void reap_threads(unsigned int *nr_running, uint64_t *t_rate,
uint64_t *m_rate)
{
struct thread_data *td;
unsigned int cputhreads, realthreads, pending;
int i, status, ret;
int status, ret;

/*
* reap exited threads (TD_EXITED -> TD_REAPED)
*/
realthreads = pending = cputhreads = 0;
for_each_td(td, i) {
for_each_td(td) {
int flags = 0;

if (!strcmp(td->o.ioengine, "cpuio"))
Expand Down Expand Up @@ -2157,7 +2153,7 @@ static void reap_threads(unsigned int *nr_running, uint64_t *t_rate,
done_secs += mtime_since_now(&td->epoch) / 1000;
profile_td_exit(td);
flow_exit_job(td);
}
} end_for_each();

if (*nr_running == cputhreads && !pending && realthreads)
fio_terminate_threads(TERMINATE_ALL, TERMINATE_ALL);
Expand Down Expand Up @@ -2284,13 +2280,11 @@ static bool waitee_running(struct thread_data *me)
{
const char *waitee = me->o.wait_for;
const char *self = me->o.name;
struct thread_data *td;
int i;

if (!waitee)
return false;

for_each_td(td, i) {
for_each_td(td) {
if (!strcmp(td->o.name, self) || strcmp(td->o.name, waitee))
continue;

Expand All @@ -2300,7 +2294,7 @@ static bool waitee_running(struct thread_data *me)
runstate_to_name(td->runstate));
return true;
}
}
} end_for_each();

dprint(FD_PROCESS, "%s: %s completed, can run\n", self, waitee);
return false;
Expand All @@ -2324,14 +2318,14 @@ static void run_threads(struct sk_out *sk_out)
set_sig_handlers();

nr_thread = nr_process = 0;
for_each_td(td, i) {
for_each_td(td) {
if (check_mount_writes(td))
return;
if (td->o.use_thread)
nr_thread++;
else
nr_process++;
}
} end_for_each();

if (output_format & FIO_OUTPUT_NORMAL) {
struct buf_output out;
Expand All @@ -2357,7 +2351,7 @@ static void run_threads(struct sk_out *sk_out)
nr_started = 0;
m_rate = t_rate = 0;

for_each_td(td, i) {
for_each_td(td) {
print_status_init(td->thread_number - 1);

if (!td->o.create_serialize)
Expand Down Expand Up @@ -2393,7 +2387,7 @@ static void run_threads(struct sk_out *sk_out)
td_io_close_file(td, f);
}
}
}
} end_for_each();

/* start idle threads before io threads start to run */
fio_idle_prof_start();
Expand All @@ -2409,7 +2403,7 @@ static void run_threads(struct sk_out *sk_out)
/*
* create threads (TD_NOT_CREATED -> TD_CREATED)
*/
for_each_td(td, i) {
for_each_td(td) {
if (td->runstate != TD_NOT_CREATED)
continue;

Expand Down Expand Up @@ -2488,7 +2482,7 @@ static void run_threads(struct sk_out *sk_out)

ret = (int)(uintptr_t)thread_main(fd);
_exit(ret);
} else if (i == fio_debug_jobno)
} else if (__td_index == fio_debug_jobno)
*fio_debug_jobp = pid;
free(eo);
free(fd);
Expand All @@ -2504,7 +2498,7 @@ static void run_threads(struct sk_out *sk_out)
break;
}
dprint(FD_MUTEX, "done waiting on startup_sem\n");
}
} end_for_each();

/*
* Wait for the started threads to transition to
Expand Down Expand Up @@ -2549,7 +2543,7 @@ static void run_threads(struct sk_out *sk_out)
/*
* start created threads (TD_INITIALIZED -> TD_RUNNING).
*/
for_each_td(td, i) {
for_each_td(td) {
if (td->runstate != TD_INITIALIZED)
continue;

Expand All @@ -2563,7 +2557,7 @@ static void run_threads(struct sk_out *sk_out)
t_rate += ddir_rw_sum(td->o.rate);
todo--;
fio_sem_up(td->sem);
}
} end_for_each();

reap_threads(&nr_running, &t_rate, &m_rate);

Expand All @@ -2589,9 +2583,7 @@ static void free_disk_util(void)

int fio_backend(struct sk_out *sk_out)
{
struct thread_data *td;
int i;

if (exec_profile) {
if (load_profile(exec_profile))
return 1;
Expand Down Expand Up @@ -2647,7 +2639,7 @@ int fio_backend(struct sk_out *sk_out)
}
}

for_each_td(td, i) {
for_each_td(td) {
struct thread_stat *ts = &td->ts;

free_clat_prio_stats(ts);
Expand All @@ -2660,7 +2652,7 @@ int fio_backend(struct sk_out *sk_out)
}
fio_sem_remove(td->sem);
td->sem = NULL;
}
} end_for_each();

free_disk_util();
if (cgroup_list) {
Expand Down
7 changes: 2 additions & 5 deletions dedupe.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@
*/
int init_global_dedupe_working_set_seeds(void)
{
int i;
struct thread_data *td;

for_each_td(td, i) {
for_each_td(td) {
if (!td->o.dedupe_global)
continue;

if (init_dedupe_working_set_seeds(td, 1))
return 1;
}
} end_for_each();

return 0;
}
Expand Down
6 changes: 2 additions & 4 deletions engines/libblkio.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,14 @@ static bool possibly_null_strs_equal(const char *a, const char *b)
*/
static int total_threaded_subjobs(bool hipri)
{
struct thread_data *td;
unsigned int i;
int count = 0;

for_each_td(td, i) {
for_each_td(td) {
const struct fio_blkio_options *options = td->eo;
if (strcmp(td->o.ioengine, "libblkio") == 0 &&
td->o.use_thread && (bool)options->hipri == hipri)
++count;
}
} end_for_each();

return count;
}
Expand Down
33 changes: 16 additions & 17 deletions eta.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ bool eta_time_within_slack(unsigned int time)
*/
bool calc_thread_status(struct jobs_eta *je, int force)
{
struct thread_data *td;
int i, unified_rw_rep;
int unified_rw_rep;
bool any_td_in_ramp;
uint64_t rate_time, disp_time, bw_avg_time, *eta_secs;
unsigned long long io_bytes[DDIR_RWDIR_CNT] = {};
Expand Down Expand Up @@ -417,7 +416,7 @@ bool calc_thread_status(struct jobs_eta *je, int force)

bw_avg_time = ULONG_MAX;
unified_rw_rep = 0;
for_each_td(td, i) {
for_each_td(td) {
unified_rw_rep += td->o.unified_rw_rep;
if (is_power_of_2(td->o.kb_base))
je->is_pow2 = 1;
Expand Down Expand Up @@ -459,9 +458,9 @@ bool calc_thread_status(struct jobs_eta *je, int force)
je->nr_pending++;

if (je->elapsed_sec >= 3)
eta_secs[i] = thread_eta(td);
eta_secs[__td_index] = thread_eta(td);
else
eta_secs[i] = INT_MAX;
eta_secs[__td_index] = INT_MAX;

check_str_update(td);

Expand All @@ -478,26 +477,26 @@ bool calc_thread_status(struct jobs_eta *je, int force)
}
}
}
}
} end_for_each();

if (exitall_on_terminate) {
je->eta_sec = INT_MAX;
for_each_td(td, i) {
if (eta_secs[i] < je->eta_sec)
je->eta_sec = eta_secs[i];
}
for_each_td_index() {
if (eta_secs[__td_index] < je->eta_sec)
je->eta_sec = eta_secs[__td_index];
} end_for_each();
} else {
unsigned long eta_stone = 0;

je->eta_sec = 0;
for_each_td(td, i) {
for_each_td(td) {
if ((td->runstate == TD_NOT_CREATED) && td->o.stonewall)
eta_stone += eta_secs[i];
eta_stone += eta_secs[__td_index];
else {
if (eta_secs[i] > je->eta_sec)
je->eta_sec = eta_secs[i];
if (eta_secs[__td_index] > je->eta_sec)
je->eta_sec = eta_secs[__td_index];
}
}
} end_for_each();
je->eta_sec += eta_stone;
}

Expand All @@ -507,9 +506,9 @@ bool calc_thread_status(struct jobs_eta *je, int force)
rate_time = mtime_since(&rate_prev_time, &now);

any_td_in_ramp = false;
for_each_td(td, i) {
for_each_td(td) {
any_td_in_ramp |= in_ramp_time(td);
}
} end_for_each();
if (write_bw_log && rate_time > bw_avg_time && !any_td_in_ramp) {
calc_rate(unified_rw_rep, rate_time, io_bytes, rate_io_bytes,
je->rate);
Expand Down
19 changes: 17 additions & 2 deletions fio.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,24 @@ extern void lat_target_reset(struct thread_data *);

/*
* Iterates all threads/processes within all the defined jobs
* Usage:
* for_each_td(var_name_for_td) {
* << bodoy of your loop >>
* Note: internally-scoped loop index availble as __td_index
* } end_for_each_td()
*/
#define for_each_td(td, i) \
for ((i) = 0, (td) = &segments[0].threads[0]; (i) < (int) thread_number; (i)++, (td) = tnumber_to_td((i)))
#define for_each_td(td) \
{ \
int __td_index; \
struct thread_data *(td); \
for (__td_index = 0, (td) = &segments[0].threads[0];\
__td_index < (int) thread_number; __td_index++, (td) = tnumber_to_td(__td_index))
#define for_each_td_index() \
{ \
int __td_index; \
for (__td_index = 0; __td_index < (int) thread_number; __td_index++)
#define end_for_each() }

#define for_each_file(td, f, i) \
if ((td)->files_index) \
for ((i) = 0, (f) = (td)->files[0]; \
Expand Down
14 changes: 5 additions & 9 deletions init.c
Original file line number Diff line number Diff line change
Expand Up @@ -1405,15 +1405,14 @@ static void gen_log_name(char *name, size_t size, const char *logtype,

static int check_waitees(char *waitee)
{
struct thread_data *td;
int i, ret = 0;
int ret = 0;

for_each_td(td, i) {
for_each_td(td) {
if (td->subjob_number)
continue;

ret += !strcmp(td->o.name, waitee);
}
} end_for_each();

return ret;
}
Expand Down Expand Up @@ -1448,10 +1447,7 @@ static bool wait_for_ok(const char *jobname, struct thread_options *o)

static int verify_per_group_options(struct thread_data *td, const char *jobname)
{
struct thread_data *td2;
int i;

for_each_td(td2, i) {
for_each_td(td2) {
if (td->groupid != td2->groupid)
continue;

Expand All @@ -1461,7 +1457,7 @@ static int verify_per_group_options(struct thread_data *td, const char *jobname)
jobname);
return 1;
}
}
} end_for_each();

return 0;
}
Expand Down
6 changes: 2 additions & 4 deletions iolog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1875,9 +1875,7 @@ void td_writeout_logs(struct thread_data *td, bool unit_logs)

void fio_writeout_logs(bool unit_logs)
{
struct thread_data *td;
int i;

for_each_td(td, i)
for_each_td(td) {
td_writeout_logs(td, unit_logs);
} end_for_each();
}
Loading

0 comments on commit 051b578

Please sign in to comment.