tensorpipe_cuda.cpp
#include <torch/csrc/distributed/rpc/tensorpipe_agent.h>
#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>
#if defined(USE_TENSORPIPE) && !defined(USE_ROCM)
#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
C10_DIAGNOSTIC_PUSH_AND_IGNORED_IF_DEFINED("-Wdeprecated")
#include <tensorpipe/tensorpipe.h>
#include <tensorpipe/tensorpipe_cuda.h>
C10_DIAGNOSTIC_POP()
namespace torch {
namespace distributed {
namespace rpc {
namespace {
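
// Each registration below pairs a TensorPipe channel context with a priority
// and publishes it under the channel's name in TensorPipeChannelRegistry.
// (Assumed behavior, inferred from the priority constants: when several
// registered channels can serve a transfer, the agent prefers the one with
// the higher priority.)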
#if TENSORPIPE_HAS_CUDA_IPC_CHANNEL
std::unique_ptr<ChannelRegistration> makeCudaIpcChannel() {
  auto context = tensorpipe::channel::cuda_ipc::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaIpcChannelPriority});
}
// The cuda_ipc channel uses cudaMemcpy to transmit CUDA tensors across
// processes.
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_ipc, makeCudaIpcChannel);
#endif
#if TENSORPIPE_HAS_CUDA_GDR_CHANNEL
std::unique_ptr<ChannelRegistration> makeCudaGdrChannel() {
  auto context = tensorpipe::channel::cuda_gdr::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaGdrChannelPriority});
}
// The cuda_gdr channel sends CUDA memory over InfiniBand using GPUDirect RDMA.
// It directly registers the user-provided tensor with libibverbs, an operation
// which is expensive the first time, but it then caches the registration in
// order to amortize the cost and get low latency for subsequent transfers. A
// ready-to-send/ready-to-receive handshake is still needed before the transfer
// in order to ensure readiness and to agree on the device indices and thus the
// queue pair to use. It automatically pairs each GPU to the "closest" NIC if
// there are multiple of them (closest = longest prefix match in PCI tree).
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_gdr, makeCudaGdrChannel);
#endif
std::unique_ptr<ChannelRegistration> makeCudaXthChannel() {
  auto context = tensorpipe::channel::cuda_xth::create();
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaXthChannelPriority});
}
// The cuda_xth channel supports same-process GPU-to-GPU communication.
C10_REGISTER_CREATOR(TensorPipeChannelRegistry, cuda_xth, makeCudaXthChannel);
std::unique_ptr<ChannelRegistration> makeCudaBasicChannel() {
  auto context = tensorpipe::channel::cuda_basic::create(
      tensorpipe::channel::basic::create());
  return std::make_unique<ChannelRegistration>(
      ChannelRegistration{std::move(context), kCudaBasicChannelPriority});
}
// The cuda_basic channel is the fallback for GPU-to-GPU comm; it wraps the
// plain basic (CPU) channel.
C10_REGISTER_CREATOR(
    TensorPipeChannelRegistry,
    cuda_basic,
    makeCudaBasicChannel);
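
// TensorpipeCudaConverter bridges PyTorch CUDA storages and TensorPipe's
// CudaBuffer representation: on the send side it records the storage on the
// stream chosen for its device and appends a Message::Tensor that points at
// the existing allocation; on the receive side it allocates a buffer through
// the CUDACachingAllocator on that stream and exposes it as an
// Allocation::Tensor.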
class TensorpipeCudaConverter : public TensorpipeDeviceTypeConverter {
 public:
  std::optional<std::vector<char>> prepareTensorForSending(
      const c10::Storage& storage,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Message& message) const override {
    auto stream =
        at::cuda::CUDAStream(getStreamForDevice(streams, storage.device()));
    // Record the tensor data ptrs on the TensorPipe streams, so that the
    // tensors won't be destructed before TensorPipe finishes sending them.
    c10::cuda::CUDACachingAllocator::recordStream(storage.data_ptr(), stream);

    tensorpipe::CudaBuffer buffer;
    buffer.ptr = static_cast<char*>(storage.mutable_data());
    buffer.stream = stream.stream();

    tensorpipe::Message::Tensor tensor;
    tensor.buffer = buffer;
    tensor.length = storage.nbytes();

    message.tensors.push_back(std::move(tensor));

    return std::nullopt;
  }

  at::DataPtr allocateTensorForReceiving(
      c10::DeviceIndex deviceIndex,
      size_t length,
      const std::vector<c10::Stream>& streams,
      tensorpipe::Allocation& allocation) const override {
    c10::Device device(c10::kCUDA, deviceIndex);
    at::cuda::CUDAStream stream(getStreamForDevice(streams, device));
    // CUDACachingAllocator will call recordStream accordingly on the current
    // stream.
    at::cuda::CUDAStreamGuard guard(stream);
    at::DataPtr dataPtr =
        c10::cuda::CUDACachingAllocator::get()->allocate(length);

    tensorpipe::CudaBuffer buffer;
    buffer.ptr = dataPtr.get();
    buffer.stream = stream.stream();

    tensorpipe::Allocation::Tensor tensor;
    tensor.buffer = buffer;

    allocation.tensors.push_back(tensor);

    return dataPtr;
  }
};
C10_REGISTER_TENSORPIPE_DEVICE_TYPE_CONVERTER(CUDA, TensorpipeCudaConverter);
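
// Registering the converter for the CUDA device type makes it available to
// the serialization helpers declared in tensorpipe_utils.h, which (it is
// assumed here) dispatch on each tensor's device type when building and
// reading TensorPipe messages.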
} // namespace
} // namespace rpc
} // namespace distributed
} // namespace torch
#endif