Fix error handling. Fix BLS tensor lifetime
krishung5 committed Sep 7, 2023
1 parent fd682b9 commit 9d02cc0
Showing 9 changed files with 264 additions and 151 deletions.
23 changes: 22 additions & 1 deletion src/memory_manager.cc
@@ -1,4 +1,4 @@
// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -59,6 +59,27 @@ GPUMemoryRecord::ReleaseCallback()
{
return release_callback_;
}

BackendMemoryRecord::BackendMemoryRecord(
std::unique_ptr<BackendMemory> backend_memory)
: backend_memory_(std::move(backend_memory))
{
release_callback_ = [](void* ptr) {
// Do nothing. The backend_memory_ will be destroyed in the destructor.
};
}

void*
BackendMemoryRecord::MemoryId()
{
return reinterpret_cast<void*>(backend_memory_->MemoryPtr());
}

const std::function<void(void*)>&
BackendMemoryRecord::ReleaseCallback()
{
return release_callback_;
}
#endif

MemoryManager::MemoryManager(
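The new BackendMemoryRecord keeps the BackendMemory alive by owning it through a std::unique_ptr and handing the memory manager a deliberately empty release callback: the buffer is freed only when the record itself is destroyed, which is what fixes the BLS tensor lifetime. Below is a minimal, self-contained sketch of that ownership pattern; `Buffer` and `OwningRecord` are illustrative stand-ins, not Triton types.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>

// Hypothetical stand-in for triton::backend::BackendMemory.
struct Buffer {
  explicit Buffer(std::size_t size) : data_(new char[size]) {}
  void* MemoryPtr() { return data_.get(); }

 private:
  std::unique_ptr<char[]> data_;
};

// Mirrors the MemoryRecord-style interface used by the memory manager.
class OwningRecord {
 public:
  explicit OwningRecord(std::unique_ptr<Buffer> buffer)
      : buffer_(std::move(buffer)), release_callback_([](void* /*ptr*/) {
          // Intentionally a no-op: the buffer is released when the record
          // (and its unique_ptr) is destroyed, not by this callback.
        })
  {
  }

  void* MemoryId() { return buffer_->MemoryPtr(); }
  const std::function<void(void*)>& ReleaseCallback()
  {
    return release_callback_;
  }

 private:
  std::unique_ptr<Buffer> buffer_;
  std::function<void(void*)> release_callback_;
};

int
main()
{
  OwningRecord record(std::make_unique<Buffer>(64));
  record.ReleaseCallback()(record.MemoryId());  // Safe: does nothing.
  std::cout << "buffer is still owned by the record" << std::endl;
}  // The Buffer is freed here, together with the record.
```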
14 changes: 13 additions & 1 deletion src/memory_manager.h
@@ -1,4 +1,4 @@
// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -33,6 +33,7 @@

#include "message_queue.h"
#include "triton/backend/backend_common.h"
#include "triton/backend/backend_memory.h"
#include "triton/core/tritonserver.h"

#ifdef TRITON_ENABLE_GPU
@@ -59,6 +60,17 @@ class GPUMemoryRecord : public MemoryRecord {
void* ptr_;
std::function<void(void*)> release_callback_;
};

class BackendMemoryRecord : public MemoryRecord {
public:
BackendMemoryRecord(std::unique_ptr<BackendMemory> backend_memory);
const std::function<void(void*)>& ReleaseCallback() override;
void* MemoryId() override;

private:
std::unique_ptr<BackendMemory> backend_memory_;
std::function<void(void*)> release_callback_;
};
#endif

/// Memory manager class is used primarily for managing the lifetime of GPU
113 changes: 62 additions & 51 deletions src/pb_memory.cc
@@ -86,13 +86,15 @@ PbMemory::Create(
{
std::unique_ptr<PbMemory> pb_memory = PbMemory::Create(
shm_pool, backend_memory->MemoryType(), backend_memory->MemoryTypeId(),
backend_memory->ByteSize(), backend_memory->MemoryPtr(), copy_gpu);
pb_memory->backend_memory_ = std::move(backend_memory);
backend_memory->ByteSize(), backend_memory->MemoryPtr(), copy_gpu,
false /* write_back_data */, false /* copy_data */);
if (!pb_memory->backend_memory_) {
pb_memory->backend_memory_ = std::move(backend_memory);
}

return pb_memory;
}


void
PbMemory::WriteBackOutput(
std::unique_ptr<SharedMemoryManager>& shm_pool, cudaStream_t cuda_stream)
@@ -241,58 +243,67 @@ PbMemory::FillShmData(
reinterpret_cast<cudaIpcMemHandle_t*>(memory_data_shm), data));
}
#ifndef TRITON_PB_STUB
// Check if the data is already in the pool by checking the base address.
CUDAHandler& cuda_api = CUDAHandler::getInstance();
CUdeviceptr cuda_pool_address = 0;
cuda_api.PointerGetAttribute(
&cuda_pool_address, CU_POINTER_ATTRIBUTE_RANGE_START_ADDR,
reinterpret_cast<CUdeviceptr>(data));
if (shm_pool->CUDAPoolAddress() ==
reinterpret_cast<void*>(cuda_pool_address)) {
use_cuda_shared_pool = true;
memory_shm_ptr->cuda_pool_offset =
data - reinterpret_cast<char*>(shm_pool->CUDAPoolAddress());
} else {
TRITONSERVER_Error* error = BackendMemory::Create(
reinterpret_cast<TRITONBACKEND_MemoryManager*>(
shm_pool->TritonMemoryManager()),
BackendMemory::AllocationType::GPU_POOL, memory_type_id, byte_size,
reinterpret_cast<BackendMemory**>(backend_memory));
if (error != nullptr) {
LOG_MESSAGE(
TRITONSERVER_LOG_WARN,
(std::string(
"Failed to allocate memory from CUDA memory pool: ") +
TRITONSERVER_ErrorMessage(error) +
". Switching back to CUDA IPC approach.")
.c_str());
} else if (copy_data) {
// Copy the data to the new buffer in the CUDA pool.
cudaMemcpyKind kind = cudaMemcpyDeviceToDevice;
cudaError_t err;
err = cudaMemcpy(
(reinterpret_cast<BackendMemory*>(*backend_memory))->MemoryPtr(),
data, byte_size, kind);
if (err != cudaSuccess) {
throw PythonBackendException(
std::string(
"failed to copy data: " +
std::string(cudaGetErrorString(err)))
.c_str());
if (shm_pool->UseCudaSharedPool()) {
// Check if the data is already in the pool by checking the base
// address.
CUDAHandler& cuda_api = CUDAHandler::getInstance();
CUdeviceptr cuda_pool_address = 0;
cuda_api.PointerGetAttribute(
&cuda_pool_address, CU_POINTER_ATTRIBUTE_RANGE_START_ADDR,
reinterpret_cast<CUdeviceptr>(data));
if (shm_pool->CUDAPoolAddress() ==
reinterpret_cast<void*>(cuda_pool_address)) {
use_cuda_shared_pool = true;
memory_shm_ptr->cuda_pool_offset =
data - reinterpret_cast<char*>(shm_pool->CUDAPoolAddress());
} else {
try {
THROW_IF_TRITON_ERROR(BackendMemory::Create(
reinterpret_cast<TRITONBACKEND_MemoryManager*>(
shm_pool->TritonMemoryManager()),
BackendMemory::AllocationType::GPU_POOL, memory_type_id,
byte_size, reinterpret_cast<BackendMemory**>(backend_memory)));

if (copy_data) {
// Copy the data to the new buffer in the CUDA pool.
cudaMemcpyKind kind = cudaMemcpyDeviceToDevice;
cudaError_t err;
err = cudaMemcpy(
(reinterpret_cast<BackendMemory*>(*backend_memory))
->MemoryPtr(),
data, byte_size, kind);
if (err != cudaSuccess) {
throw PythonBackendException(
std::string(
"failed to copy data: " +
std::string(cudaGetErrorString(err)))
.c_str());
}
err = cudaStreamSynchronize(0);
if (err != cudaSuccess) {
throw PythonBackendException(
std::string(
"failed to synchronize the default CUDA stream. "
"error: " +
std::string(cudaGetErrorString(err)))
.c_str());
}
}
use_cuda_shared_pool = true;
memory_shm_ptr->cuda_pool_offset =
(reinterpret_cast<BackendMemory*>(*backend_memory))
->MemoryPtr() -
reinterpret_cast<char*>(shm_pool->CUDAPoolAddress());
}
err = cudaStreamSynchronize(0);
if (err != cudaSuccess) {
throw PythonBackendException(
std::string(
"failed to synchronize the default CUDA stream. error: " +
std::string(cudaGetErrorString(err)))
catch (const PythonBackendException& pb_exception) {
LOG_MESSAGE(
TRITONSERVER_LOG_WARN,
(std::string(
"Failed to allocate memory from CUDA memory pool: ") +
pb_exception.what() + ". Switching back to CUDA IPC approach.")
.c_str());
}
}
use_cuda_shared_pool = true;
memory_shm_ptr->cuda_pool_offset =
(reinterpret_cast<BackendMemory*>(*backend_memory))->MemoryPtr() -
reinterpret_cast<char*>(shm_pool->CUDAPoolAddress());
}
#endif // Not TRITON_PB_STUB
}
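The reworked FillShmData first asks the CUDA driver for the base address of the allocation that backs the tensor pointer; if that base equals the shared pool's address, no copy is needed, otherwise the data is copied into a pool buffer, and any pool-allocation failure now falls back to the CUDA IPC path with a warning instead of surfacing an error. A hedged sketch of the pointer-range check is shown below; it assumes an initialized CUDA driver context, and error checking in `main` is elided for brevity.

```cpp
#include <cuda.h>

#include <cstdio>

// Returns true if `ptr` lies inside the allocation whose base address is
// `pool_base`. This is the same check FillShmData performs with
// CU_POINTER_ATTRIBUTE_RANGE_START_ADDR before deciding whether the data
// must be copied into the shared pool.
bool
InSharedPool(CUdeviceptr ptr, CUdeviceptr pool_base)
{
  CUdeviceptr range_start = 0;
  CUresult res = cuPointerGetAttribute(
      &range_start, CU_POINTER_ATTRIBUTE_RANGE_START_ADDR, ptr);
  if (res != CUDA_SUCCESS) {
    const char* msg = nullptr;
    cuGetErrorString(res, &msg);
    std::fprintf(
        stderr, "cuPointerGetAttribute failed: %s\n", msg ? msg : "unknown");
    return false;  // Treat a failed lookup as "not in the pool".
  }
  return range_start == pool_base;
}

int
main()
{
  cuInit(0);
  CUdevice dev;
  CUcontext ctx;
  cuDeviceGet(&dev, 0);
  cuCtxCreate(&ctx, 0, dev);

  CUdeviceptr pool = 0, other = 0;
  cuMemAlloc(&pool, 1 << 20);   // Pretend this is the shared pool.
  cuMemAlloc(&other, 1 << 10);  // A buffer allocated elsewhere.

  std::printf("offset into pool: %d\n", InSharedPool(pool + 128, pool));
  std::printf("separate buffer:  %d\n", InSharedPool(other, pool));

  cuMemFree(other);
  cuMemFree(pool);
  cuCtxDestroy(ctx);
}
```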
7 changes: 7 additions & 0 deletions src/pb_memory.h
@@ -138,6 +138,13 @@ class PbMemory {

~PbMemory();

#ifndef TRITON_PB_STUB
std::unique_ptr<BackendMemory> GetBackendMemory()
{
return std::move(backend_memory_);
};
#endif

private:
AllocatedSharedMemory<char> memory_shm_;
MemoryShm* memory_shm_ptr_;
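The new GetBackendMemory() accessor moves the owned BackendMemory out of PbMemory, so the caller (here, the BackendMemoryRecord built in python_be.cc) becomes the sole owner and PbMemory's internal pointer is left null. The snippet below is not Triton code, just the unique_ptr move-out semantics the accessor relies on.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct Holder {
  // Moving the member out leaves the holder's pointer null, which is why
  // ownership must be taken exactly once.
  std::unique_ptr<int> Take() { return std::move(value_); }

  std::unique_ptr<int> value_ = std::make_unique<int>(42);
};

int
main()
{
  Holder h;
  std::unique_ptr<int> owned = h.Take();
  assert(owned != nullptr);
  assert(h.value_ == nullptr);  // Ownership has left the holder.
}
```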
9 changes: 2 additions & 7 deletions src/pb_stub.cc
@@ -375,6 +375,7 @@ Stub::RunCommand()
AllocatedSharedMemory<InitializeResponseShm> response =
shm_pool_->Construct<InitializeResponseShm>();

ScopedDefer finalize([this] { stub_message_queue_->Pop(); });
ScopedDefer _([this, &response_msg] { SendIPCMessage(response_msg); });

response.data_->response_has_error = false;
@@ -387,12 +388,10 @@
catch (const PythonBackendException& pb_exception) {
has_exception = true;
error_string = pb_exception.what();
shm_pool_->SetCUDAPoolAddress(nullptr);
}

if (has_exception) {
// Do not delete the region. The region will be deleted by the parent
// process.
shm_pool_->SetDeleteRegion(false);
LOG_INFO
<< "Failed to initialize CUDA shared memory pool in Python stub: "
<< error_string;
@@ -405,8 +404,6 @@
response.data_->response_is_error_set = true;
response.data_->response_error = error_string_shm->ShmHandle();
}

return true; // Terminate the stub process.
}
} break;
default:
@@ -1297,8 +1294,6 @@ Stub::GetCUDAMemoryPoolAddress(bi::managed_external_buffer::handle_t handle)
cuda_api.OpenCudaHandle(
device_id_, &cuda_handle_shm_ptr->cuda_handle, &cuda_pool_address);
shm_pool_->SetCUDAPoolAddress(cuda_pool_address);
#else
return nullptr;
#endif
}

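The added `ScopedDefer finalize(...)` relies on scope-guard ordering: guards run when the enclosing scope unwinds, in reverse declaration order, so the IPC response is sent before the stub message queue is popped, and both happen even if handling the command throws. A standalone sketch of that pattern with a hypothetical `Defer` class (not Triton's ScopedDefer) follows.

```cpp
#include <functional>
#include <iostream>
#include <utility>

// Minimal scope guard: the callable runs when the guard is destroyed.
// Guards declared in the same scope run in reverse declaration order.
class Defer {
 public:
  explicit Defer(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~Defer() { fn_(); }
  Defer(const Defer&) = delete;
  Defer& operator=(const Defer&) = delete;

 private:
  std::function<void()> fn_;
};

int
main()
{
  Defer finalize([] { std::cout << "pop message queue" << std::endl; });
  Defer respond([] { std::cout << "send IPC response" << std::endl; });
  std::cout << "handle command" << std::endl;
}  // Prints "send IPC response" first, then "pop message queue".
```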
94 changes: 76 additions & 18 deletions src/python_be.cc
@@ -408,7 +408,17 @@ ModelInstanceState::LaunchStubProcess()
StartMonitor();
RETURN_IF_ERROR(Stub()->Launch());
#ifdef TRITON_ENABLE_GPU
Stub()->ShareCUDAMemoryPool(Model()->TritonMemoryManager());
try {
Stub()->ShareCUDAMemoryPool(Model()->TritonMemoryManager());
}
catch (const PythonBackendException& ex) {
LOG_MESSAGE(
TRITONSERVER_LOG_WARN,
(std::string("Failed to utlize CUDA memory pool: ") + ex.what() +
". Switching back to use CUDA IPC.")
.c_str());
}

#endif // TRITON_ENABLE_GPU

thread_pool_ = std::make_unique<boost::asio::thread_pool>(
@@ -568,10 +578,34 @@
Stub()->ShmPool(), true /* copy_gpu */));
}
} else {
// Try to use the CUDA shared memory pool first.
void* dev_ptr;
RETURN_IF_CUDA_ERROR(
cudaMalloc(&dev_ptr, input_byte_size), TRITONSERVER_ERROR_INTERNAL,
std::string("Failed to allocate CUDA memory"));
BackendMemory* backend_memory;
std::unique_ptr<BackendMemory> lbackend_memory;
bool use_cuda_shared_pool = false;
try {
THROW_IF_TRITON_ERROR(BackendMemory::Create(
reinterpret_cast<TRITONBACKEND_MemoryManager*>(
Stub()->ShmPool()->TritonMemoryManager()),
BackendMemory::AllocationType::GPU_POOL, src_memory_type_id,
input_byte_size, &backend_memory));

dev_ptr = backend_memory->MemoryPtr();
lbackend_memory.reset(backend_memory);
use_cuda_shared_pool = true;
}
catch (const PythonBackendException& pb_exception) {
LOG_MESSAGE(
TRITONSERVER_LOG_WARN,
(std::string("Failed to allocate memory from CUDA memory pool: ") +
pb_exception.what() +
". Using cudaMalloc to allocate memory for input tensor.")
.c_str());

RETURN_IF_CUDA_ERROR(
cudaMalloc(&dev_ptr, input_byte_size), TRITONSERVER_ERROR_INTERNAL,
std::string("Failed to allocate CUDA memory"));
}

size_t byte_size = input_byte_size;

@@ -594,13 +628,24 @@
const_cast<void*>(dev_ptr), input_byte_size,
nullptr /* DLManagedTensor */);

if (use_cuda_shared_pool) {
input_tensor->SetMemory(std::move(
PbMemory::Create(Stub()->ShmPool(), std::move(lbackend_memory))));
}

RETURN_IF_EXCEPTION(input_tensor->SaveToSharedMemory(
Stub()->ShmPool(), true /* copy_gpu */));

std::unique_ptr<MemoryRecord> gpu_memory_record =
std::make_unique<GPUMemoryRecord>(input_tensor->Memory()->DataPtr());
std::unique_ptr<MemoryRecord> memory_record;
if (use_cuda_shared_pool) {
memory_record = std::make_unique<BackendMemoryRecord>(
input_tensor->Memory()->GetBackendMemory());
} else {
memory_record = std::make_unique<GPUMemoryRecord>(
input_tensor->Memory()->DataPtr());
}
uint64_t memory_release_id =
Stub()->GetMemoryManager()->AddRecord(std::move(gpu_memory_record));
Stub()->GetMemoryManager()->AddRecord(std::move(memory_record));
input_tensor->Memory()->SetMemoryReleaseId(memory_release_id);
}
#else
@@ -1609,14 +1654,22 @@
{
(*infer_response)->SaveToSharedMemory(Stub()->ShmPool());
for (auto& output_tensor : (*infer_response)->OutputTensors()) {
// For GPU tensors we need to store the memory release id in
// memory manager.
if (!output_tensor->IsCPU()) {
#ifdef TRITON_ENABLE_GPU
std::unique_ptr<MemoryRecord> gpu_memory_record =
std::make_unique<GPUMemoryRecord>(output_tensor->Memory()->DataPtr());
std::unique_ptr<MemoryRecord> memory_record;
if (output_tensor->Memory()->UseCudaSharedPool()) {
// Need to transfer the ownership of the BackendMemory to the
// MemoryManager so that the lifetime of the BackendMemory is managed.
memory_record = std::make_unique<BackendMemoryRecord>(
output_tensor->Memory()->GetBackendMemory());
} else {
// For GPU tensors we need to store the memory release id in
// memory manager.
memory_record = std::make_unique<GPUMemoryRecord>(
output_tensor->Memory()->DataPtr());
}
uint64_t memory_release_id =
Stub()->GetMemoryManager()->AddRecord(std::move(gpu_memory_record));
Stub()->GetMemoryManager()->AddRecord(std::move(memory_record));
output_tensor->Memory()->SetMemoryReleaseId(memory_release_id);
#endif
}
@@ -2193,12 +2246,17 @@
"Failed to restart the stub process: failed to launch "
"the stub process.");
#ifdef TRITON_ENABLE_GPU
err = instance_state->Stub()->ShareCUDAMemoryPool(
instance_state->Model()->TritonMemoryManager());
LOG_IF_ERROR(
err,
"Failed to restart the stub process: failed to share "
"CUDA memory pool.");
try {
instance_state->Stub()->ShareCUDAMemoryPool(
instance_state->Model()->TritonMemoryManager());
}
catch (const PythonBackendException& pb_exception) {
LOG_MESSAGE(
TRITONSERVER_LOG_ERROR,
(std::string("Failed to restart the stub process: ") +
pb_exception.what())
.c_str());
}
#endif // TRITON_ENABLE_GPU
}
} else {
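GetInputTensor now tries to carve the input buffer out of the CUDA shared memory pool and only falls back to cudaMalloc when the pool cannot serve the request, while PrepareResponseHandle records either a BackendMemoryRecord or a GPUMemoryRecord depending on where the memory came from. A self-contained sketch of the try-pool-then-fallback shape is below; `AllocateFromPool` is a hypothetical stand-in for BackendMemory::Create with GPU_POOL, written to always fail so the fallback path runs.

```cpp
#include <cuda_runtime_api.h>

#include <cstddef>
#include <cstdio>
#include <stdexcept>
#include <string>

// Hypothetical pool allocator standing in for BackendMemory::Create with
// AllocationType::GPU_POOL; it always throws here so the example exercises
// the cudaMalloc fallback.
void*
AllocateFromPool(std::size_t /*byte_size*/)
{
  throw std::runtime_error("CUDA memory pool exhausted");
}

// Allocate `byte_size` bytes of device memory, preferring the shared pool
// and falling back to cudaMalloc with a warning, mirroring the shape of the
// GetInputTensor change.
void*
AllocateDeviceBuffer(std::size_t byte_size, bool* from_pool)
{
  try {
    void* ptr = AllocateFromPool(byte_size);
    *from_pool = true;
    return ptr;
  }
  catch (const std::exception& ex) {
    std::fprintf(
        stderr, "pool allocation failed (%s); falling back to cudaMalloc\n",
        ex.what());
  }

  void* ptr = nullptr;
  cudaError_t err = cudaMalloc(&ptr, byte_size);
  if (err != cudaSuccess) {
    throw std::runtime_error(
        std::string("cudaMalloc failed: ") + cudaGetErrorString(err));
  }
  *from_pool = false;
  return ptr;
}

int
main()
{
  bool from_pool = false;
  void* buffer = AllocateDeviceBuffer(1 << 20, &from_pool);
  std::printf("allocated via %s\n", from_pool ? "shared pool" : "cudaMalloc");
  cudaFree(buffer);
}
```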
