Skip to content

Commit ac8c7c4

Browse files
beauby and facebook-github-bot
authored and committed Sep 21, 2020
Make Channel API accept buffer structs rather than raw pointers. (pytorch#45014)
Summary: Pull Request resolved: pytorch#45014 Pull Request resolved: pytorch/tensorpipe#219 Pull Request resolved: pytorch/tensorpipe#212 + Introduce buffer.h defining the buffer struct(s). The `CpuBuffer` struct is always defined, while the `CudaBuffer` struct is defined only when `TENSORPIPE_SUPPORTS_CUDA` is true. + Update all channels to take a `CpuBuffer` or `CudaBuffer` for `send`/`recv` rather than a raw pointer and a length. + Make the base `Channel`/`Context` classes templated on `TBuffer`, effectively creating two channel hierarchies (one for CPU channels, one for CUDA channels). + Update the Pipe and the generic channel tests to use the new API. So far, generic channel tests are CPU only, and tests for the CUDA IPC channel are (temporarily) disabled. A subsequent PR will take care of refactoring tests so that generic tests work for CUDA channels. Another PR will add support for CUDA tensors in the Pipe. Differential Revision: D23598033 Test Plan: Imported from OSS Reviewed By: lw Pulled By: beauby fbshipit-source-id: 1d6c3f91e288420858835cd5e7962e8da051b44b
1 parent 4bbb6ad commit ac8c7c4

File tree

6 files changed

+105
-64
lines changed

6 files changed

+105
-64
lines changed
 

‎BUILD.bazel

+6-3
Original file line numberDiff line numberDiff line change
@@ -1727,13 +1727,16 @@ cc_library(
17271727
"@gloo",
17281728
"@onnx",
17291729
"@fmt",
1730-
"@tensorpipe",
17311730
] + if_cuda(
17321731
[
17331732
":caffe2_cpp_cuda",
17341733
":aten_cuda",
1734+
"@tensorpipe//:tensorpipe_cuda",
1735+
],
1736+
[
1737+
":aten",
1738+
"@tensorpipe",
17351739
],
1736-
[":aten"],
17371740
),
17381741
alwayslink = True,
17391742
)
@@ -1764,7 +1767,7 @@ cu_library(
17641767
"@cudnn",
17651768
"@eigen",
17661769
"@gloo",
1767-
"@tensorpipe",
1770+
"@tensorpipe//:tensorpipe_cuda",
17681771
],
17691772
alwayslink = True,
17701773
)

‎test/cpp/rpc/test_tensorpipe_serialization.cpp

+19-11
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ TEST(TensorpipeSerialize, Base) {
4242
recvingTpMessage.tensors.reserve(sendingTpMessage.tensors.size());
4343
for (auto& tpTensor : sendingTpMessage.tensors) {
4444
tensorpipe::Message::Tensor t;
45-
t.length = tpTensor.length;
45+
t.buffer = tensorpipe::CpuBuffer{nullptr, tpTensor.buffer.cpu.length};
4646
t.metadata = tpTensor.metadata;
4747
recvingTpMessage.tensors.push_back(std::move(t));
4848
}
@@ -67,7 +67,10 @@ TEST(TensorpipeSerialize, Base) {
6767
for (int i = 0; i < recvingTpMessage.tensors.size(); i++) {
6868
tensorpipe::Message::Tensor& srcTensor = sendingTpMessage.tensors[i];
6969
tensorpipe::Message::Tensor& dstTensor = recvingTpMessage.tensors[i];
70-
memcpy(dstTensor.data, srcTensor.data, srcTensor.length);
70+
memcpy(
71+
dstTensor.buffer.cpu.ptr,
72+
srcTensor.buffer.cpu.ptr,
73+
srcTensor.buffer.cpu.length);
7174
}
7275

7376
// Mimic read() callback:
@@ -110,9 +113,10 @@ TEST(TensorpipeSerialize, RecopySparseTensors) {
110113
EXPECT_TRUE(torch::equal(main, tpBuffers.tensors[0]));
111114
EXPECT_TRUE(torch::equal(tiny, tpBuffers.tensors[1]));
112115
// Test cloned storage
113-
EXPECT_EQ(main.storage().data(), sendingTpMessage.tensors[0].data);
114-
EXPECT_NE(tiny.storage().data(), sendingTpMessage.tensors[1].data);
115-
EXPECT_EQ(tiny.element_size() * k1K, sendingTpMessage.tensors[1].length);
116+
EXPECT_EQ(main.storage().data(), sendingTpMessage.tensors[0].buffer.cpu.ptr);
117+
EXPECT_NE(tiny.storage().data(), sendingTpMessage.tensors[1].buffer.cpu.ptr);
118+
EXPECT_EQ(
119+
tiny.element_size() * k1K, sendingTpMessage.tensors[1].buffer.cpu.length);
116120
}
117121

118122
TEST(TensorpipeSerialize, NoDeleterTensors) {
@@ -136,21 +140,25 @@ TEST(TensorpipeSerialize, NoDeleterTensors) {
136140
EXPECT_EQ(tpBuffers.copiedTensors.size(), 2);
137141
EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
138142
EXPECT_EQ(
139-
tpBuffers.copiedTensors[0].size(), sendingTpMessage.tensors[0].length);
143+
tpBuffers.copiedTensors[0].size(),
144+
sendingTpMessage.tensors[0].buffer.cpu.length);
140145
EXPECT_EQ(
141-
tpBuffers.copiedTensors[1].size(), sendingTpMessage.tensors[1].length);
146+
tpBuffers.copiedTensors[1].size(),
147+
sendingTpMessage.tensors[1].buffer.cpu.length);
142148
EXPECT_EQ(
143-
tpBuffers.copiedTensors[0].data(), sendingTpMessage.tensors[0].data);
149+
tpBuffers.copiedTensors[0].data(),
150+
sendingTpMessage.tensors[0].buffer.cpu.ptr);
144151
EXPECT_EQ(
145-
tpBuffers.copiedTensors[1].data(), sendingTpMessage.tensors[1].data);
152+
tpBuffers.copiedTensors[1].data(),
153+
sendingTpMessage.tensors[1].buffer.cpu.ptr);
146154
EXPECT_TRUE(
147155
memcmp(
148156
tpBuffers.copiedTensors[0].data(),
149157
t1.storage().data(),
150-
sendingTpMessage.tensors[0].length) == 0);
158+
sendingTpMessage.tensors[0].buffer.cpu.length) == 0);
151159
EXPECT_TRUE(
152160
memcmp(
153161
tpBuffers.copiedTensors[1].data(),
154162
t2.storage().data(),
155-
sendingTpMessage.tensors[1].length) == 0);
163+
sendingTpMessage.tensors[1].buffer.cpu.length) == 0);
156164
}

‎third_party/tensorpipe

Submodule tensorpipe updated 110 files

‎third_party/tensorpipe.BUILD

+69-44
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
load("@rules_cc//cc:defs.bzl", "cc_library")
2-
load("@//third_party:substitution.bzl", "template_rule")
2+
load("@//third_party:substitution.bzl", "header_template_rule")
33

44
LIBUV_COMMON_SRCS = [
55
"third_party/libuv/src/fs-poll.c",
@@ -59,62 +59,87 @@ cc_library(
5959
visibility = ["//visibility:public"],
6060
)
6161

62-
proto_library(
63-
name = "tensorpipe_proto_source",
64-
srcs = glob([
65-
"tensorpipe/proto/*.proto",
66-
"tensorpipe/proto/*/*.proto",
67-
]),
68-
visibility = ["//visibility:public"],
69-
)
70-
71-
cc_proto_library(
72-
name = "tensorpipe_protos",
73-
deps = [":tensorpipe_proto_source"],
62+
cc_library(
63+
name = "libnop",
64+
srcs = [],
65+
includes = ["third_party/libnop/include"],
66+
hdrs = glob(["third_party/libnop/include/**/*.h"]),
7467
)
7568

76-
template_rule(
77-
name = "tensorpipe_header_template",
78-
src = "tensorpipe/tensorpipe.h.in",
79-
out = "tensorpipe/tensorpipe.h",
69+
header_template_rule(
70+
name = "tensorpipe_config_header",
71+
src = "tensorpipe/config.h.in",
72+
out = "tensorpipe/config.h",
8073
substitutions = {
81-
"cmakedefine01 TENSORPIPE_HAS_SHM_TRANSPORT": "define TENSORPIPE_HAS_SHM_TRANSPORT 0",
82-
"cmakedefine01 TENSORPIPE_HAS_CMA_CHANNEL": "define TENSORPIPE_HAS_CMA_CHANNEL 0",
74+
"#cmakedefine01 TENSORPIPE_HAS_SHM_TRANSPORT": "",
75+
"#cmakedefine01 TENSORPIPE_HAS_CMA_CHANNEL": "",
76+
"#cmakedefine01 TENSORPIPE_HAS_CUDA_IPC_CHANNEL": "",
77+
"#cmakedefine01 TENSORPIPE_SUPPORTS_CUDA": "",
8378
},
8479
)
8580

81+
TENSORPIPE_HEADERS = glob([
82+
"tensorpipe/*.h",
83+
"tensorpipe/channel/*.h",
84+
"tensorpipe/channel/*/*.h",
85+
"tensorpipe/common/*.h",
86+
"tensorpipe/core/*.h",
87+
"tensorpipe/transport/*.h",
88+
"tensorpipe/transport/*/*.h",
89+
"tensorpipe/util/*/*.h",
90+
])
91+
92+
TENSORPIPE_BASE_SRCS = glob([
93+
"tensorpipe/*.cc",
94+
"tensorpipe/channel/*.cc",
95+
"tensorpipe/common/*.cc",
96+
"tensorpipe/core/*.cc",
97+
"tensorpipe/transport/*.cc",
98+
"tensorpipe/util/*/*.cc",
99+
])
100+
101+
TENSORPIPE_SRCS = TENSORPIPE_BASE_SRCS + glob([
102+
"tensorpipe/channel/basic/*.cc",
103+
"tensorpipe/channel/mpt/*.cc",
104+
"tensorpipe/channel/xth/*.cc",
105+
"tensorpipe/transport/uv/*.cc",
106+
])
107+
108+
TENSORPIPE_SRCS_CUDA = TENSORPIPE_SRCS + glob([
109+
"tensorpipe/channel/cuda_ipc/*.cc",
110+
])
111+
86112
cc_library(
87113
name = "tensorpipe",
88-
srcs = glob(
89-
[
90-
"tensorpipe/*.cc",
91-
"tensorpipe/channel/*.cc",
92-
"tensorpipe/channel/*/*.cc",
93-
"tensorpipe/common/*.cc",
94-
"tensorpipe/core/*.cc",
95-
"tensorpipe/transport/*.cc",
96-
"tensorpipe/transport/*/*.cc",
97-
"tensorpipe/util/*/*.cc",
98-
],
99-
),
100-
hdrs = glob(
101-
[
102-
"tensorpipe/*.h",
103-
"tensorpipe/channel/*.h",
104-
"tensorpipe/channel/*/*.h",
105-
"tensorpipe/common/*.h",
106-
"tensorpipe/core/*.h",
107-
"tensorpipe/transport/*.h",
108-
"tensorpipe/transport/*/*.h",
109-
"tensorpipe/util/*/*.h",
110-
],
111-
),
114+
srcs = TENSORPIPE_SRCS + [":tensorpipe_config_header"],
115+
hdrs = TENSORPIPE_HEADERS,
112116
includes = [
113117
".",
114118
],
115119
copts = [
116120
"-std=c++14",
117121
],
118122
visibility = ["//visibility:public"],
119-
deps = [":tensorpipe_protos", ":libuv"],
123+
deps = [
124+
":libnop",
125+
":libuv",
126+
],
127+
)
128+
129+
cc_library(
130+
name = "tensorpipe_cuda",
131+
srcs = TENSORPIPE_SRCS_CUDA + [":tensorpipe_config_header"],
132+
hdrs = TENSORPIPE_HEADERS,
133+
includes = [
134+
".",
135+
],
136+
copts = [
137+
"-std=c++14",
138+
],
139+
visibility = ["//visibility:public"],
140+
deps = [
141+
":libnop",
142+
":libuv",
143+
"@cuda",
144+
],
120145
)

‎torch/csrc/distributed/rpc/tensorpipe_agent.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
namespace tensorpipe {
1818

19+
class CpuBuffer;
1920
class Context;
2021
class Error;
2122
class Listener;
@@ -30,7 +31,9 @@ class Context;
3031
} // namespace transport
3132

3233
namespace channel {
34+
template <typename TBuffer>
3335
class Context;
36+
using CpuContext = Context<CpuBuffer>;
3437
} // namespace channel
3538

3639
using DeviceMap = std::unordered_map<c10::DeviceIndex, c10::DeviceIndex>;
@@ -53,7 +56,7 @@ struct TransportRegistration {
5356
C10_DECLARE_REGISTRY(TensorPipeTransportRegistry, TransportRegistration);
5457

5558
struct ChannelRegistration {
56-
std::shared_ptr<tensorpipe::channel::Context> channel;
59+
std::shared_ptr<tensorpipe::channel::CpuContext> channel;
5760
int64_t priority;
5861
};
5962

‎torch/csrc/distributed/rpc/tensorpipe_utils.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,17 @@ std::tuple<tensorpipe::Message, TensorpipeWriteBuffers> tensorpipeSerialize(
9999
std::vector<char> storageData(
100100
tensorData.data(), tensorData.data() + tensorData.sizeInBytes());
101101
tpMessage.tensors.push_back(tensorpipe::Message::Tensor{
102-
storageData.data(), storageData.size(), std::move(metadata)});
102+
tensorpipe::CpuBuffer{storageData.data(), storageData.size()},
103+
std::move(metadata)});
103104
buffers.copiedTensors.push_back(std::move(storageData));
104105
} else {
105106
// TensorPipe uses the same Message class for both reading and writing, so
106107
// it uses non-const ptrs even though it doesn't modify them when writing.
107108
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
108109
char* tensorPtr = const_cast<char*>(tensorData.data());
109110
tpMessage.tensors.push_back(tensorpipe::Message::Tensor{
110-
tensorPtr, tensorData.sizeInBytes(), std::move(metadata)});
111+
tensorpipe::CpuBuffer{tensorPtr, tensorData.sizeInBytes()},
112+
std::move(metadata)});
111113
}
112114
}
113115

@@ -152,8 +154,8 @@ TensorpipeReadBuffers tensorpipeAllocate(tensorpipe::Message& tpMessage) {
152154

153155
for (auto& tensor : tpMessage.tensors) {
154156
buffers.tensors.emplace_back(
155-
at::getCPUAllocator()->allocate(tensor.length));
156-
tensor.data = buffers.tensors.back().get();
157+
at::getCPUAllocator()->allocate(tensor.buffer.cpu.length));
158+
tensor.buffer.cpu.ptr = buffers.tensors.back().get();
157159
}
158160

159161
return buffers;

0 commit comments

Comments
 (0)
Please sign in to comment.