RFC: Accelerated DAGs #48
Conversation
Signed-off-by: Stephanie Wang <[email protected]>
Isn't this signal very high overhead (basically we will have default transport + custom transport overhead)? Is it necessary if the transport happens under the channel (as an output of each DAG node)?
Good point, especially since custom TCP/RDMA transports primarily improve latency and not throughput.
In reps/2023-12-04-accelerated-dag.md:
> +*Vision for Ray Train in Ray 3.0: Ray Train is built on Ray's single-controller model, but with much finer-grained tasks and more control over the execution. The physical execution is as fast as SPMD.*
+
+The "accelerated DAGs" effort aims to keep Ray's flexibility and fault
+tolerance, but: (a) reduce control plane overheads, and (b) support
+specialized communication transports. The basic ideas are to: (a) reuse
+control plane decisions from past executions, (b) support
+application-defined transports such as
+[NCCL](https://developer.nvidia.com/nccl) and
+[UCX](https://openucx.org/). Ultimately, the goal is to
+make Ray into a native execution substrate for distributed ML
+applications.
+
+Architecture
+============
+
+## Compiled DAGs
I think it'd be nice to show a concrete API example (for a common computational pattern like tensor parallelism using the DAG and a custom transport) in this section.
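For illustration only, here is one way such an example might look. This is an editorial sketch, not an API from the REP: `experimental_compile()` and the `set_channel_impl()` transport annotation (an option that comes up later in this thread) are assumed placeholders layered on top of the existing `ray.dag` binding API.

```python
import ray
from ray.dag import InputNode, MultiOutputNode

@ray.remote(num_gpus=1)
class TPShard:
    def forward(self, batch):
        # Compute this shard's partial output (a CUDA tensor in practice).
        return batch  # placeholder

shards = [TPShard.remote() for _ in range(4)]

with InputNode() as batch:
    partials = [s.forward.bind(batch) for s in shards]
    # Hypothetical: route these edges over a custom GPU transport (e.g. NCCL)
    # instead of the default shared-memory/object-store channel.
    partials = [p.set_channel_impl("nccl") for p in partials]  # assumed API
    dag = MultiOutputNode(partials)

# Hypothetical: compile once so control-plane decisions are reused, then
# execute the same dataplane many times.
compiled = dag.experimental_compile()  # assumed API
for step in range(10):
    outputs = ray.get(compiled.execute(step))
```

The point the REP is after is that scheduling and buffer allocation happen once at compile time, so the per-iteration path is only the dataplane.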
In reps/2023-12-04-accelerated-dag.md:
> +```python
+class Transport:
+ def __init__(self, sender: ray.ActorHandle, receiver: ray.ActorHandle):
+ pass
+ def send(self, send_meta: BufferMetadata):
+ """Ideally async. Called by the sender"""
+ pass
+ def recv(self, recv_meta: BufferMetadata) -> Buffer:
+ """Ideally async. Called by the receiver."""
+ pass
+```
+
+When an application-defined transport is used, we will use the default
+transport to synchronize between the sender and receiver, but we only
+send the BufferMetadata along the default transport. This will act as
+a signal to begin the actual `Transport.send` and `Transport.recv`.
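As an editorial illustration of how an application-defined transport might fill in this interface, here is a minimal sketch backed by `torch.distributed` point-to-point ops. It assumes a process group already spans the sender and receiver, and that `BufferMetadata` carries the peer rank, shape, dtype, device, and (on the sender) a reference to the local tensor; none of this is specified by the REP.

```python
import torch
import torch.distributed as dist

class TorchP2PTransport:
    """Sketch of the REP's Transport interface using torch.distributed P2P."""

    def __init__(self, sender, receiver):
        # sender/receiver are ray.ActorHandle in the REP's interface; this
        # sketch only relies on the peer ranks carried in the metadata.
        self.sender = sender
        self.receiver = receiver

    def send(self, send_meta):
        # Called on the sender once the BufferMetadata has been published
        # over the default channel. Assumes send_meta.tensor references the
        # local tensor to transfer.
        dist.send(send_meta.tensor, dst=send_meta.peer_rank)

    def recv(self, recv_meta):
        # Called on the receiver after it observes the BufferMetadata signal.
        buf = torch.empty(recv_meta.shape, dtype=recv_meta.dtype,
                          device=recv_meta.device)
        dist.recv(buf, src=recv_meta.peer_rank)
        return buf
```

Since only the small BufferMetadata travels over the default channel, the extra hop mostly costs latency rather than throughput, which is the trade-off discussed above.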
Signed-off-by: Stephanie Wang <[email protected]>
Another TODO: add some more detail on the interaction with load balancing and autoscaling, ideally with some pseudocode.
Overall the motivation makes a lot of sense. Right now, using the Ray API as the dataplane for ML workloads is a performance loss over other libraries, and we should close that gap.
From a serving perspective, there are basically three categories of communication that happens (excluding the pattern used by vLLM where you use multiple GPUs for a single logical replica):
(1) Point-to-point communication passing small messages between two groups of actors (often colocated on the same node). This is the pattern for HTTP proxy -> replica communication and many user-defined applications doing basic model composition.
Requirements:
- Backpressure, load balancing (including adding/removing actors), pipelining.
Non-requirements:
- Dynamic control flow
- Passing output to a downstream actor.
(2) An extension of (1) where there is composition across multiple groups of actors (for example A calls B and passes the output to C). In some cases the output of (B) might be large or on a GPU, so passing it by reference may be helpful.
Requirements:
- Backpressure, load balancing (including adding/removing actors), pipelining.
- Passing outputs to a downstream actor.
Non-requirements:
- Dynamic control flow
(3) An extension of (1) and (2) where there is also dynamic control flow (i.e., users choose the downstream group of actors based on some input or intermediate output).
Requirements:
- Backpressure, load balancing (including adding/removing actors), pipelining.
- Passing outputs to a downstream actor.
- Dynamic control flow.
To me, it's clear that the faster transport layer could be beneficial to all of the above, particularly (1) where we mostly want fast message passing between two actors on the same node.
It's unclear how the optimized DAG as proposed could satisfy the requirements around load balancing and autoscaling across groups of actors. Would be interested to hear if you have any ideas there.
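As an editorial illustration of pattern (2) with today's Ray API (actor and method names are made up): an ingress actor "A" composes B and C by forwarding B's ObjectRef, so the large intermediate moves from B to C directly instead of passing through A.

```python
import ray

@ray.remote
class ModelB:
    def embed(self, request):
        # Produces a large intermediate (imagine a GPU tensor or big array).
        return [0.0] * 1_000_000

@ray.remote
class ModelC:
    def rank(self, embedding):
        # `embedding` is resolved here; the payload never passes through A.
        return len(embedding)

@ray.remote
class Ingress:  # "A": composes B and C
    def __init__(self, b, c):
        self.b, self.c = b, c

    def handle(self, request):
        emb_ref = self.b.embed.remote(request)  # B runs asynchronously
        return self.c.rank.remote(emb_ref)      # pass B's output by reference to C

b, c = ModelB.remote(), ModelC.remote()
a = Ingress.remote(b, c)
result_ref = ray.get(a.handle.remote("req-1"))  # unwraps to C's ObjectRef
print(ray.get(result_ref))
```

The open question raised above is how a compiled DAG would preserve this behavior once B and C are load-balanced, autoscaled groups rather than fixed actors.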
In reps/2023-12-04-accelerated-dag.md:

<img style="background-color:white" src="2023-12-04-accelerated-dag-figures/image2.png">

*Left: Instantiation. "Compile" a DAG before the first execution, by allocating buffers for task inputs/outputs and sending the task descriptions to the actor executors. Right: Execute the compiled dataplane. Communication edges can take place over shared memory when local.*
Suggested change to the caption: "Right: Execute the compiled dataplane." → "Right: Execute the compiled DAG over an optimized dataplane."
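To make the caption's "allocate buffers once, then execute" idea concrete, here is an editorial, non-Ray toy sketch of a single-slot reusable channel. The real dataplane would use the object store or a custom transport, but the cost model is the same: allocation happens at compile time, and each execution is just a write, a signal, and a read.

```python
import pickle
from multiprocessing import Semaphore, shared_memory

class SingleSlotChannel:
    """Toy single-reader/single-writer channel over shared memory."""

    def __init__(self, max_bytes: int = 1 << 20):
        # "Compile" step: allocate the buffer once, up front.
        self.shm = shared_memory.SharedMemory(create=True, size=max_bytes + 4)
        self.ready = Semaphore(0)  # signals "a value is available"

    def put(self, value) -> None:
        data = pickle.dumps(value)
        self.shm.buf[:4] = len(data).to_bytes(4, "little")
        self.shm.buf[4:4 + len(data)] = data
        self.ready.release()

    def get(self):
        self.ready.acquire()
        n = int.from_bytes(bytes(self.shm.buf[:4]), "little")
        return pickle.loads(bytes(self.shm.buf[4:4 + n]))
```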
In reps/2023-12-04-accelerated-dag.md:

…sender/receiver are colocated, and shared memory/Ray Core for cross-node.
Any plan to experiment with faster transports such as RDMA for cross-node communication?
Yes, but not for a while probably. The initial focus is GPU communication.
In reps/2023-12-04-accelerated-dag.md (outdated):
| | Key properties / requirements | Goals |
|---|---|---|
| [Pipedream](https://arxiv.org/pdf/1806.03377.pdf) style pipeline-parallel distributed training (PP) | Iterative P2P GPU communication | Performance parity; Increase flexibility of partitioning scheme |
| vLLM pipeline parallelism on heterogeneous GPUs | Asymmetric compute | Reduce implementation burden |
| Fault-tolerant distributed serving | Resume execution w/o restarting everyone | Reduce downtime via greater recovery flexibility |
As I understand the fault tolerance section above, failure handling will be done at the application layer by propagating the error back to the driver. In this case it seems that the entire DAG would need to be re-run. So how would the proposal help address this requirement?
It's also a little unclear to me how important this requirement is for most serving applications. Most often latencies are low (~seconds at most) and failures are rare, so needing to re-run the full inference is not a big problem.
Restarting here means the worker processes, not individual tasks.
In reps/2023-12-04-accelerated-dag.md (outdated):
| | Key properties / requirements | Goals |
|---|---|---|
| vLLM tensor parallelism | | Reduce Ray overheads |
| vLLM pipeline parallelism | P2P/cross-node GPU communication | Reduce (expected) Ray overheads; Validate cross-node performance |
My understanding from vLLM folks' benchmarks was that cross-node GPU communication overhead is too high (even if using optimized transports) so for now they don't have a desire to use it. I could be wrong though, this is hearsay.
For tensor parallelism, yes, but I don't think that's the case for pipeline parallelism.
Would the current proposal be viable for even this? As I understand it, this would require dynamic M:N communication, whereas the compiled DAG assumes a fully static and exclusive communication topology between actors. For example, if group 1 is a single actor dispatching work to group 2 actors, that would work (we could have multiple DAGs capturing "1->2" communication), but if group 1 had multiple dispatching actors, then that would break the assumption that each actor doing work is part of at most one DAG at a time.
I have the impression that the custom/faster transport has to be decoupled from the DAG. It is a requirement for the DAG, but the DAG shouldn't be a requirement for using a custom/faster transport. I think it could be easily implemented with the existing Ray API if we just establish the channel when the first communication is initiated. And if it works with the existing Ray API, we don't have to support the above from core?
Yes, you're correct @ericl. To optimize the basic case of M:N communication, we essentially just want the fast transport layer. Whether we get that through the DAG abstraction (single-node DAG) or use the lower-level primitive directly doesn't really matter too much.
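To illustrate the "establish the channel on first communication, no DAG layer required" idea, here is an editorial sketch assuming `ray.util.collective`'s NCCL group primitives and one GPU per actor; the names and rendezvous details are simplified.

```python
import ray
import torch
import ray.util.collective as col

@ray.remote(num_gpus=1)
class Peer:
    def __init__(self, rank: int, world_size: int):
        self.rank, self.world_size = rank, world_size
        self.group_ready = False

    def _ensure_group(self):
        # Lazily join the NCCL group the first time we actually communicate.
        if not self.group_ready:
            col.init_collective_group(self.world_size, self.rank,
                                      backend="nccl", group_name="p2p")
            self.group_ready = True

    def send(self, shape):
        self._ensure_group()
        t = torch.ones(shape, device="cuda")
        col.send(t, 1, "p2p")   # dst_rank = 1

    def recv(self, shape):
        self._ensure_group()
        t = torch.empty(shape, device="cuda")
        col.recv(t, 0, "p2p")   # src_rank = 0
        return float(t.sum())

sender, receiver = Peer.remote(0, 2), Peer.remote(1, 2)
# The first pair of calls sets up the group on both sides; later calls reuse it.
ray.get([sender.send.remote((4,)), receiver.recv.remote((4,))])
```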
Signed-off-by: Stephanie Wang <[email protected]>
In reps/2023-12-04-accelerated-dag.md (outdated):
with InputNode() as inp:
    with InputGPUChannel() as gpu_channel:
        _, tensors = producer.produce_all_sync.bind(inp.input)
This syntax is pretty strange; I'd expect something more like `tensors.set_device("gpu")` or `tensors.set_channel_impl(GPUChannel)`. Ideally, this would be done automatically, but I'm not sure we can easily detect that the return type is a CUDA array.
Hmm, `tensors.set_channel_impl` seems good. Mainly I want there to be a way to pass a specific Channel or Channel type, so `tensors.set_device("gpu")` seems less ideal.
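For reference, a shape-only sketch of the two annotation styles being compared; it is not runnable, `consumer` is made up, and `GPUChannel`, `set_channel_impl`, and `set_device` are placeholders for the options under discussion rather than settled APIs.

```python
with InputNode() as inp:
    _, tensors = producer.produce_all_sync.bind(inp.input)

    # Option A: attach a specific Channel implementation (class or instance)
    # to this edge, as preferred above.
    tensors = tensors.set_channel_impl(GPUChannel)

    # Option B: a coarser device hint, leaving the channel choice to Ray.
    # tensors = tensors.set_device("gpu")

    out = consumer.consume.bind(tensors)
```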
Signed-off-by: Stephanie Wang <[email protected]>
Vote on ray-committers just passed