
RFC: Accelerated DAGs #48

Merged
merged 11 commits into main from accelerated-dag on Mar 21, 2024
Conversation

@stephanie-wang (Contributor)

No description provided.

@stephanie-wang changed the title from "wip" to "RFC: Accelerated DAGs" on Dec 4, 2023
@stephanie-wang marked this pull request as ready for review on December 4, 2023
@ericl (Contributor) commented Dec 7, 2023 via email

@stephanie-wang (Contributor, Author) commented:

Another TODO: add more detail on the interaction with load balancing and autoscaling, ideally with some pseudocode.

@edoakes (Contributor) left a comment:

Overall the motivation makes a lot of sense. Right now, using the Ray API as the dataplane for ML workloads is a performance loss compared to other libraries, and we should close that gap.

From a serving perspective, there are basically three categories of communication that happen (excluding the pattern used by vLLM where you use multiple GPUs for a single logical replica):

(1) Point-to-point communication passing small messages between two groups of actors (often colocated on the same node). This is the pattern for HTTP proxy -> replica communication and many user-defined applications doing basic model composition.

Requirements:

  • Backpressure, load balancing (including adding/removing actors), pipelining.

Non-requirements:

  • Dynamic control flow
  • Passing output to a downstream actor.

(2) An extension of (1) where there is composition across multiple groups of actors (for example A calls B and passes the output to C). In some cases the output of (B) might be large or on a GPU, so passing it by reference may be helpful.

Requirements:

  • Backpressure, load balancing (including adding/removing actors), pipelining.
  • Passing outputs to a downstream actor.

Non-requirements:

  • Dynamic control flow

(3) An extension of (1) and (2) where there is also dynamic control flow (i.e., users choose the downstream group of actors based on some input or intermediate output).

Requirements:

  • Backpressure, load balancing (including adding/removing actors), pipelining.
  • Passing outputs to a downstream actor.
  • Dynamic control flow.

To me, it's clear that the faster transport layer could be beneficial to all of the above, particularly (1), where we mostly want fast message passing between two actors on the same node.

It's unclear how the optimized DAG as proposed could satisfy the requirements around load balancing and autoscaling across groups of actors. I'd be interested to hear if you have any ideas there.
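As a point of reference, here is a minimal sketch of pattern (2) with today's Ray actor API, passing the intermediate output by reference so it never round-trips through the driver; the actor and method names here are illustrative only, not from the RFC:

```python
import ray

ray.init()

@ray.remote
class Preprocessor:        # "A": receives the request
    def run(self, request):
        return request.upper()

@ray.remote
class Model:               # "B": produces a potentially large output
    def infer(self, features):
        return [features] * 1000

@ray.remote
class Postprocessor:       # "C": consumes B's output
    def run(self, prediction):
        return len(prediction)

a, b, c = Preprocessor.remote(), Model.remote(), Postprocessor.remote()

# Passing ObjectRefs between actor calls keeps the large intermediate out of
# the driver; Ray resolves each ref on the receiving actor's node (via shared
# memory when colocated). This is the dataplane path the RFC aims to speed up.
features_ref = a.run.remote("req")
prediction_ref = b.infer.remote(features_ref)
print(ray.get(c.run.remote(prediction_ref)))
```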


<img style="background-color:white" src="2023-12-04-accelerated-dag-figures/image2.png">

*Left: Instantiation. "Compile" a DAG before the first execution, by allocating buffers for task inputs/outputs and sending the task descriptions to the actor executors. Right: Execute the compiled dataplane. Communication edges can take place over shared memory when local.*
A reviewer (Contributor) commented:

Suggested change
*Left: Instantiation. "Compile" a DAG before the first execution, by allocating buffers for task inputs/outputs and sending the task descriptions to the actor executors. Right: Execute the compiled dataplane. Communication edges can take place over shared memory when local.*
*Left: Instantiation. "Compile" a DAG before the first execution, by allocating buffers for task inputs/outputs and sending the task descriptions to the actor executors. Right: Execute the compiled DAG over an optimized dataplane. Communication edges can take place over shared memory when local.*
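For illustration, a rough sketch of the instantiate-then-execute flow the caption describes, using the `InputNode`/`bind` API shown elsewhere in this RFC; the `experimental_compile` method name and the ref returned by `execute` are assumptions about how the compiled path is surfaced, not something this RFC pins down:

```python
import ray
from ray.dag import InputNode

@ray.remote
class Worker:
    def fwd(self, x):
        return x + 1

worker = Worker.remote()

# Instantiation: build the DAG once, then "compile" it so that input/output
# buffers are allocated up front and the task description is pushed to the
# actor executor.
with InputNode() as inp:
    dag = worker.fwd.bind(inp)
compiled_dag = dag.experimental_compile()  # compile-step name is an assumption

# Execution: repeated calls reuse the pre-allocated channels (shared memory
# when sender and receiver are on the same node).
for i in range(3):
    print(ray.get(compiled_dag.execute(i)))
```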

Comment on lines +224 to +225
sender/receiver are colocated, and shared memory/Ray Core for
cross-node.
A reviewer (Contributor) commented:

Any plan to experiment with faster transports such as RDMA for cross-node communication?

@stephanie-wang (Contributor, Author) replied:

Yes, but probably not for a while. The initial focus is GPU communication.

| | Key properties / requirements | Goals |
|---|---|---|
| [Pipedream](https://arxiv.org/pdf/1806.03377.pdf) style pipeline-parallel distributed training (PP) | Iterative P2P GPU communication | Performance parity; Increase flexibility of partitioning scheme |
| vLLM pipeline parallelism on heterogeneous GPUs | Asymmetric compute | Reduce implementation burden |
| Fault-tolerant distributed serving | Resume execution w/o restarting everyone | Reduce downtime via greater recovery flexibility |
A reviewer (Contributor) commented:

As I understand the fault tolerance section above, failure handling will be done at the application layer by propagating the error back to the driver. In this case it seems that the entire DAG would need to be re-run. So how would the proposal help address this requirement?

It's also a little unclear to me how important this requirement is for most serving applications. Most often latencies are low (~seconds at most) and failures are rare, so needing to re-run the full inference is not a big problem.

@stephanie-wang (Contributor, Author) replied:

Restarting here means the worker processes, not individual tasks.
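A minimal sketch of the recovery flow implied by this reply, assuming that a worker failure surfaces on the driver as a `RayTaskError` and that the same compiled DAG object (here a hypothetical `compiled_dag`) can simply be executed again without restarting the worker processes:

```python
import ray
from ray.exceptions import RayTaskError

def execute_with_retry(compiled_dag, request, max_attempts=3):
    """Re-run a failed DAG execution without restarting the worker actors."""
    last_error = None
    for _ in range(max_attempts):
        try:
            return ray.get(compiled_dag.execute(request))
        except RayTaskError as e:
            last_error = e  # workers stay up; only this execution is retried
    raise last_error
```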

| | Key properties / requirements | Goals |
|-----------------------------------------------------------------|----------------------------------------------------------------------|------------------------------------------------------------------|
| vLLM tensor parallelism | | Reduce Ray overheads |
| vLLM pipeline parallelism | P2P/cross-node GPU communication | Reduce (expected) Ray overheads; Validate cross-node performance |
A reviewer (Contributor) commented:

My understanding from the vLLM folks' benchmarks was that cross-node GPU communication overhead is too high (even when using optimized transports), so for now they don't have a desire to use it. I could be wrong, though; this is hearsay.

@stephanie-wang (Contributor, Author) replied:

For tensor parallelism, yes, but I don't think that's the case for pipeline parallelism.

@ericl (Contributor) commented Dec 22, 2023:

> Point-to-point communication passing small messages between two groups of actors

Would the current proposal be viable for even this? As I understand it, this would require dynamic M:N communication, whereas the compiled DAG assumes a fully static and exclusive communication topology between actors.

For example, if group 1 is a single actor dispatching work to group 2 actors, that would work (we could have multiple DAGs capturing "1->2" communication), but if group 1 had multiple dispatching actors, that would break the assumption that each actor doing work is part of at most one DAG at a time.
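For concreteness, a sketch of the 1:N arrangement described above using the existing `ray.dag` binding API (no compiled path): one small DAG per replica, so each replica participates in at most one DAG, and the dispatching side simply picks which DAG to execute per request. The class and method names are illustrative:

```python
import ray
from ray.dag import InputNode

@ray.remote
class Replica:
    def handle(self, request):
        return f"handled {request}"

replicas = [Replica.remote() for _ in range(2)]

# One "1->2" DAG per replica; each replica appears in exactly one DAG, which
# satisfies the static, exclusive topology assumption discussed above.
dags = []
for replica in replicas:
    with InputNode() as inp:
        dags.append(replica.handle.bind(inp))

# The dispatcher (driver or a single dispatching actor) round-robins across
# the per-replica DAGs.
for i in range(4):
    dag = dags[i % len(dags)]
    print(ray.get(dag.execute(f"req-{i}")))
```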

@rkooo567 (Contributor) commented:

I have the impression that the custom/faster transport has to be decoupled from the DAG. It is a requirement for the DAG, but the DAG shouldn't be a requirement for using the custom/faster transport. I think it could be implemented fairly easily with the existing Ray API if we just establish the channel when the first communication is initiated. And if it works with the existing Ray API, we wouldn't have to support this from core?
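To illustrate the decoupling being suggested, here is a rough sketch of a channel whose transport is established only when the first message is sent, with no DAG involved; every class here is hypothetical and just stands in for the real shared-memory or object-store paths:

```python
class SharedMemoryTransport:      # stand-in for a local shared-memory channel
    def write(self, value):
        print("shared-memory write:", value)

class ObjectStoreTransport:       # stand-in for the existing Ray object path
    def write(self, value):
        print("object-store write:", value)

class LazyChannel:
    """Hypothetical channel: the transport is chosen on first use, not at creation."""

    def __init__(self, colocated: bool):
        self._colocated = colocated
        self._transport = None    # nothing is established yet

    def write(self, value):
        if self._transport is None:
            # Channel is established when the first communication is initiated.
            self._transport = (
                SharedMemoryTransport() if self._colocated else ObjectStoreTransport()
            )
        self._transport.write(value)

LazyChannel(colocated=True).write(b"payload")
```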

@edoakes (Contributor) commented Dec 28, 2023:

Yes, you're correct @ericl. To optimize the basic case of M:N communication, we essentially just want the fast transport layer. Whether we get that through the DAG abstraction (a single-node DAG) or use the lower-level primitive directly doesn't matter too much.


with InputNode() as inp:
    with InputGPUChannel() as gpu_channel:
        _, tensors = producer.produce_all_sync.bind(inp.input)
@ericl (Contributor) commented Jan 8, 2024:

This syntax is pretty strange; I'd expect something more like tensors.set_device("gpu") or tensors.set_channel_impl(GPUChannel).

Ideally, this would be done automatically, but I'm not sure we can easily detect that the return type is a CUDA array.

@stephanie-wang (Contributor, Author) replied:

Hmm, tensors.set_channel_impl seems good. Mainly I want there to be a way to pass a specific Channel or Channel type, so tensors.set_device("gpu") seems less ideal.
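For concreteness, the alternative being discussed might read roughly as follows; `set_channel_impl` and `GPUChannel` are hypothetical names from this thread, not an existing Ray API, so this fragment is illustrative only and not runnable as-is:

```python
# Hypothetical syntax, not an existing API: pin the channel implementation
# for one DAG edge instead of wrapping the input in InputGPUChannel.
with InputNode() as inp:
    _, tensors = producer.produce_all_sync.bind(inp.input)
    tensors.set_channel_impl(GPUChannel)   # route this edge over a GPU channel
    out = consumer.consume.bind(tensors)
```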

@zhe-thoughts (Collaborator) left a comment:

Vote on ray-committers just passed

@zhe-thoughts merged commit a8bac2d into main on Mar 21, 2024
@hongchaodeng deleted the accelerated-dag branch on January 3, 2025