Skip to content

Commit 5211fb9

Browse files
mrshenli authored and facebook-github-bot committed Sep 25, 2020
Remove device maps from TensorPipe for v1.7 release (pytorch#45353)
Summary: Pull Request resolved: pytorch#45353 Temporarily removing this feature, will add this back after branch cut. Test Plan: Imported from OSS Reviewed By: rohan-varma Differential Revision: D23939865 Pulled By: mrshenli fbshipit-source-id: 7dceaffea6b9a16512b5ba6036da73e7f8f83a8e
1 parent 439930c commit 5211fb9

File tree

6 files changed

+51
-544
lines changed

6 files changed

+51
-544
lines changed
 

‎torch/csrc/distributed/rpc/init.cpp

+3-11
Original file line numberDiff line numberDiff line change
@@ -483,28 +483,20 @@ PyObject* rpc_init(PyObject* /* unused */) {
483483
optional<std::vector<std::string>>,
484484
optional<std::vector<std::string>>,
485485
float,
486-
std::string,
487-
std::unordered_map<std::string, tensorpipe::DeviceMap>>(),
486+
std::string>(),
488487
py::arg("num_worker_threads") = kDefaultNumWorkerThreads,
489488
py::arg("_transports") = optional<std::vector<std::string>>(),
490489
py::arg("_channels") = optional<std::vector<std::string>>(),
491490
py::arg("rpc_timeout") = kDefaultRpcTimeoutSeconds,
492-
py::arg("init_method") = kDefaultInitMethod,
493-
py::arg("device_maps") =
494-
std::unordered_map<std::string, tensorpipe::DeviceMap>())
491+
py::arg("init_method") = kDefaultInitMethod)
495492
.def_readwrite(
496493
"num_worker_threads",
497494
&TensorPipeRpcBackendOptions::numWorkerThreads,
498495
R"(
499496
The number of threads in the thread-pool used by
500497
:class:`~torch.distributed.rpc.TensorPipeAgent` to execute
501498
requests.
502-
)")
503-
.def_readwrite(
504-
"device_maps",
505-
&TensorPipeRpcBackendOptions::deviceMaps,
506-
R"(The device map locations.)")
507-
.def("set_device_map", &TensorPipeRpcBackendOptions::setDeviceMap);
499+
)");
508500

509501
module.attr("_DEFAULT_NUM_WORKER_THREADS") =
510502
py::cast(kDefaultNumWorkerThreads);

‎torch/csrc/distributed/rpc/tensorpipe_agent.cpp

+13-41
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,9 @@ const std::string kServerActiveAsyncCalls = "agent.server_active_async_calls";
3232
inline void checkCPUTensor(const torch::Tensor& tensor) {
3333
TORCH_CHECK(
3434
tensor.device() == at::kCPU,
35-
"TensorPipeAgent only supports CPU tensors by default. Sending "
36-
"GPU tensors using RPC requires explicitly configurations using "
37-
"`set_device_map` on `TensorPipeRpcBackendOptions`. Got a tensor "
38-
"with device ",
39-
tensor.device(),
40-
", but no device map is specified.");
35+
"TensorPipe RPC backend only supports CPU tensors, please move your ",
36+
"tensors to CPU before sending them over RPC. Found tensor on device: ",
37+
tensor.device());
4138
}
4239

4340
std::vector<c10::DeviceIndex> getDevicesForTensors(
@@ -480,41 +477,16 @@ void TensorPipeAgent::sendCompletedResponseMessage(
480477
Message&& responseMessage = std::move(*futureResponseMessage).moveValue();
481478
responseMessage.setId(messageId);
482479
if (!error) {
483-
const auto& iter = reverseDeviceMaps_.find(pipe->getRemoteName());
484-
if (iter == opts_.deviceMaps.end()) {
485-
for (const auto& t : responseMessage.tensors()) {
486-
if (!t.device().is_cpu()) {
487-
responseMessage = createExceptionResponse(
488-
c10::str(
489-
"TensorPipe RPC backend only supports CPU tensors by default,"
490-
" please move your tensors to CPU before sending them over "
491-
"RPC, or call `set_device_map` on "
492-
"`TensorPipeRpcBackendOptions` to explicitly configure "
493-
"device mapping. Response device mapping is not available for "
494-
"destination ",
495-
pipe->getRemoteName(),
496-
", but found tensor on device: ",
497-
t.device()),
498-
responseMessage.id());
499-
break;
500-
}
501-
}
502-
} else {
503-
const auto& deviceMap = iter->second;
504-
for (const auto& t : responseMessage.tensors()) {
505-
if (!t.device().is_cpu() &&
506-
deviceMap.find(t.device().index()) == deviceMap.end()) {
507-
responseMessage = createExceptionResponse(
508-
c10::str(
509-
"TensorPipe RPC backend only supports CPU tensors by default."
510-
" Response device mapping is not available for destination ",
511-
pipe->getRemoteName(),
512-
" for device ",
513-
t.device(),
514-
" but received a tensor on that device."),
515-
responseMessage.id());
516-
break;
517-
}
480+
for (const auto& tensor : responseMessage.tensors()) {
481+
if (!tensor.device().is_cpu()) {
482+
responseMessage = createExceptionResponse(
483+
c10::str(
484+
"TensorPipe RPC backend only supports CPU tensors, please ",
485+
"move your tensors to CPU before sending them over RPC. Found ",
486+
"tensor on device: ",
487+
tensor.device()),
488+
responseMessage.id());
489+
break;
518490
}
519491
}
520492

‎torch/distributed/rpc/__init__.py

-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ def is_available():
1919
raise RuntimeError("Failed to initialize torch.distributed.rpc")
2020

2121

22-
23-
2422
if is_available():
2523
from . import api, backend_registry, functions, _set_profiler_node_id
2624
from . import (

‎torch/distributed/rpc/backend_registry.py

+1-65
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@
33
from datetime import timedelta
44
import enum
55

6-
import torch
76
import torch.distributed as dist
87

9-
from . import api
108
from . import constants as rpc_constants
119

1210

@@ -185,57 +183,6 @@ def _tensorpipe_construct_rpc_backend_options_handler(
185183
)
186184

187185

188-
# detect if any worker has invalid device_map configurations, and return
189-
# names of failed workers
190-
def _tensorpipe_check_device_maps(agent, device_maps):
191-
if device_maps is None:
192-
device_maps = {}
193-
194-
def check_one_worker(name, device_maps, all_device_counts):
195-
device_count = all_device_counts[name]
196-
wrong_worker_names = set(device_maps) - set(all_device_counts)
197-
if wrong_worker_names:
198-
raise ValueError(f"Wrong worker names: {wrong_worker_names}")
199-
for worker_name in all_device_counts:
200-
remote_device_count = all_device_counts[worker_name]
201-
if worker_name in device_maps:
202-
device_map = device_maps[worker_name]
203-
key_set = set(device_map.keys())
204-
val_set = set(device_map.values())
205-
if not all([
206-
len(device_map) == len(key_set),
207-
len(device_map) == len(val_set), # check 1-to-1 mapping
208-
min(key_set) >= 0,
209-
max(key_set) < device_count, # check local range
210-
min(val_set) >= 0,
211-
max(val_set) < remote_device_count # check remote range
212-
]):
213-
raise ValueError(
214-
f"Invalid device_map configuration on {name}:\n"
215-
f"device_maps = {device_maps}"
216-
)
217-
218-
gathered = api._all_gather([torch.cuda.device_count(), device_maps])
219-
all_device_counts = {name: gathered[name][0] for name in gathered}
220-
all_device_maps = {name: gathered[name][1] for name in gathered}
221-
for worker_name in all_device_maps:
222-
worker_device_maps = all_device_maps[worker_name]
223-
check_one_worker(worker_name, worker_device_maps, all_device_counts)
224-
225-
# passed all checked, construct reverse mapping for return values
226-
reverse_device_maps = {}
227-
local_name = api.get_worker_info().name
228-
for worker_name in all_device_maps:
229-
remote_device_maps = all_device_maps[worker_name]
230-
if local_name in remote_device_maps:
231-
remote_device_map = remote_device_maps[local_name]
232-
reverse_device_maps[worker_name] = {
233-
remote_device_map[k]: k for k in remote_device_map
234-
}
235-
236-
agent._set_reverse_device_maps(reverse_device_maps)
237-
238-
239186
def _tensorpipe_init_backend_handler(store, name, rank, world_size, rpc_backend_options):
240187
from . import TensorPipeRpcBackendOptions
241188
from . import TensorPipeAgent
@@ -259,21 +206,10 @@ def _tensorpipe_init_backend_handler(store, name, rank, world_size, rpc_backend_
259206
group = _init_process_group(store, rank, world_size)
260207

261208
# TODO: add try-except and destroy _agent in all processes if any fails.
262-
agent = TensorPipeAgent(
209+
return TensorPipeAgent(
263210
store, name, rank, world_size, group, rpc_backend_options
264211
)
265212

266-
api._init_rpc_states(agent)
267-
268-
try:
269-
_tensorpipe_check_device_maps(agent, rpc_backend_options.device_maps)
270-
agent.join()
271-
except Exception:
272-
api.shutdown()
273-
raise
274-
275-
return agent
276-
277213

278214
register_backend(
279215
"TENSORPIPE",

‎torch/distributed/rpc/options.py

+2-76
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
from . import _TensorPipeRpcBackendOptionsBase
22
from . import constants as rpc_contants
33

4-
import torch
5-
6-
from typing import Dict, List
4+
from typing import List
75

86

97
class TensorPipeRpcBackendOptions(_TensorPipeRpcBackendOptionsBase):
@@ -27,19 +25,13 @@ class TensorPipeRpcBackendOptions(_TensorPipeRpcBackendOptionsBase):
2725
store used for rendezvous. It takes any value accepted for the
2826
same argument of :meth:`~torch.distributed.init_process_group`
2927
(default: ``env://``).
30-
device_maps (Dict[str, Dict]): Device placement mappings from this
31-
worker to the callee. Key is the callee worker name and value the
32-
dictionary (``Dict`` of ``int``, ``str``, or ``torch.device``) that
33-
maps this worker's devices to the callee worker's devices.
34-
(default: ``None``)
3528
"""
3629
def __init__(
3730
self,
3831
*,
3932
num_worker_threads: int = rpc_contants.DEFAULT_NUM_WORKER_THREADS,
4033
rpc_timeout: float = rpc_contants.DEFAULT_RPC_TIMEOUT_SEC,
4134
init_method: str = rpc_contants.DEFAULT_INIT_METHOD,
42-
device_maps: Dict = None,
4335
_transports: List = None,
4436
_channels: List = None,
4537
):
@@ -48,71 +40,5 @@ def __init__(
4840
_transports,
4941
_channels,
5042
rpc_timeout,
51-
init_method,
52-
device_maps if device_maps else {}
43+
init_method
5344
)
54-
55-
def set_device_map(self, to: str, device_map: Dict):
56-
r"""
57-
Set device mapping between each RPC caller and callee pair. This
58-
function can be called multiple times to incrementally add
59-
device placement configurations.
60-
61-
Arguments:
62-
worker_name (str): Callee name.
63-
device_map (Dict of int, str, or torch.device): Device placement
64-
mappings from this worker to the callee. This map must be
65-
invertible.
66-
67-
Example::
68-
>>> # both workers
69-
>>> def add(x, y):
70-
>>> print(x) # tensor([1., 1.], device='cuda:1')
71-
>>> return x + y, (x + y).to(2)
72-
>>>
73-
>>> # on worker 0
74-
>>> options = TensorPipeRpcBackendOptions(
75-
>>> num_worker_threads=8,
76-
>>> device_maps={"worker1": {0, 1}}
77-
>>> # maps worker0's cuda:0 to worker1's cuda:1
78-
>>> )
79-
>>> options.set_device_map("worker1", {1, 2})
80-
>>> # maps worker0's cuda:1 to worker1's cuda:2
81-
>>>
82-
>>> rpc.init_rpc(
83-
>>> "worker0",
84-
>>> rank=0,
85-
>>> world_size=2
86-
>>> backend=rpc.BackendType.TENSORPIPE,
87-
>>> rpc_backend_options=options
88-
>>> )
89-
>>>
90-
>>> x = torch.ones(2)
91-
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
92-
>>> # The first argument will be moved to cuda:1 on worker1. When
93-
>>> # sending the return value back, it will follow the invert of
94-
>>> # the device map, and hence will be moved back to cuda:0 and
95-
>>> # cuda:1 on worker0
96-
>>> print(rets[0]) # tensor([2., 2.], device='cuda:0')
97-
>>> print(rets[0]) # tensor([2., 2.], device='cuda:1')
98-
"""
99-
device_index_map = {}
100-
curr_device_maps = super().device_maps
101-
for k in device_map:
102-
v = device_map[k]
103-
k, v = torch.device(k), torch.device(v)
104-
if k.type != 'cuda' or v.type != 'cuda':
105-
raise ValueError(
106-
"`set_device_map` only supports CUDA devices, "
107-
f"but got device pair {k}: {v}"
108-
109-
)
110-
if to in curr_device_maps and k.index in curr_device_maps[to]:
111-
curr_v = super().device_maps[to][k.index]
112-
if curr_v != v.index:
113-
raise ValueError(
114-
"`set_device_map` only supports 1-to-1 mapping, "
115-
f"trying to map {k} to {v} and {curr_v}"
116-
)
117-
device_index_map[k.index] = v.index
118-
super().set_device_map(to, device_index_map)

‎torch/testing/_internal/distributed/rpc/rpc_test.py

+32-349
Original file line numberDiff line numberDiff line change
@@ -2876,6 +2876,38 @@ def _return_gpu_tensor_list():
28762876
def _gpu_tensor_list_arg(tensor_list):
28772877
return torch.rand(3, 3)
28782878

2879+
@skip_if_lt_x_gpu(2)
2880+
@dist_init
2881+
def test_cuda(self):
2882+
dst = worker_name((self.rank + 1) % self.world_size)
2883+
t1 = torch.rand(3, 3).cuda(0)
2884+
t2 = torch.rand(3, 3).cuda(1)
2885+
t3 = torch.rand(3, 3)
2886+
2887+
# cuda tensors as args fail.
2888+
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
2889+
rpc.rpc_sync(dst, torch.add, args=(t1, t2))
2890+
2891+
# mix of cpu and cuda tensors as args fail.
2892+
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
2893+
rpc.rpc_sync(dst, torch.add, args=(t1, t3))
2894+
2895+
# gpu tensor list as args fails.
2896+
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
2897+
rpc.rpc_sync(dst, RpcTest._gpu_tensor_list_arg, args=([t1, t2]))
2898+
2899+
# cuda tensors as return values fail.
2900+
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
2901+
rpc.rpc_sync(dst, RpcTest._return_gpu_tensor, args=())
2902+
2903+
# cuda tensors as a list of return value fails
2904+
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
2905+
rpc.rpc_sync(dst, RpcTest._return_gpu_tensor_list, args=())
2906+
2907+
# Sending to self should fail too.
2908+
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
2909+
rpc.rpc_sync(worker_name(self.rank), torch.add, args=(t1, t2))
2910+
28792911
def _create_rref(self):
28802912
owner_rank = (self.rank + 2) % self.world_size
28812913
return rpc.remote(
@@ -3609,39 +3641,6 @@ def test_logs_deprecation_warning(self):
36093641
"\n".join(cm.output),
36103642
)
36113643

3612-
@skip_if_lt_x_gpu(2)
3613-
@dist_init
3614-
def test_cuda(self):
3615-
dst = worker_name((self.rank + 1) % self.world_size)
3616-
t1 = torch.rand(3, 3).cuda(0)
3617-
t2 = torch.rand(3, 3).cuda(1)
3618-
t3 = torch.rand(3, 3)
3619-
3620-
# cuda tensors as args fail.
3621-
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
3622-
rpc.rpc_sync(dst, torch.add, args=(t1, t2))
3623-
3624-
# mix of cpu and cuda tensors as args fail.
3625-
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
3626-
rpc.rpc_sync(dst, torch.add, args=(t1, t3))
3627-
3628-
# gpu tensor list as args fails.
3629-
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
3630-
rpc.rpc_sync(dst, RpcTest._gpu_tensor_list_arg, args=([t1, t2]))
3631-
3632-
# cuda tensors as return values fail.
3633-
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
3634-
rpc.rpc_sync(dst, RpcTest._return_gpu_tensor, args=())
3635-
3636-
# cuda tensors as a list of return value fails
3637-
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
3638-
rpc.rpc_sync(dst, RpcTest._return_gpu_tensor_list, args=())
3639-
3640-
# Sending to self should fail too.
3641-
with self.assertRaisesRegex(RuntimeError, "RPC backend only supports CPU tensors.*Found tensor on device: cuda:0"):
3642-
rpc.rpc_sync(worker_name(self.rank), torch.add, args=(t1, t2))
3643-
3644-
36453644
def test_single_threaded_rref_owner(self):
36463645
# We need a process group in order to perform a barrier at the end.
36473646
dist.init_process_group(
@@ -4288,319 +4287,3 @@ def test_tensorpipe_options_throw_on_timedelta_timeout(self):
42884287
num_worker_threads=self.rpc_backend_options.num_worker_threads,
42894288
rpc_timeout=timeout,
42904289
)
4291-
4292-
def _test_device_maps(self, options, errMsg="Invalid device_map"):
4293-
with self.assertRaisesRegex(ValueError, errMsg):
4294-
rpc.init_rpc(
4295-
name=worker_name(self.rank),
4296-
backend=self.rpc_backend,
4297-
rank=self.rank,
4298-
world_size=self.world_size,
4299-
rpc_backend_options=options,
4300-
)
4301-
4302-
self.assertFalse(rpc.api._is_current_rpc_agent_set())
4303-
4304-
@skip_if_lt_x_gpu(2)
4305-
def test_device_maps_wrong_worker_name(self):
4306-
options = self.rpc_backend_options
4307-
options.set_device_map("none_exist", {0: 1})
4308-
self._test_device_maps(options, "Wrong worker names")
4309-
4310-
@skip_if_lt_x_gpu(1)
4311-
def test_device_maps_invalid_max_local_device(self):
4312-
options = self.rpc_backend_options
4313-
dst = worker_name((self.rank + 1) % self.world_size)
4314-
options.set_device_map(dst, {torch.cuda.device_count(): 0})
4315-
4316-
self._test_device_maps(options)
4317-
4318-
@skip_if_lt_x_gpu(1)
4319-
def test_device_maps_invalid_max_remote_device(self):
4320-
options = self.rpc_backend_options
4321-
dst = worker_name((self.rank + 1) % self.world_size)
4322-
options.set_device_map(dst, {0: torch.cuda.device_count()})
4323-
4324-
self._test_device_maps(options)
4325-
4326-
@skip_if_lt_x_gpu(2)
4327-
def test_device_maps_many_to_one(self):
4328-
options = self.rpc_backend_options
4329-
dst = worker_name((self.rank + 1) % self.world_size)
4330-
options.set_device_map(dst, {1: 0})
4331-
options.set_device_map(dst, {0: 0})
4332-
4333-
self._test_device_maps(options)
4334-
4335-
@skip_if_lt_x_gpu(2)
4336-
def test_device_maps_one_to_many(self):
4337-
if self.rank == 0:
4338-
options = self.rpc_backend_options
4339-
dst = worker_name((self.rank + 1) % self.world_size)
4340-
options.set_device_map(dst, {0: 1})
4341-
with self.assertRaisesRegex(
4342-
ValueError, "`set_device_map` only supports 1-to-1 mapping"
4343-
):
4344-
options.set_device_map(dst, {0: 0})
4345-
4346-
@skip_if_lt_x_gpu(1)
4347-
def test_device_maps_invalid_min_device(self):
4348-
options = self.rpc_backend_options
4349-
dst = worker_name((self.rank + 1) % self.world_size)
4350-
with self.assertRaisesRegex(
4351-
RuntimeError, "Device index must not be negative"
4352-
):
4353-
options.set_device_map(dst, {-1: 0})
4354-
4355-
with self.assertRaisesRegex(
4356-
RuntimeError, "Device index must not be negative"
4357-
):
4358-
options.set_device_map(dst, {0: -1})
4359-
4360-
@staticmethod
4361-
def _gpu_add(x, y):
4362-
if all([x.is_cuda, x.device.index == 1, y.is_cuda, y.device.index == 1]):
4363-
return (x + y).to(0)
4364-
else:
4365-
raise ValueError("Wrong device affinity")
4366-
4367-
@skip_if_lt_x_gpu(2)
4368-
def test_device_maps_gpu(self):
4369-
options = self.rpc_backend_options
4370-
dst = worker_name((self.rank + 1) % self.world_size)
4371-
options.set_device_map(dst, {0: 1, 1: 0})
4372-
4373-
rpc.init_rpc(
4374-
name=worker_name(self.rank),
4375-
backend=self.rpc_backend,
4376-
rank=self.rank,
4377-
world_size=self.world_size,
4378-
rpc_backend_options=options,
4379-
)
4380-
4381-
ret = rpc.rpc_sync(
4382-
dst,
4383-
TensorPipeAgentRpcTest._gpu_add,
4384-
args=(torch.zeros(2).to(0), torch.ones(2).to(0))
4385-
)
4386-
self.assertEqual(ret.device, torch.device(1))
4387-
self.assertEqual(ret, (torch.zeros(2) + torch.ones(2)).to(1))
4388-
rpc.shutdown()
4389-
4390-
@staticmethod
4391-
def _gpu_add_multi_gpu(x, y):
4392-
if all([x.is_cuda, x.device.index == 0, y.is_cuda, y.device.index == 1]):
4393-
return x + y.to(0), x.to(1) - y
4394-
else:
4395-
raise ValueError("Wrong device affinity")
4396-
4397-
def _test_device_maps_multi_gpu(self, dst):
4398-
options = self.rpc_backend_options
4399-
options.set_device_map(dst, {1: 0})
4400-
options.set_device_map(dst, {0: 1})
4401-
4402-
rpc.init_rpc(
4403-
name=worker_name(self.rank),
4404-
backend=self.rpc_backend,
4405-
rank=self.rank,
4406-
world_size=self.world_size,
4407-
rpc_backend_options=options,
4408-
)
4409-
4410-
rets = rpc.rpc_sync(
4411-
dst,
4412-
TensorPipeAgentRpcTest._gpu_add_multi_gpu,
4413-
args=(torch.zeros(2).to(1), torch.ones(2).to(0))
4414-
)
4415-
self.assertEqual(rets[0].device, torch.device(1))
4416-
self.assertEqual(rets[1].device, torch.device(0))
4417-
self.assertEqual(rets[0], (torch.zeros(2) + torch.ones(2)).to(1))
4418-
self.assertEqual(rets[1], (torch.zeros(2) - torch.ones(2)).to(0))
4419-
rpc.shutdown()
4420-
4421-
@skip_if_lt_x_gpu(2)
4422-
def test_device_maps_multi_gpu(self):
4423-
dst = worker_name((self.rank + 1) % self.world_size)
4424-
self._test_device_maps_multi_gpu(dst)
4425-
4426-
@skip_if_lt_x_gpu(2)
4427-
def test_device_maps_multi_gpu_self(self):
4428-
dst = worker_name(self.rank)
4429-
self._test_device_maps_multi_gpu(dst)
4430-
4431-
@staticmethod
4432-
def _gpu_add_return_to_gpu(x, y):
4433-
if x.device.type == 'cpu' and y.device.type == 'cpu':
4434-
return (x + y).to(0), (x - y).to(1), (x * y).to(2), (x / y).to(3)
4435-
else:
4436-
raise ValueError("Wrong device affinity")
4437-
4438-
@skip_if_lt_x_gpu(2)
4439-
def test_device_maps_in_options(self):
4440-
dst = worker_name((self.rank + 1) % self.world_size)
4441-
options = self.rpc_backend_options
4442-
4443-
rpc.init_rpc(
4444-
name=worker_name(self.rank),
4445-
backend=self.rpc_backend,
4446-
rank=self.rank,
4447-
world_size=self.world_size,
4448-
rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
4449-
init_method=options.init_method,
4450-
num_worker_threads=options.num_worker_threads,
4451-
device_maps={dst: {0: 1, 1: 0}}
4452-
)
4453-
)
4454-
4455-
rets = rpc.rpc_sync(
4456-
dst,
4457-
TensorPipeAgentRpcTest._gpu_add_multi_gpu,
4458-
args=(torch.zeros(2).to(1), torch.ones(2).to(0))
4459-
)
4460-
self.assertEqual(rets[0].device, torch.device(1))
4461-
self.assertEqual(rets[1].device, torch.device(0))
4462-
self.assertEqual(rets[0], (torch.zeros(2) + torch.ones(2)).to(1))
4463-
self.assertEqual(rets[1], (torch.zeros(2) - torch.ones(2)).to(0))
4464-
rpc.shutdown()
4465-
4466-
def _test_device_maps_return_to_gpu(self, dst):
4467-
options = self.rpc_backend_options
4468-
4469-
options.set_device_map(dst, {0: 1})
4470-
options.set_device_map(dst, {1: 2})
4471-
options.set_device_map(dst, {2: 3})
4472-
options.set_device_map(dst, {3: 0})
4473-
4474-
rpc.init_rpc(
4475-
name=worker_name(self.rank),
4476-
backend=self.rpc_backend,
4477-
rank=self.rank,
4478-
world_size=self.world_size,
4479-
rpc_backend_options=options,
4480-
)
4481-
4482-
rets = rpc.rpc_sync(
4483-
dst,
4484-
TensorPipeAgentRpcTest._gpu_add_return_to_gpu,
4485-
args=(torch.zeros(2), torch.ones(2))
4486-
)
4487-
for i in range(len(rets)):
4488-
self.assertEqual(rets[i].device, torch.device((3 + i) % 4))
4489-
self.assertEqual(rets[0], (torch.zeros(2) + torch.ones(2)).to(3))
4490-
self.assertEqual(rets[1], (torch.zeros(2) - torch.ones(2)).to(0))
4491-
self.assertEqual(rets[2], (torch.zeros(2) * torch.ones(2)).to(1))
4492-
self.assertEqual(rets[3], (torch.zeros(2) / torch.ones(2)).to(2))
4493-
rpc.shutdown()
4494-
4495-
@skip_if_lt_x_gpu(4)
4496-
def test_device_maps_return_to_gpu(self):
4497-
dst = worker_name((self.rank + 1) % self.world_size)
4498-
self._test_device_maps_return_to_gpu(dst)
4499-
4500-
@skip_if_lt_x_gpu(4)
4501-
def test_device_maps_return_to_gpu_self(self):
4502-
dst = worker_name(self.rank)
4503-
self._test_device_maps_return_to_gpu(dst)
4504-
4505-
@staticmethod
4506-
def _add_to_gpu(x, y):
4507-
return (x + y).to(0)
4508-
4509-
def _test_device_maps_missing_config(self, mode):
4510-
dst = worker_name((self.rank + 1) % self.world_size)
4511-
errMsg = (
4512-
"TensorPipeAgent only supports CPU tensors by default.*"
4513-
"`set_device_map` on `TensorPipeRpcBackendOptions`"
4514-
)
4515-
4516-
with self.assertRaisesRegex(RuntimeError, errMsg):
4517-
if mode == RPCExecMode.SYNC:
4518-
rpc.rpc_sync(dst, torch.add, args=(torch.zeros(2).to(0), 1))
4519-
elif mode == RPCExecMode.REMOTE:
4520-
rpc.remote(dst, torch.add, args=(torch.zeros(2).to(0), 1)).to_here()
4521-
else:
4522-
raise ValueError(f"unexpected mode {mode}")
4523-
4524-
# make sure RPC is still functioning
4525-
ret = rpc.rpc_sync(dst, torch.add, args=(torch.ones(2), 1))
4526-
self.assertEqual(ret, torch.ones(2) + 1)
4527-
4528-
def _test_device_maps_missing_config_response(self, mode):
4529-
dst = worker_name((self.rank + 1) % self.world_size)
4530-
errMsg = "Response device mapping is not available"
4531-
4532-
with self.assertRaisesRegex(RuntimeError, errMsg):
4533-
if mode == RPCExecMode.SYNC:
4534-
rpc.rpc_sync(
4535-
dst,
4536-
TensorPipeAgentRpcTest._add_to_gpu,
4537-
args=(torch.zeros(2), 1)
4538-
)
4539-
elif mode == RPCExecMode.REMOTE:
4540-
rpc.remote(
4541-
dst,
4542-
TensorPipeAgentRpcTest._add_to_gpu,
4543-
args=(torch.zeros(2), 1)
4544-
).to_here()
4545-
else:
4546-
raise ValueError(f"unexpected mode {mode}")
4547-
4548-
# make sure RPC is still functioning
4549-
ret = rpc.rpc_sync(dst, torch.add, args=(torch.ones(2), 1))
4550-
self.assertEqual(ret, torch.ones(2) + 1)
4551-
4552-
@skip_if_lt_x_gpu(1)
4553-
@dist_init
4554-
def test_device_maps_missing_config(self):
4555-
self._test_device_maps_missing_config(RPCExecMode.SYNC)
4556-
4557-
@skip_if_lt_x_gpu(1)
4558-
@dist_init
4559-
def test_device_maps_missing_config_loop(self):
4560-
for _ in range(self.rpc_backend_options.num_worker_threads + 5):
4561-
self._test_device_maps_missing_config(RPCExecMode.SYNC)
4562-
4563-
@skip_if_lt_x_gpu(1)
4564-
@dist_init
4565-
def test_device_maps_missing_config_response(self):
4566-
self._test_device_maps_missing_config_response(RPCExecMode.SYNC)
4567-
4568-
@skip_if_lt_x_gpu(1)
4569-
@dist_init
4570-
def test_device_maps_missing_config_response_loop(self):
4571-
for _ in range(self.rpc_backend_options.num_worker_threads + 5):
4572-
self._test_device_maps_missing_config_response(RPCExecMode.SYNC)
4573-
4574-
@skip_if_lt_x_gpu(1)
4575-
@dist_init
4576-
def test_device_maps_missing_config_remote(self):
4577-
self._test_device_maps_missing_config(RPCExecMode.REMOTE)
4578-
4579-
@skip_if_lt_x_gpu(1)
4580-
@dist_init
4581-
def test_device_maps_missing_config_remote_response(self):
4582-
self._test_device_maps_missing_config_response(RPCExecMode.REMOTE)
4583-
4584-
@skip_if_lt_x_gpu(2)
4585-
def test_device_maps_remote(self):
4586-
options = self.rpc_backend_options
4587-
dst = worker_name((self.rank + 1) % self.world_size)
4588-
options.set_device_map(dst, {1: 0})
4589-
4590-
rpc.init_rpc(
4591-
name=worker_name(self.rank),
4592-
backend=self.rpc_backend,
4593-
rank=self.rank,
4594-
world_size=self.world_size,
4595-
rpc_backend_options=options,
4596-
)
4597-
4598-
rref = rpc.remote(
4599-
dst,
4600-
TensorPipeAgentRpcTest._add_to_gpu,
4601-
args=(torch.zeros(2), 1)
4602-
)
4603-
4604-
self.assertEqual(rref.to_here(), torch.ones(2).to(1))
4605-
4606-
rpc.shutdown()

0 commit comments

Comments
 (0)
Please sign in to comment.