Fast PA teardown #674

Open · wants to merge 3 commits into main
8 changes: 8 additions & 0 deletions src/c++/perf_analyzer/client_backend/mock_client_backend.h
Contributor:
The first run is 1m54s:

[screenshot: faster_teardown_without_changes]

The second run is 24s.

Is the additional time for the first run mainly after the last pass? Or is it distributed among the passes?

Collaborator (Author):

Yes, it was 100% after the final results were printed. The old behavior was that it just sat there.

Contributor:

In the future, can you suggest an order of files to review along with some line-specific PR comments explaining at a high level what the changes are for?

Contributor:

One high-level thing that crossed my mind: do we want to just drop requests, as this PR implements?

Maybe we could instead print a message explaining that profiling is complete but that PA is waiting for unfinished requests, and that the user can ^C early if they don't mind the server potentially still processing those requests.
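For illustration, a minimal standalone sketch of the behavior proposed here. The message wording and the simulated request counter are assumptions, not PA code; default SIGINT handling already lets ^C terminate the process, so printing the message is the only new step.

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// All names here are illustrative, not PA code.
std::atomic<size_t> outstanding_requests{3};

int main()
{
  std::cout << "Profiling complete. Waiting for " << outstanding_requests
            << " outstanding request(s); press Ctrl-C to exit early "
            << "(the server may still be processing them)." << std::endl;
  while (outstanding_requests > 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    --outstanding_requests;  // Simulate responses arriving
  }
}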

Collaborator (Author):

Longer term, what we need is a proper way to cancel requests. Short term, this is holding up some GenAI-Perf scripting where they run PA with different request rates in a bash script; ^C isn't going to help them.

@@ -489,6 +489,14 @@ class NaggyMockClientBackend : public ClientBackend {
});
}

+ ~NaggyMockClientBackend()
+ {
+   // Make sure no requests carry over to the next test
+   while (stats_->num_active_infer_calls) {
+     std::this_thread::sleep_for(std::chrono::milliseconds(1));
+   }
+ }

Contributor (inline, on the sleep loop):

Do you want a configurable timeout? Otherwise, it will be hard for someone who sees the test time out to know why it failed.

MOCK_METHOD(
Error, ModelConfig,
(rapidjson::Document*, const std::string&, const std::string&),
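As a reference for the timeout suggestion above, one possible shape of a bounded wait. The constant name, the 10-second value, and the stderr report are all assumptions for illustration; the PR as written waits indefinitely.

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Stand-in for stats_->num_active_infer_calls in the mock backend.
std::atomic<size_t> num_active_infer_calls{0};

void WaitForActiveInferCalls()
{
  constexpr auto kTeardownTimeout = std::chrono::seconds(10);  // Assumed value
  const auto deadline = std::chrono::steady_clock::now() + kTeardownTimeout;
  while (num_active_infer_calls > 0) {
    if (std::chrono::steady_clock::now() > deadline) {
      // Report the hang instead of blocking the test run forever
      std::cerr << "Teardown timed out with " << num_active_infer_calls
                << " active infer call(s)" << std::endl;
      break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

int main()
{
  WaitForActiveInferCalls();
}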
10 changes: 5 additions & 5 deletions src/c++/perf_analyzer/concurrency_worker.cc
@@ -111,7 +111,7 @@ ConcurrencyWorker::HandleExecuteOff()
// Wait if no request should be sent and it is not exiting
thread_config_->is_paused_ = true;
std::unique_lock<std::mutex> lock(wake_mutex_);
- wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
+ wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });

// TODO REFACTOR TMA-1043 - memory manager should be handling this instead
// of here
@@ -131,10 +131,10 @@ ConcurrencyWorker::HandleNoConcurrency()
// Wait if no request should be sent and it is not exiting
std::unique_lock<std::mutex> lock(wake_mutex_);
wake_signal_.wait(lock, [this]() {
- return early_exit || (thread_config_->concurrency_ > 0);
+ return exiting_ || (thread_config_->concurrency_ > 0);
});
- // Stop executing if concurrency is 0 and early exit is requested
- if (early_exit && thread_config_->concurrency_ == 0) {
+ // Stop executing if concurrency is 0 and we are exiting
+ if (exiting_ && thread_config_->concurrency_ == 0) {
return true;
}
}
@@ -181,7 +181,7 @@ ConcurrencyWorker::WaitForResponses()
std::unique_lock<std::mutex> lk(cb_mtx_);
thread_stat_->idle_timer.Start();
cb_cv_.wait(lk, [this] {
- if (notified_) {
+ if (notified_ || (exiting_ && fast_exit_)) {
notified_ = false;
return true;
}
9 changes: 8 additions & 1 deletion src/c++/perf_analyzer/infer_context.cc
@@ -63,7 +63,7 @@ InferContext::SendSequenceInferRequest(uint32_t seq_stat_index, bool delayed)
// This also helps in reporting the realistic latencies.
std::lock_guard<std::mutex> guard(
sequence_manager_->GetMutex(seq_stat_index));
- if (!early_exit && execute_) {
+ if (!exiting_ && execute_) {
sequence_manager_->SetInferSequenceOptions(
seq_stat_index, infer_data_.options_);

@@ -298,6 +298,13 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
// Add the request record to thread request records vector with
// proper locking
std::lock_guard<std::mutex> lock(thread_stat_->mu_);

+ // If we are fast exiting, do not handle the request and instead exit
+ // immediately
+ if (exiting_ && fast_exit_) {
+   return;
+ }

thread_stat_->cb_status_ = result_ptr->RequestStatus();
if (thread_stat_->cb_status_.IsOk()) {
std::string request_id;
12 changes: 12 additions & 0 deletions src/c++/perf_analyzer/infer_context.h
@@ -104,6 +104,16 @@ class InferContext {
// Initialize the context. Must be done before any inferences are sent
void Init();

+ // Signal to the context to stop working and exit
+ // If fast exit is true, everything should be immediately dropped
+ // If fast exit is false, the context should still handle outstanding
+ // requests before exiting
+ void Exit(bool fast_exit)
+ {
+   exiting_ = true;
+   fast_exit_ = fast_exit;
+ }

// Send a single inference request to the server
void SendInferRequest(bool delayed = false);

@@ -192,6 +202,8 @@

const uint32_t id_{0};
const size_t thread_id_{0};
+ bool exiting_{false};
+ bool fast_exit_{false};

size_t GetNumActiveThreads() { return num_active_threads_; }

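The Exit() comment above describes a two-flag pattern: exiting_ requests shutdown, and fast_exit_ chooses between draining and dropping outstanding work. A self-contained sketch of that drain-versus-drop distinction; the names are local to this example, and atomics are used only to keep the standalone sketch data-race-free (the PR itself relies on its existing worker synchronization).

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

struct WorkerSketch {
  std::atomic<int> outstanding{5};   // Simulated in-flight requests
  std::atomic<bool> exiting{false};
  std::atomic<bool> fast_exit{false};

  void Exit(bool fast)
  {
    fast_exit = fast;
    exiting = true;
  }

  void Run()
  {
    while (outstanding > 0) {
      if (exiting && fast_exit) {
        return;  // Drop: leave without waiting for responses
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      --outstanding;  // Simulate a response arriving
    }
  }
};

int main()
{
  WorkerSketch w;
  std::thread t(&WorkerSketch::Run, &w);
  w.Exit(true);  // Fast teardown: Run() returns with requests still outstanding
  t.join();
  std::cout << "outstanding at exit: " << w.outstanding << "\n";
}

With w.Exit(false), Run() instead drains all simulated requests before returning, which is the slow path the PR keeps for the shared-memory case.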
1 change: 1 addition & 0 deletions src/c++/perf_analyzer/iworker.h
@@ -33,6 +33,7 @@ namespace triton { namespace perfanalyzer {
class IWorker {
public:
virtual void Infer() = 0;
+ virtual void Exit(bool fast_exit) = 0;
};

}} // namespace triton::perfanalyzer
17 changes: 12 additions & 5 deletions src/c++/perf_analyzer/load_manager.cc
@@ -164,8 +164,8 @@ LoadManager::LoadManager(
const std::unordered_map<std::string, cb::RequestParameter>&
request_parameters)
: async_(async), streaming_(streaming), batch_size_(batch_size),
-   max_threads_(max_threads), parser_(parser), factory_(factory),
-   using_json_data_(false)
+   max_threads_(max_threads), shared_memory_type_{shared_memory_type},
+   parser_(parser), factory_(factory), using_json_data_(false)
{
on_sequence_model_ =
((parser_->SchedulerType() == ModelParser::SEQUENCE) ||
@@ -248,9 +248,16 @@
void
LoadManager::StopWorkerThreads()
{
- early_exit = true;
- // wake up all threads
- wake_signal_.notify_all();
+ bool fast_exit = shared_memory_type_ == SharedMemoryType::NO_SHARED_MEMORY;
+
+ for (auto& worker : workers_) {
+   worker->Exit(fast_exit);
+ }
+
+ {
+   std::unique_lock<std::mutex> lock(wake_mutex_);
+   wake_signal_.notify_all();
+ }

size_t cnt = 0;
for (auto& thread : threads_) {
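The new scope around notify_all() above is the standard guard against a lost wakeup: taking the wait mutex before notifying ensures a worker either observes the exit flag in its wait predicate or is already blocked in wait() when the notification fires. A reduced sketch of that pattern, with names local to this example; the PR sets its flags before taking the lock, and the key point is that the notify happens while the mutex is held.

#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex wake_mutex;
std::condition_variable wake_signal;
bool exiting = false;  // Stands in for the per-worker exiting_ flag

void Worker()
{
  std::unique_lock<std::mutex> lock(wake_mutex);
  // Either sees exiting == true here, or is already waiting when the
  // stopper acquires the mutex, so the notification cannot be lost.
  wake_signal.wait(lock, [] { return exiting; });
}

void StopWorkerThreads()
{
  std::unique_lock<std::mutex> lock(wake_mutex);
  exiting = true;  // Flag flipped under the same mutex the waiter holds
  wake_signal.notify_all();
}

int main()
{
  std::thread t(Worker);
  StopWorkerThreads();
  t.join();
}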
1 change: 1 addition & 0 deletions src/c++/perf_analyzer/load_manager.h
@@ -140,6 +140,7 @@ class LoadManager {
size_t batch_size_;
size_t max_threads_;
bool on_sequence_model_;
+ SharedMemoryType shared_memory_type_;

std::shared_ptr<ModelParser> parser_;
std::shared_ptr<cb::ClientBackendFactory> factory_;
20 changes: 18 additions & 2 deletions src/c++/perf_analyzer/load_worker.cc
@@ -34,6 +34,22 @@

namespace triton { namespace perfanalyzer {

+ void
+ LoadWorker::Exit(bool fast_exit)
+ {
+   for (auto ctx : ctxs_) {
+     ctx->Exit(fast_exit);
+   }
+
+   fast_exit_ = fast_exit;
+   exiting_ = true;
+
+   {
+     std::lock_guard<std::mutex> lk(cb_mtx_);
+     cb_cv_.notify_all();
+   }
+ }

bool
LoadWorker::ShouldExit()
{
@@ -44,7 +60,7 @@ LoadWorker::ShouldExit()
thread_config_->num_requests_ != 0 &&
thread_stat_->num_sent_requests_ >= thread_config_->num_requests_;

- return early_exit || bad_status || done_with_request_count;
+ return exiting_ || bad_status || done_with_request_count;
}

bool
@@ -73,7 +89,7 @@ LoadWorker::CompleteOngoingSequences()
void
LoadWorker::WaitForOngoingRequests()
{
- while (GetNumOngoingRequests() != 0) {
+ while (GetNumOngoingRequests() != 0 && !(exiting_ && fast_exit_)) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
9 changes: 9 additions & 0 deletions src/c++/perf_analyzer/load_worker.h
@@ -69,6 +69,12 @@ class LoadWorker : public IWorker {

virtual ~LoadWorker() = default;

+ // Tell the worker thread to stop issuing new requests and to exit
+ // If fast_exit is true, the worker thread should exit as fast as possible. If
+ // it is false, it should still wait for all outstanding requests before
+ // exiting
+ virtual void Exit(bool fast_exit) override;

protected:
// Return the total number of async requests that have started and not
// finished
@@ -117,6 +123,9 @@

void AsyncCallbackFinalize(uint32_t ctx_id);

+ bool exiting_ = false;
+ bool fast_exit_ = false;

uint32_t id_;

std::vector<std::shared_ptr<InferContext>> ctxs_;
7 changes: 5 additions & 2 deletions src/c++/perf_analyzer/request_rate_worker.cc
@@ -46,6 +46,9 @@ RequestRateWorker::Infer()
HandleExecuteOff();

bool is_delayed = SleepIfNecessary();
+ if (HandleExitConditions()) {
+   return;
+ }
uint32_t ctx_id = GetCtxId();
SendInferRequest(ctx_id, is_delayed);
RestoreFreeCtxId(ctx_id);
@@ -119,7 +122,7 @@ RequestRateWorker::HandleExecuteOff()
// Wait if no request should be sent and it is not exiting
thread_config_->is_paused_ = true;
std::unique_lock<std::mutex> lock(wake_mutex_);
- wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
+ wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });
}

thread_config_->is_paused_ = false;
@@ -155,7 +158,7 @@ RequestRateWorker::WaitForFreeCtx()
std::unique_lock<std::mutex> lk(cb_mtx_);
thread_stat_->idle_timer.Start();
cb_cv_.wait(lk, [this] {
- if (notified_) {
+ if (notified_ || exiting_) {
notified_ = false;
return true;
}
4 changes: 2 additions & 2 deletions src/c++/perf_analyzer/test_concurrency_manager.cc
@@ -474,7 +474,7 @@ TEST_CASE("concurrency_free_ctx_ids")

std::this_thread::sleep_for(std::chrono::milliseconds(15));

- early_exit = true;
+ worker->Exit(false);
infer_future.get();

// The first sequence should only be called two times, once at the very start,
@@ -590,7 +590,7 @@ TEST_CASE("Concurrency - shared memory infer input calls")

std::this_thread::sleep_for(std::chrono::milliseconds(18));

- early_exit = true;
+ worker->Exit(false);
infer_future.get();

const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls};
6 changes: 0 additions & 6 deletions src/c++/perf_analyzer/test_load_manager_base.h
@@ -66,12 +66,6 @@ class TestLoadManagerBase {
is_sequence_model, is_decoupled_model);
}

- ~TestLoadManagerBase()
- {
-   // Reset early_exit in case any test sets it to true during execution.
-   early_exit = false;
- }

// Helper function to process custom json data in testing
// Creates a model tensor to pass to a mock parser which is consumed by the
// mock data loader
8 changes: 3 additions & 5 deletions src/c++/perf_analyzer/test_request_rate_manager.cc
@@ -149,7 +149,6 @@ class TestRequestRateManager : public TestLoadManagerBase,
REQUIRE(timestamp.count() == expected_current_timestamp.count());
}
}
- early_exit = true;
}

void TestCreateSchedule(
@@ -168,7 +167,6 @@
total_num_seqs += w->thread_config_->num_sequences_;
worker_schedule_sizes.push_back(w->schedule_->intervals.size());
}
- early_exit = true;

CHECK(num_of_sequences_ == total_num_seqs);
for (int i = 0; i < worker_schedule_sizes.size() - 1; i++) {
@@ -977,7 +975,7 @@ TEST_CASE("request_rate_streaming: test that streaming-specific logic works")
std::dynamic_pointer_cast<IScheduler>(worker)->SetSchedule(schedule);
std::future<void> infer_future{std::async(&IWorker::Infer, worker)};

- early_exit = true;
+ worker->Exit(false);
infer_future.get();

CHECK(
@@ -1827,7 +1825,7 @@ TEST_CASE("Request rate - Shared memory infer input calls")

std::this_thread::sleep_for(milliseconds(18));

- early_exit = true;
+ worker->Exit(false);
infer_future.get();

const auto& actual_append_raw_calls{trrm.stats_->num_append_raw_calls};
@@ -2184,7 +2182,7 @@ TEST_CASE("request rate create schedule")
params.max_trials = 10;
bool is_sequence_model = false;
bool is_decoupled = false;
- bool use_mock_infer = false;
+ bool use_mock_infer = true;
double rate = 10;
std::vector<uint32_t> expected_worker_ratio;
