Skip to content

Commit

Permalink
Added other pipeline test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
vossmjp committed Nov 23, 2024
1 parent 80b4ede commit a418fa2
Showing 1 changed file with 120 additions and 16 deletions.
136 changes: 120 additions & 16 deletions new_examples/performance_tuning/parallel_pipeline_timed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,29 @@ void doWork(double sec) {
while ((tbb::tick_count::now() - t0).seconds() <= sec);
}

tbb::filter<int,int> makeMiddleFilters(int num_filters, tbb::filter_mode m, double usec) {
tbb::filter<int,int> makeMiddleFilters(int num_filters, tbb::filter_mode m, double usec,
int imbalanced_filter = -1, double imbalance_factor = 1) {
double w = (imbalanced_filter == num_filters) ? usec*imbalance_factor : usec;

auto f = tbb::make_filter<int, int>(m,
[=](int i) -> int {
doWork(usec);
doWork(w);
return i;
});
if (num_filters > 1)
return f & makeMiddleFilters(num_filters-1, m, usec);
return f & makeMiddleFilters(num_filters-1, m, usec,
imbalanced_filter, imbalance_factor);
else
return f;
}

tbb::filter<void,void> buildFilterChain(std::atomic<int>& counter, std::vector<int>& data,
int num_filters, tbb::filter_mode m, double usec) {
int num_filters, tbb::filter_mode m, double usec,
double imbalance_factor = 1) {
counter = data.size() - 1;

tbb::filter<int, int> middle{};
tbb::filter<int,void> end{};
int imbalanced_filter = (imbalance_factor == 1) ? -1 : num_filters - 3;

if (num_filters > 1) {
tbb::filter<void,int> start =
Expand All @@ -65,7 +70,7 @@ tbb::filter<void,void> buildFilterChain(std::atomic<int>& counter, std::vector<i
});

if (num_filters > 2) {
tbb::filter<int, int> middle = makeMiddleFilters(num_filters-2, m, usec);
tbb::filter<int, int> middle = makeMiddleFilters(num_filters-2, m, usec, imbalanced_filter, imbalance_factor);
return start & middle & end;
} else {
return start & end;
Expand All @@ -83,11 +88,16 @@ tbb::filter<void,void> buildFilterChain(std::atomic<int>& counter, std::vector<i
}
}

double runSerial(std::vector<int>& data, int num_filters, double usec, int num_items) {
double runSerial(std::vector<int>& data, int num_filters, double usec, int num_items,
int imbalanced_filter = -1, double imbalance_factor = 1) {
tbb::tick_count t0 = tbb::tick_count::now();
for (int i = 0; i < num_items; ++i) {
for (int j = 0; j < num_filters; ++j) {
doWork(usec);
if (j == imbalanced_filter) {
doWork(usec*imbalance_factor);
} else {
doWork(usec);
}
}
}
double total_time = (tbb::tick_count::now() - t0).seconds();
Expand All @@ -104,18 +114,18 @@ std::vector<std::string> mode_name = {"serial_in_order",
"serial_out_of_order",
"parallel"};

std::vector<double> spin_times = {1e-7, 1e-6, 1e-5, 1e-4};


void runEightFiltersOneTokenBalanced(int N, std::vector<int>& data) {
std::vector<double> spin_times = {1e-7, 1e-6, 1e-5, 1e-4};
int num_filters = 8;

std::cout << "Varying the mode and spin_time for 8 filters:\n";
std::cout << "\nVarying the mode and spin_time for 8 filters:\n";

// Balanced serial time
std::vector<double> serial_time;
for (double spin_time : spin_times) {
serial_time.push_back(runSerial(data, num_filters, spin_time, N));
std::cout << serial_time.back() << ",";
}

std::cout << "\nmode";
Expand All @@ -139,18 +149,109 @@ void runEightFiltersOneTokenBalanced(int N, std::vector<int>& data) {
std::cout << ", " << serial_time[st]/t;
}
std::cout << std::endl;
}
}

void runEightTokensIncreasingFilters(int N, std::vector<int>& data) {
int max_threads = tbb::this_task_arena::max_concurrency();
double spin_time = 0.0001;

std::cout << "\nVarying number of filters with 100us spin and 8 tokens:\n";

// Balanced serial time
std::vector<double> serial_time;
for (int num_filters = 1; num_filters <= max_threads; ++num_filters) {
serial_time.push_back(runSerial(data, num_filters, spin_time, N));
}
}

void runEightTokensIncreasingFiltersBalanced() {
std::cout << "\nmode";
for (int num_filters = 1; num_filters <= max_threads; ++num_filters)
std::cout << ", " << num_filters;
std::cout << std::endl;

for (int m = 0; m < modes.size(); ++m) {
std::atomic<int> counter = 0;
std::cout << mode_name[m];
for (int num_filters = 1; num_filters <= max_threads; ++num_filters) {
auto chain =
buildFilterChain(counter, data, num_filters, modes[m], spin_time);

tbb::tick_count t0 = tbb::tick_count::now();
warmupTBB();
tbb::parallel_pipeline(max_threads, chain);
double t = (tbb::tick_count::now() - t0).seconds();
std::cout << ", " << serial_time[num_filters-1]/t;
}
std::cout << std::endl;
}
}

void runEightFiltersIncreasingTokensBalanced() {
void runEightFiltersIncreasingTokens(int N, std::vector<int>& data) {
int max_threads = tbb::this_task_arena::max_concurrency();
int num_filters = 8;
double spin_time = 0.0001;

std::cout << "\nVarying number of tokens with 8 filters and 100us spin:\n";

double serial_time = runSerial(data, num_filters, spin_time, N);

std::cout << "\nmode";
for (int num_tokens = 1; num_tokens <= 2*max_threads; ++num_tokens)
std::cout << ", " << num_tokens;
std::cout << std::endl;

for (int m = 0; m < modes.size(); ++m) {
std::atomic<int> counter = 0;
std::cout << mode_name[m];
for (int num_tokens = 1; num_tokens <= 2*max_threads; ++num_tokens) {
auto chain =
buildFilterChain(counter, data, num_filters, modes[m], spin_time);

tbb::tick_count t0 = tbb::tick_count::now();
warmupTBB();
tbb::parallel_pipeline(num_tokens, chain);
double t = (tbb::tick_count::now() - t0).seconds();
std::cout << ", " << serial_time/t;
}
std::cout << std::endl;
}
}

void runEightFiltersImbalanced() {
void runEightFiltersImbalanced(int N, std::vector<int>& data) {
std::vector<double> imbalance = {0.1, 0.5, 0.75, 1.5, 2, 10};
int num_filters = 8;
int num_tokens = 8;
double spin_time = 0.0001;

std::cout << "\nVarying imbalance of 1 of 8 filters:\n";

// Imbalanced serial time
std::vector<double> serial_time;
for (double imb : imbalance) {
serial_time.push_back(runSerial(data, num_filters, spin_time, N, imb));
}

std::cout << "\nmode";
for (double imb : imbalance)
std::cout << ", " << imb;
std::cout << std::endl;

for (int m = 0; m < modes.size(); ++m) {
std::atomic<int> counter = 0;
std::cout << mode_name[m];
for (int imb = 0; imb < imbalance.size(); ++imb) {
double imbalance_factor = imbalance[imb];
auto chain =
buildFilterChain(counter, data, num_filters, modes[m], spin_time, imbalance_factor);

tbb::tick_count t0 = tbb::tick_count::now();
warmupTBB();
tbb::parallel_pipeline(num_tokens, chain);
double t = (tbb::tick_count::now() - t0).seconds();
std::cout << ", " << serial_time[imb]/t;
}
std::cout << std::endl;
}
}

int main() {
Expand All @@ -169,7 +270,10 @@ int main() {
a.execute([&]() {
int N = 8000;
std::vector<int> data = makeVector(N);
runEightFiltersOneTokenBalanced(N, data);
//runEightFiltersOneTokenBalanced(N, data);
//runEightTokensIncreasingFilters(N, data);
//runEightFiltersIncreasingTokens(N, data);
runEightFiltersImbalanced(N, data);
});

return 0;
Expand Down

0 comments on commit a418fa2

Please sign in to comment.