Compiled dag #17
Conversation
Signed-off-by: Stephanie Wang <[email protected]>
python/ray/dag/dag_node.py (outdated)

    *args,
    _ray_cache_refs: bool = False,
    _ray_cache_actors: bool = True,
    compiled: bool = False,
Shall we make an experimental_compile() method that returns a compiled node type instead of adding an arg here? I think the arg is a bit confusing since it's not clear that we cache the compiled DAG on the first call to execute.
Yeah agree, I was thinking that too actually.
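For illustration, a minimal sketch of the API shape being discussed, assuming the method lands on the DAG node as experimental_compile(); the Echo actor and the exact execute signature are assumptions here, not the PR's final API:

import ray
from ray.dag import InputNode

@ray.remote
class Echo:
    def echo(self, x):
        return x

actor = Echo.remote()
with InputNode() as inp:
    dag = actor.echo.bind(inp)

compiled_dag = dag.experimental_compile()  # compile explicitly, once
ref = compiled_dag.execute(1)              # then reuse for repeated executions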
python/ray/dag/dag_node.py (outdated)

        self.cache_from_last_execute = executor.cache
        return result

    def destroy_compiled_dag(self):
        _, _, _, monitor = self.compiled()
        monitor.destroy()
Drop the fault tolerance/cancellation stuff from this PR?
Oops, missed this...
This reverts commit 9396810.
Signed-off-by: Stephanie Wang <[email protected]>
Q: is the perf benchmark result similar to or the same as before?
python/ray/dag/compiled_dag_node.py (outdated)

        # Find the (multi-)output node to the DAG.
        for idx, task in self.idx_to_task.items():
            if len(task.dependent_node_idxs) == 0:
NIT: this could be included in the previous loop?

for idx, task in self.idx_to_task.items():
    if isinstance(task.dag_node, InputNode):
        assert self.input_task_idx is None, "more than one InputNode found"
        self.input_task_idx = idx
    if len(task.dependent_node_idxs) == 0:
        assert self.output_task_idx is None, "More than one output node found"
        self.output_task_idx = idx
I just did this for readability. The performance here doesn't matter much since we only do it once.
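For contrast, a sketch of the equivalent two-pass version being referred to; the names follow the diff above, and the structure is illustrative only:

# Pass 1: find the single InputNode.
for idx, task in self.idx_to_task.items():
    if isinstance(task.dag_node, InputNode):
        assert self.input_task_idx is None, "more than one InputNode found"
        self.input_task_idx = idx

# Pass 2: find the (multi-)output node, i.e. the task nothing depends on.
for idx, task in self.idx_to_task.items():
    if len(task.dependent_node_idxs) == 0:
        assert self.output_task_idx is None, "More than one output node found"
        self.output_task_idx = idx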
python/ray/dag/compiled_dag_node.py (outdated)

        # Find the (multi-)output node to the DAG.
        for idx, task in self.idx_to_task.items():
            if len(task.dependent_node_idxs) == 0:
This part seems a bit confusing to me. Isn't dependent_node == arguments? Why does the output node have 0 dependent node idxs?
Ah, it's a reverse index for arguments: dependent_node refers to downstream tasks. I'll rename it to "downstream_node_idxs" to make that clearer.
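As a rough sketch of what that reverse index means (illustrative fragment only; downstream_node_idxs, dag_node_to_idx, and the argument iteration are assumptions beyond what the diff shows):

# Walking each task's arguments and recording the consumer on the producer
# builds the "downstream" index: output tasks end up with an empty list.
for idx, task in self.idx_to_task.items():
    for arg in task.dag_node.get_args():
        if isinstance(arg, DAGNode):
            upstream_idx = self.dag_node_to_idx[arg]
            self.idx_to_task[upstream_idx].downstream_node_idxs.append(idx)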
        )

        # Assign the task with the correct input and output buffers.
        worker_fn = task.dag_node._get_remote_method("__ray_call__")
Maybe we can make a wrapper?

def invoke_remote_method_from_task(task, func, *args, **kwargs):
    worker_fn = task.dag_node._get_remote_method("__ray_call__")
    return worker_fn.remote(func, args, kwargs)

something like this?
I think it's fine not to do this.
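Had the wrapper gone in, usage at this call site might have looked roughly like the following; do_exec_compiled_task and resolved_args are hypothetical placeholders, not the PR's actual names:

worker_task_refs.append(
    invoke_remote_method_from_task(
        task, do_exec_compiled_task, resolved_args, task.dag_node
    )
)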
""" | ||
|
||
|
||
class CompiledDAG: |
Should we add tests that validate the DAG itself (without running execute)?
- Convert a regular DAG and make sure the returned compiled DAG contains all of the regular DAG nodes + the correct worker_task_refs
- Test cases where exceptions are raised
Makes sense, I'll do 1.
2 is already done (see test_dag_errors).
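A hedged sketch of what test (1) could look like; the fixture, actor, and attribute names (idx_to_task, worker_task_refs, experimental_compile) are taken from this thread and the diffs above, but the exact assertions are assumptions:

import ray
from ray.dag import InputNode

@ray.remote
class Echo:
    def echo(self, x):
        return x

def test_compile_preserves_structure(ray_start_regular):
    a = Echo.remote()
    with InputNode() as inp:
        dag = a.echo.bind(inp)

    compiled_dag = dag.experimental_compile()  # assumed entry point from this thread
    # One task per DAG node: the InputNode plus the actor task.
    assert len(compiled_dag.idx_to_task) == 2
    # One long-running worker task per actor task.
    assert len(compiled_dag.worker_task_refs) == 1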
…ay-project#41686) After much discussion, the Data Processing tag is too general. Removing it from the Example Gallery. cc: @amogkam @scottjlee @pcmoritz Signed-off-by: angelinalg <[email protected]>
Subsections are appearing in the Serve Examples page as separate docs vs sections of the Streaming doc. Signed-off-by: angelinalg <[email protected]>
For a few builds that don't use rayci, we need to add code to upload build data to go/flaky Signed-off-by: can <[email protected]>
…oject#41785) ray-project#40127 removed the "Implementing a Custom Datasource" example because it used deprecated APIs. This PR introduces a new example that uses up-to-date APIs. --------- Signed-off-by: Balaji Veeramani <[email protected]>
) Support OutputNode. Allow creating a bind from a regular actor; this is needed because the actor must be reusable. Currently, you can have only 1 DAG per actor because the actor is the starting point of the DAG. This change makes a task, rather than the actor, the starting node of the DAG. It also allows more than one InputNode per actor. This PR also removes unused code.
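A minimal illustration of the usage this enables, assuming the multi-output node is exposed as MultiOutputNode and that actor method calls (rather than the actor itself) start the DAG; the Worker actor here is made up:

import ray
from ray.dag import InputNode, MultiOutputNode

@ray.remote
class Worker:
    def echo(self, x):
        return x

a = Worker.remote()
b = Worker.remote()

# Each actor can participate in more than one DAG, and a single InputNode
# can fan out to multiple outputs.
with InputNode() as inp:
    dag = MultiOutputNode([a.echo.bind(inp), b.echo.bind(inp)])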
See ray-project#41515. This updates to only compile new code on linux. OSX does not support shared memory semaphores, only named semaphores. --------- Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
…ples (ray-project#41758) Purging outdated or low-value examples from the Example Gallery. @pcmoritz @richardliaw --------- Signed-off-by: angelinalg <[email protected]>
Signed-off-by: Lonnie Liu <[email protected]>
formatting only Signed-off-by: Lonnie Liu <[email protected]>
Part of a larger effort to better curate examples. This example appears twice, once as a Code Example, and a second time as a Tutorial. I thought that was redundant so I removed the duplication. Signed-off-by: angelinalg <[email protected]>
If a deployment is autoscaling and replicas take a long time to start, there is a bug that makes the state transition to (UPDATING, AUTOSCALING) which is a combination that should never occur. Instead, we should just update the message but not the status. --------- Signed-off-by: Cindy Zhang <[email protected]>
…or multi-node distributed training and checkpointing (ray-project#41832) Ray 2.7 removed support for using the head node as the persistent storage for checkpoints and artifacts in a multi-node distributed training. The alternative recommendation is to use cloud storage or a shared filesystem instead via `RunConfig(storage_path)`. Ray Train/Tune will error if the user attempts to checkpoint `ray.train.report(..., checkpoint=...)` from a worker that's on a remote node. This is because the new assumption is that all worker nodes have access to read/write from the same persistent storage, and the "head node local storage" is not accessible by all nodes. However, the error message that shows up is confusing. All nodes can technically access the local path in the message -- the problem is that not all nodes can access the SAME local path. This PR improves the error message to make this clearer and to suggest an actionable fix. This PR also updates most of the getting started user guides to mention the multi-node storage requirement and links to the storage user guide. --------- Signed-off-by: Justin Yu <[email protected]>
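A minimal sketch of the recommended setup, assuming cloud storage is available to all nodes; the bucket path, worker count, and trainer choice are placeholders:

from ray import train
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker(config):
    # ... training step ...
    # Metrics and checkpoints reported here go to storage_path, which every
    # worker node must be able to read and write.
    train.report({"loss": 0.0})

trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    run_config=RunConfig(storage_path="s3://my-bucket/experiments"),
)
result = trainer.fit()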
…de `set_epoch` usage in all examples (ray-project#41807) `prepare_data_loader` adds a `DistributedSampler` to an existing pytorch `DataLoader` object. To do this, it recreates a `DataLoader` object and passes most arguments through from the original object, but also makes some implicit assumptions that are not configurable/visible to the caller. For example, if using just vanilla pytorch by itself, it's possible to do: `train_dataloader = DataLoader(..., shuffle=False, sampler=DistributedSampler(shuffle=True))`. Here, the `DataLoader` sets `shuffle=False`, but the `DistributedSampler` will still do a shuffle on every epoch so that the training data order is not always the same. The `shuffle=False` argument of the `DataLoader` is pretty much ignored because a custom sampler is supplied. **However, with Ray Train, since this `prepare_data_loader` utility injects the `DistributedSampler` for the user, there's no visibility on the `shuffle` parameter.** Ray Train will detect the `shuffle` parameter set on the *original* dataloader, then pass that along to the `DistributedSampler`. So, it's not possible to have this `False+True` situation. **Additionally, if `shuffle=True`, `DistributedSampler.set_epoch` must be called at the start of each epoch in order for the dataset ordering to be different for all workers *on every epoch.*** This is because the seed of the sampler is determined at the epoch start (`epoch seed = base random seed + epoch number`). Shuffling can be very important for training a model successfully -- if the data order remains the same every epoch, it's possible that training never converges (ex: we ran into this issue training resnet18 on imagenet). Signed-off-by: Justin Yu <[email protected]>
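For reference, a vanilla PyTorch illustration of why set_epoch matters (not Ray-specific); the tiny TensorDataset and explicit num_replicas/rank are only there so the sketch runs without init_process_group:

import torch
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

dataset = TensorDataset(torch.arange(100, dtype=torch.float32))
# num_replicas/rank are passed explicitly here; in real DDP they come from
# the initialized process group.
sampler = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=True)
loader = DataLoader(dataset, batch_size=16, sampler=sampler)

for epoch in range(3):
    # Reseed the shuffle with (base seed + epoch); without this call, every
    # epoch iterates the data in exactly the same order.
    sampler.set_epoch(epoch)
    for (batch,) in loader:
        pass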
Adding example code and descriptions for using gRPC context in Serve. --------- Signed-off-by: Gene Su <[email protected]> Signed-off-by: Gene Der Su <[email protected]> Co-authored-by: Edward Oakes <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Un-break CI. Signed-off-by: Stephanie Wang <[email protected]>
Why are these changes needed?
Related issue number
Checks
- I've run scripts/format.sh to lint the changes in this PR.