Skip to content

Commit

Permalink
[Kernel] Enable custom AR on ROCm (#27)
Browse files Browse the repository at this point in the history
* [Kernel] Enable custome AR on ROCm

* Install amdsmi in Docker in preparation for custom all reduce

(cherry picked from commit f6cfb9bf31e9feeefbdedecf2165f80dd0564b75)

* Fix for yapf

* Linting and small fixes to vLLM syntax

(cherry picked from commit 2cf8103bfb0afce59b28a06c5bbe905983c42728)

---------

Co-authored-by: Matthew Wong <[email protected]>
  • Loading branch information
wenkaidu and mawong-amd authored Jun 24, 2024
1 parent c455e9c commit fa78403
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 44 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ if(VLLM_GPU_LANG STREQUAL "CUDA")

endif()

if(VLLM_GPU_LANG STREQUAL "HIP")
list(APPEND VLLM_EXT_SRC
"csrc/custom_all_reduce.cu")
endif()

define_gpu_extension_target(
_C
DESTINATION vllm
Expand Down
13 changes: 12 additions & 1 deletion Dockerfile.rocm
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ COPY --from=build_triton ${COMMON_WORKDIR}/triton/python/dist/*.whl /
FROM scratch AS export_triton_0
FROM export_triton_${BUILD_TRITON} AS export_triton

# AMD-SMI build stages
FROM base AS build_amdsmi
RUN cd /opt/rocm/share/amd_smi \
&& pip wheel . --wheel-dir=dist
FROM scratch AS export_amdsmi
COPY --from=build_amdsmi /opt/rocm/share/amd_smi/dist/*.whl /

# -----------------------
# vLLM (and gradlib) fetch stages
FROM base AS fetch_vllm_0
Expand Down Expand Up @@ -201,7 +208,10 @@ RUN --mount=type=bind,from=export_triton,src=/,target=/install \
pip install /install/*.whl; \
fi

RUN python3 -m pip install --upgrade numba
RUN --mount=type=bind,from=export_amdsmi,src=/,target=/install \
pip install /install/*.whl;

RUN python3 -m pip install --upgrade numba scipy huggingface-hub[cli]

# Install vLLM (and gradlib)
# Make sure punica kernels are built (for LoRA)
Expand All @@ -221,6 +231,7 @@ RUN --mount=type=bind,from=export_vllm,src=/,target=/install \
COPY --from=export_vllm /benchmarks ${COMMON_WORKDIR}/vllm/benchmarks

ENV RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES=1
ENV TOKENIZERS_PARALLELISM=false

# Performance environment variable.
ENV HIP_FORCE_DEV_KERNARG=1
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/benchmark_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def main(args: argparse.Namespace):
enable_chunked_prefill=args.enable_chunked_prefill,
download_dir=args.download_dir,
block_size=args.block_size,
disable_custom_all_reduce=args.disable_custom_all_reduce,
gpu_memory_utilization=args.gpu_memory_utilization)

sampling_params = SamplingParams(
Expand Down Expand Up @@ -229,6 +230,7 @@ def run_to_completion(profile_dir: Optional[str] = None):
type=str,
default=None,
help='Path to save the latency results in JSON format.')
parser.add_argument('--disable_custom_all_reduce', action='store_true')
parser.add_argument('--gpu-memory-utilization',
type=float,
default=0.9,
Expand Down
39 changes: 39 additions & 0 deletions csrc/custom_all_reduce.cu
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,42 @@ void register_graph_buffers(fptr_t _fa, const std::vector<std::string>& handles,
auto fa = reinterpret_cast<vllm::CustomAllreduce*>(_fa);
fa->register_graph_buffers(handles, offsets);
}

#ifdef USE_ROCM

void free_meta_buffer(void* buffer) { hipFree(buffer); }

std::vector<uint8_t> get_meta_buffer_ipc_handle(torch::Tensor inp) {
std::vector<uint8_t> data_handle(sizeof(cudaIpcMemHandle_t), 0);
CUDACHECK(cudaIpcGetMemHandle((cudaIpcMemHandle_t*)data_handle.data(),
inp.data_ptr()));
return data_handle;
}

torch::Tensor allocate_meta_buffer(int size) {
auto device_index = c10::cuda::current_device();
at::DeviceGuard device_guard(at::Device(at::DeviceType::CUDA, device_index));
void* buffer;
cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
auto stream = c10::cuda::getCurrentCUDAStream().stream();
AT_CUDA_CHECK(cudaThreadExchangeStreamCaptureMode(&mode));
AT_CUDA_CHECK(
hipExtMallocWithFlags((void**)&buffer, size, hipDeviceMallocUncached));
AT_CUDA_CHECK(cudaMemsetAsync(buffer, 0, size, stream));
AT_CUDA_CHECK(cudaStreamSynchronize(stream));
AT_CUDA_CHECK(cudaThreadExchangeStreamCaptureMode(&mode));
auto options = torch::TensorOptions()
.dtype(torch::kI8)
.device(torch::kCUDA, device_index);
return torch::from_blob(buffer, {size}, free_meta_buffer, options);
}

std::vector<uint8_t> get_device_bdf(int dev) {
char busIdStr[] = "0000:00:00.0";
std::vector<uint8_t> bdf(sizeof(busIdStr), 0);
CUDACHECK(cudaDeviceGetPCIBusId((char*)bdf.data(), sizeof(busIdStr), dev));
bdf.resize(bdf.size() - 1); // remove trailing NULL
return bdf;
}

#endif
54 changes: 53 additions & 1 deletion csrc/custom_all_reduce.cuh
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
#pragma once

#include <cuda.h>
#include <cuda_bf16.h>
#ifdef USE_ROCM
#include <hip/hip_bf16.h>
typedef __hip_bfloat16 nv_bfloat16;
#else
#include <cuda_bf16.h>
#endif
#include <cuda_fp16.h>
#include <cuda_runtime.h>

Expand Down Expand Up @@ -29,9 +34,14 @@ constexpr int kMaxBlocks = 64;
struct Signal {
alignas(128) uint32_t start[kMaxBlocks][8];
alignas(128) uint32_t end[kMaxBlocks][8];
alignas(128) uint32_t _flag[kMaxBlocks]; // incremental flags for each rank
};

#ifdef USE_ROCM
struct __align__(16) RankData { const void* ptrs[8]; };
#else
struct __align__(16) RankData { const void* __restrict__ ptrs[8]; };
#endif

struct __align__(16) RankSignals { volatile Signal* signals[8]; };

Expand Down Expand Up @@ -130,6 +140,21 @@ DINLINE O downcast(array_t<float, O::size> val) {
template <int ngpus>
DINLINE void start_sync(const RankSignals& sg, volatile Signal* self_sg,
int rank) {
#ifdef USE_ROCM
uint32_t flag = self_sg->_flag[blockIdx.x] + 1;
if (threadIdx.x < ngpus) {
// simultaneously write to the corresponding flag of all ranks.
// Latency = 1 p2p write
__atomic_store_n(&sg.signals[threadIdx.x]->start[blockIdx.x][rank], flag,
__ATOMIC_RELAXED);
// wait until we got true from all ranks
while (__atomic_load_n(&self_sg->start[blockIdx.x][threadIdx.x],
__ATOMIC_RELAXED) < flag);
}
__syncthreads();
// use one thread to update flag
if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag;
#else
if (threadIdx.x < ngpus) {
// reset flag for next time
self_sg->end[blockIdx.x][threadIdx.x] = 0;
Expand All @@ -140,6 +165,7 @@ DINLINE void start_sync(const RankSignals& sg, volatile Signal* self_sg,
while (!self_sg->start[blockIdx.x][threadIdx.x]);
}
__syncthreads();
#endif
}

// This function is meant to be used as the second or the final synchronization
Expand All @@ -148,6 +174,27 @@ DINLINE void start_sync(const RankSignals& sg, volatile Signal* self_sg,
template <int ngpus, bool final_sync = false>
DINLINE void end_sync(const RankSignals& sg, volatile Signal* self_sg,
int rank) {
#ifdef USE_ROCM
__syncthreads();
// eliminate the case that prior writes are not visible after signals become
// visible. Note that I did not managed to make this happen through a lot of
// testing. Might be the case that hardware provides stronger guarantee than
// the memory model.
uint32_t flag = self_sg->_flag[blockIdx.x] + 1;
if (threadIdx.x < ngpus) {
// simultaneously write to the corresponding flag of all ranks.
// Latency = 1 p2p write
__atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], flag,
final_sync ? __ATOMIC_RELAXED : __ATOMIC_RELEASE);
// wait until we got true from all ranks
while (__atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x],
final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE) <
flag);
}
__syncthreads();
// use one thread to update flag
if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag;
#else
__syncthreads();
// eliminate the case that prior writes are not visible after signals become
// visible. Note that I did not managed to make this happen through a lot of
Expand All @@ -164,6 +211,7 @@ DINLINE void end_sync(const RankSignals& sg, volatile Signal* self_sg,
while (!self_sg->end[blockIdx.x][threadIdx.x]);
}
if constexpr (!final_sync) __syncthreads();
#endif
}

template <typename P, int ngpus, typename A>
Expand Down Expand Up @@ -324,7 +372,11 @@ class CustomAllreduce {
// note: must share the base address of each allocation, or we get wrong
// address
if (cuPointerGetAttribute(&base_ptr,
#ifdef USE_ROCM
HIP_POINTER_ATTRIBUTE_RANGE_START_ADDR,
#else
CU_POINTER_ATTRIBUTE_RANGE_START_ADDR,
#endif
(CUdeviceptr)ptr) != CUDA_SUCCESS)
throw std::runtime_error("failed to get pointer attr");
CUDACHECK(cudaIpcGetMemHandle(
Expand Down
27 changes: 25 additions & 2 deletions csrc/custom_all_reduce_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@
#include <vector>

#include "cuda_profiler_api.h"
#include "custom_all_reduce.cuh"
#include "mpi.h"
#include "nccl.h"
#ifdef USE_ROCM
#include <hip/hip_bf16.h>
typedef __hip_bfloat16 nv_bfloat16;
#include "rccl/rccl.h"
#include "custom_all_reduce_hip.cuh"
#else
#include "nccl.h"
#include "custom_all_reduce.cuh"
#endif

#define MPICHECK(cmd) \
do { \
Expand All @@ -44,7 +51,17 @@
} while (0)

__global__ void dummy_kernel() {
#ifdef USE_ROCM
for (int i = 0; i < 100; i++) {
uint64_t start = wall_clock64();
uint64_t cycles_elapsed;
do {
cycles_elapsed = wall_clock64() - start;
} while (cycles_elapsed < 100);
}
#else
for (int i = 0; i < 100; i++) __nanosleep(1000000); // 100ms
#endif
}

template <typename T>
Expand Down Expand Up @@ -114,8 +131,14 @@ void run(int myRank, int nRanks, ncclComm_t& comm, int threads, int block_limit,
* registration, they are allocated and registered together in the test for
* convenience.
*/
#ifdef USE_ROCM
CUDACHECK(hipExtMallocWithFlags(
(void**)&buffer, 2 * data_size * sizeof(T) + sizeof(vllm::Signal),
hipDeviceMallocUncached));
#else
CUDACHECK(
cudaMalloc(&buffer, 2 * data_size * sizeof(T) + sizeof(vllm::Signal)));
#endif
CUDACHECK(
cudaMemset(buffer, 0, 2 * data_size * sizeof(T) + sizeof(vllm::Signal)));
CUDACHECK(cudaMalloc(&self_data_copy, data_size * sizeof(T)));
Expand Down
5 changes: 4 additions & 1 deletion csrc/ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ void moe_align_block_size(torch::Tensor topk_ids, int num_experts,
torch::Tensor experts_ids,
torch::Tensor num_tokens_post_pad);

#ifndef USE_ROCM
using fptr_t = uint64_t;
fptr_t init_custom_ar(torch::Tensor& meta, torch::Tensor& rank_data,
const std::vector<std::string>& handles,
Expand All @@ -151,4 +150,8 @@ std::pair<std::vector<uint8_t>, std::vector<int64_t>> get_graph_buffer_ipc_meta(
fptr_t _fa);
void register_graph_buffers(fptr_t _fa, const std::vector<std::string>& handles,
const std::vector<std::vector<int64_t>>& offsets);
#ifdef USE_ROCM
torch::Tensor allocate_meta_buffer(int size);
std::vector<uint8_t> get_meta_buffer_ipc_handle(torch::Tensor inp);
std::vector<uint8_t> get_device_bdf(int dev);
#endif
7 changes: 6 additions & 1 deletion csrc/pybind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
&get_max_shared_memory_per_block_device_attribute,
"Gets the maximum shared memory per block device attribute.");

#ifndef USE_ROCM
// Custom all-reduce kernels
pybind11::module custom_ar = m.def_submodule("custom_ar", "custom allreduce");
custom_ar.def("init_custom_ar", &init_custom_ar, "init_custom_ar");
Expand All @@ -112,5 +111,11 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
"get_graph_buffer_ipc_meta");
custom_ar.def("register_graph_buffers", &register_graph_buffers,
"register_graph_buffers");
#ifdef USE_ROCM
custom_ar.def("allocate_meta_buffer", &allocate_meta_buffer,
"allocate_meta_buffer");
custom_ar.def("get_meta_buffer_ipc_handle", &get_meta_buffer_ipc_handle,
"get_meta_buffer_ipc_handle");
custom_ar.def("get_device_bdf", &get_device_bdf, "get_device_bdf");
#endif
}
17 changes: 6 additions & 11 deletions vllm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,17 +623,12 @@ def _verify_args(self) -> None:
raise ValueError(
"Unrecognized distributed executor backend. Supported values "
"are 'ray' or 'mp' or 'torchrun'.")
if not self.disable_custom_all_reduce and self.world_size > 1:
if is_hip():
self.disable_custom_all_reduce = True
logger.info(
"Disabled the custom all-reduce kernel because it is not "
"supported on AMD GPUs.")
elif self.pipeline_parallel_size > 1:
self.disable_custom_all_reduce = True
logger.info(
"Disabled the custom all-reduce kernel because it is not "
"supported with pipeline parallelism.")
if (not self.disable_custom_all_reduce and self.world_size > 1
and self.pipeline_parallel_size > 1):
self.disable_custom_all_reduce = True
logger.info(
"Disabled the custom all-reduce kernel because it is not "
"supported with pipeline parallelism.")
if self.ray_workers_use_nsight and (
not self.distributed_executor_backend == "ray"):
raise ValueError("Unable to use nsight profiling unless workers "
Expand Down
Loading

0 comments on commit fa78403

Please sign in to comment.