diff --git a/CMakeLists.txt b/CMakeLists.txt index 3b6ea4b570a99..fb36d133dda2b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -187,6 +187,11 @@ if(VLLM_GPU_LANG STREQUAL "CUDA") "csrc/custom_all_reduce.cu") endif() +if(VLLM_GPU_LANG STREQUAL "HIP") + list(APPEND VLLM_EXT_SRC + "csrc/custom_all_reduce.cu") +endif() + define_gpu_extension_target( _C DESTINATION vllm diff --git a/csrc/custom_all_reduce.cu b/csrc/custom_all_reduce.cu index 3906dcfc80dbf..b160e11b1a48a 100644 --- a/csrc/custom_all_reduce.cu +++ b/csrc/custom_all_reduce.cu @@ -84,13 +84,20 @@ void _all_reduce(fptr_t _fa, torch::Tensor &inp, torch::Tensor &out, out.numel()); break; } -#if (__CUDA_ARCH__ >= 800 || !defined(__CUDA_ARCH__)) +#if (__CUDA_ARCH__ >= 800 || !defined(__CUDA_ARCH__)) && !defined USE_ROCM case at::ScalarType::BFloat16: { fa->allreduce( stream, reinterpret_cast(inp.data_ptr()), reinterpret_cast(out.data_ptr()), out.numel()); break; } +#elif defined USE_ROCM + case at::ScalarType::BFloat16: { + fa->allreduce<__hip_bfloat16>( + stream, reinterpret_cast<__hip_bfloat16 *>(inp.data_ptr()), + reinterpret_cast<__hip_bfloat16 *>(out.data_ptr()), out.numel()); + break; + } #endif default: throw std::runtime_error( diff --git a/csrc/custom_all_reduce.cuh b/csrc/custom_all_reduce.cuh index 750e68d42f6c6..0308b4d4afba0 100644 --- a/csrc/custom_all_reduce.cuh +++ b/csrc/custom_all_reduce.cuh @@ -1,7 +1,11 @@ #pragma once #include +#ifndef USE_ROCM #include +#else +#include +#endif #include #include @@ -31,9 +35,17 @@ struct Signal { alignas(128) uint32_t end[kMaxBlocks][8]; }; +#ifndef USE_ROCM struct __align__(16) RankData { const void *__restrict__ ptrs[8]; }; +#else +struct __align__(16) RankData { const void * ptrs[8]; }; +#endif +#ifndef USE_ROCM struct __align__(16) RankSignals { volatile Signal *signals[8]; }; +#else +struct __align__(16) RankSignals { Signal *signals[8]; }; +#endif // like std::array, but aligned template @@ -74,6 +86,7 @@ DINLINE half &assign_add(half &a, half b) { } DINLINE float &assign_add(float &a, float b) { return a += b; } +#ifndef USE_ROCM #if (__CUDA_ARCH__ >= 800 || !defined(__CUDA_ARCH__)) DINLINE float upcast_s(nv_bfloat16 val) { return __bfloat162float(val); } template <> @@ -85,6 +98,17 @@ DINLINE nv_bfloat16 &assign_add(nv_bfloat16 &a, nv_bfloat16 b) { return a; } #endif +#else +DINLINE float upcast_s(__hip_bfloat16 val) { return __bfloat162float(val); } +template <> +DINLINE __hip_bfloat16 downcast_s(float val) { + return __float2bfloat16(val); +} +DINLINE __hip_bfloat16 &assign_add(__hip_bfloat16 &a, __hip_bfloat16 b) { + a = __hadd(a, b); + return a; +} +#endif template DINLINE array_t &packed_assign_add(array_t &a, array_t b) { @@ -128,16 +152,30 @@ DINLINE O downcast(array_t val) { // prior memory accesses. Note: volatile writes will not be reordered against // other volatile writes. template +#ifndef USE_ROCM DINLINE void start_sync(const RankSignals &sg, volatile Signal *self_sg, - int rank) { + int rank) { +#else +DINLINE void start_sync(const RankSignals &sg, Signal *self_sg, int rank) { +#endif if (threadIdx.x < ngpus) { // reset flag for next time +#ifndef USE_ROCM self_sg->end[blockIdx.x][threadIdx.x] = 0; // simultaneously write to the corresponding flag of all ranks. // Latency = 1 p2p write sg.signals[threadIdx.x]->start[blockIdx.x][rank] = 1; // wait until we got true from all ranks while (!self_sg->start[blockIdx.x][threadIdx.x]) +#else + __atomic_store_n(&self_sg->end[blockIdx.x][threadIdx.x], 0, __ATOMIC_RELAXED); + // simultaneously write to the corresponding flag of all ranks. + // Latency = 1 p2p write + __atomic_store_n(&sg.signals[threadIdx.x]->start[blockIdx.x][rank], 1, __ATOMIC_RELAXED); + __atomic_thread_fence(__ATOMIC_ACQ_REL); + // wait until we got true from all ranks + while (!__atomic_load_n(&self_sg->start[blockIdx.x][threadIdx.x], __ATOMIC_RELAXED)) +#endif ; } __syncthreads(); @@ -147,13 +185,18 @@ DINLINE void start_sync(const RankSignals &sg, volatile Signal *self_sg, // barrier in the all reduce kernel. If it's the final synchronization barrier, // we don't need to make any visibility guarantees for prior memory accesses. template +#ifndef USE_ROCM DINLINE void end_sync(const RankSignals &sg, volatile Signal *self_sg, int rank) { +#else +DINLINE void end_sync(const RankSignals &sg, Signal *self_sg, int rank) { +#endif __syncthreads(); // eliminate the case that prior writes are not visible after signals become // visible. Note that I did not managed to make this happen through a lot of // testing. Might be the case that hardware provides stronger guarantee than // the memory model. +#ifndef USE_ROCM if constexpr (!final_sync) __threadfence_system(); if (threadIdx.x < ngpus) { // reset flag for next time @@ -164,6 +207,18 @@ DINLINE void end_sync(const RankSignals &sg, volatile Signal *self_sg, // wait until we got true from all ranks while (!self_sg->end[blockIdx.x][threadIdx.x]) ; +#else + if (threadIdx.x < ngpus) { + // reset flag for next time + __atomic_store_n(&self_sg->start[blockIdx.x][threadIdx.x], 0, __ATOMIC_RELAXED); + // simultaneously write to the corresponding flag of all ranks. + // Latency = 1 p2p write + __atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], 1, __ATOMIC_RELAXED); + __atomic_thread_fence(__ATOMIC_ACQ_REL); + // wait until we got true from all ranks + while (!__atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x], __ATOMIC_RELAXED)) + ; +#endif } if constexpr (!final_sync) __syncthreads(); } @@ -179,10 +234,16 @@ DINLINE P packed_reduce(const P *ptrs[], int idx) { } template +#ifndef USE_ROCM __global__ void __launch_bounds__(512, 1) cross_device_reduce_1stage(RankData *_dp, RankSignals sg, volatile Signal *self_sg, T *__restrict__ result, int rank, int size) { +#else +__global__ void __launch_bounds__(1024, 1) + cross_device_reduce_1stage(RankData *_dp, RankSignals sg, + Signal *self_sg, T *__restrict__ result, int rank, int size) { +#endif using P = typename packed_t::P; using A = typename packed_t::A; // note: we don't reorder the address so the accumulation order is the same @@ -199,15 +260,26 @@ __global__ void __launch_bounds__(512, 1) } template +#ifndef USE_ROCM DINLINE P *get_tmp_buf(volatile Signal *sg) { +#else +DINLINE P *get_tmp_buf(Signal *sg) { +#endif return (P *)(((Signal *)sg) + 1); } template +#ifndef USE_ROCM __global__ void __launch_bounds__(512, 1) cross_device_reduce_2stage(RankData *_dp, RankSignals sg, volatile Signal *self_sg, T *__restrict__ result, int rank, int size) { +#else +__global__ void __launch_bounds__(1024, 1) + cross_device_reduce_2stage(RankData *_dp, RankSignals sg, + Signal *self_sg, T *__restrict__ result, + int rank, int size) { +#endif int tid = blockIdx.x * blockDim.x + threadIdx.x; int stride = gridDim.x * blockDim.x; using P = typename packed_t::P; @@ -327,8 +399,12 @@ class CustomAllreduce { // note: must share the base address of each allocation, or we get wrong // address if (cuPointerGetAttribute(&base_ptr, +#ifndef USE_ROCM CU_POINTER_ATTRIBUTE_RANGE_START_ADDR, - (CUdeviceptr)ptr) != CUDA_SUCCESS) +#else + HIP_POINTER_ATTRIBUTE_RANGE_START_ADDR, +#endif + (CUdeviceptr)ptr) != CUDA_SUCCESS) throw std::runtime_error("failed to get pointer attr"); CUDACHECK(cudaIpcGetMemHandle( (cudaIpcMemHandle_t *)&handles[i * handle_sz], base_ptr)); @@ -406,7 +482,11 @@ class CustomAllreduce { */ template void allreduce(cudaStream_t stream, T *input, T *output, int size, +#ifndef USE_ROCM int threads = 512, int block_limit = 36) { +#else + int threads = 1024, int block_limit = 36) { +#endif auto d = packed_t::P::size; if (size % d != 0) throw std::runtime_error( diff --git a/csrc/custom_all_reduce_test.cu b/csrc/custom_all_reduce_test.cu index c34a50389c21c..bf0d4a8433e79 100644 --- a/csrc/custom_all_reduce_test.cu +++ b/csrc/custom_all_reduce_test.cu @@ -4,6 +4,10 @@ * export MPI_HOME=XXX * nvcc -O2 -arch=native -std=c++17 custom_all_reduce_test.cu -o * custom_all_reduce_test -lnccl -I${MPI_HOME}/include -lmpi + * to hipify and compile + * export MPI_HOME=XXX + * hipify-perl custom_all_reduce_test.cu > custom_all_reduce_test.hip + * hipcc -O2 -std=c++17 custom_all_reduce_test.hip -o custom_all_reduce_test -lrccl -I${MPI_HOME}/include -L${MPI_HOME}/lib -lmpi -DUSE_ROCM=1 * * Warning: this C++ test is not designed to be very readable and was used * during the rapid prototyping process. @@ -12,7 +16,11 @@ * mpirun -np 8 ./custom_all_reduce_test */ #include +#ifndef USE_ROCM #include +#else +#include +#endif #include #include @@ -20,9 +28,17 @@ #include #include "cuda_profiler_api.h" +#ifndef USE_ROCM #include "custom_all_reduce.cuh" +#else +#include "custom_all_reduce_hip.cuh" +#endif #include "mpi.h" +#ifndef USE_ROCM #include "nccl.h" +#else +#include +#endif #define MPICHECK(cmd) \ do { \ @@ -44,7 +60,12 @@ } while (0) __global__ void dummy_kernel() { +#ifndef USE_ROCM for (int i = 0; i < 100; i++) __nanosleep(1000000); // 100ms +#else + #pragma unroll + for (int i = 0; i < 100; i++) __builtin_amdgcn_s_sleep(127); +#endif } template @@ -164,7 +185,11 @@ void run(int myRank, int nRanks, ncclComm_t &comm, int threads, int block_limit, ncclDataType_t ncclDtype; if (std::is_same::value) { ncclDtype = ncclFloat16; +#ifndef USE_ROCM } else if (std::is_same::value) { +#else + } else if (std::is_same::value) { +#endif ncclDtype = ncclBfloat16; } else { ncclDtype = ncclFloat; @@ -308,9 +333,14 @@ int main(int argc, char **argv) { // } // } for (int sz = 512; sz <= (8 << 20); sz *= 2) { +#ifndef USE_ROCM run(myRank, nRanks, comm, 512, 36, sz + 8 * 47, performance_test); +#else + run(myRank, nRanks, comm, 1024, 16, sz + 8 * 47, performance_test); +#endif } cudaProfilerStop(); + MPICHECK(MPI_Finalize()); return EXIT_SUCCESS; } diff --git a/csrc/ops.h b/csrc/ops.h index 3acbc1b3f8363..8fc06d104d521 100644 --- a/csrc/ops.h +++ b/csrc/ops.h @@ -139,7 +139,6 @@ void moe_align_block_size( torch::Tensor experts_ids, torch::Tensor num_tokens_post_pad); -#ifndef USE_ROCM using fptr_t = uint64_t; fptr_t init_custom_ar(torch::Tensor &meta, torch::Tensor &rank_data, const std::vector &handles, @@ -158,7 +157,6 @@ void register_buffer(fptr_t _fa, torch::Tensor &t, std::pair, std::vector> get_graph_buffer_ipc_meta(fptr_t _fa); void register_graph_buffers(fptr_t _fa, const std::vector &handles, const std::vector> &offsets); -#endif void convert_fp8( torch::Tensor& src_cache, @@ -180,4 +178,4 @@ torch::Tensor fp8_gemm_16( torch::Tensor& scaleA, torch::Tensor& scaleB, int algo_idx -); \ No newline at end of file +); diff --git a/csrc/pybind.cpp b/csrc/pybind.cpp index d533b6fcca498..f23adf230fe03 100644 --- a/csrc/pybind.cpp +++ b/csrc/pybind.cpp @@ -112,7 +112,6 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { &get_max_shared_memory_per_block_device_attribute, "Gets the maximum shared memory per block device attribute."); -#ifndef USE_ROCM // Custom all-reduce kernels pybind11::module custom_ar = m.def_submodule("custom_ar", "custom allreduce"); custom_ar.def("init_custom_ar", &init_custom_ar, "init_custom_ar"); @@ -126,6 +125,5 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { "get_graph_buffer_ipc_meta"); custom_ar.def("register_graph_buffers", ®ister_graph_buffers, "register_graph_buffers"); -#endif } diff --git a/vllm/config.py b/vllm/config.py index 5496c892c1638..c3c1ff2d261b6 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -526,10 +526,9 @@ def _verify_args(self) -> None: "Pipeline parallelism is not supported yet.") if not self.disable_custom_all_reduce and self.world_size > 1: if is_hip(): - self.disable_custom_all_reduce = True + self.disable_custom_all_reduce = False logger.info( - "Disabled the custom all-reduce kernel because it is not " - "supported on AMD GPUs.") + "Enable the custom all-reduce kernel on AMD GPUs.") elif self.pipeline_parallel_size > 1: self.disable_custom_all_reduce = True logger.info( diff --git a/vllm/model_executor/parallel_utils/custom_all_reduce.py b/vllm/model_executor/parallel_utils/custom_all_reduce.py index bf8ee07070c8a..214dc2a6bbe6f 100644 --- a/vllm/model_executor/parallel_utils/custom_all_reduce.py +++ b/vllm/model_executor/parallel_utils/custom_all_reduce.py @@ -9,12 +9,14 @@ get_tensor_model_parallel_rank, get_tensor_model_parallel_world_size) try: - import pynvml - from vllm._C import custom_ar except ImportError: - # For AMD GPUs custom_ar = None + +try: + import pynvml +except ImportError: + # For AMD GPUs pynvml = None logger = init_logger(__name__)