specs,schedulers: add Role.mounts+BindMount and implement in docker,kubernetes,aws_batch schedulers #420

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions docs/source/ext/compatibility.py
@@ -17,6 +17,7 @@
"cancel": "Cancel Job",
"describe": "Describe Job",
"workspaces": "Workspaces / Patching",
"mounts": "Mounts",
},
}

9 changes: 9 additions & 0 deletions docs/source/specs.rst
@@ -63,6 +63,15 @@ Run Status
.. autoclass:: ReplicaState
:members:


Mounts
--------

.. autoclass:: BindMount
:members:

.. autofunction:: parse_mounts

Component Linter
-----------------
.. automodule:: torchx.specs.file_linter
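As a quick usage sketch of the two APIs documented above (BindMount and parse_mounts); the paths are illustrative and the call forms mirror the tests added in this PR:

from torchx import specs

# CLI-style form: a list of key=value options, as accepted by parse_mounts
mounts = specs.parse_mounts(["type=bind", "src=/mnt/data", "dst=/data", "readonly"])

# direct form: construct the BindMount dataclass added by this PR
mounts = [specs.BindMount(src_path="/mnt/data", dst_path="/data", read_only=True)]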
5 changes: 4 additions & 1 deletion torchx/components/dist.py
@@ -124,7 +124,7 @@
import os
import shlex
from pathlib import Path
from typing import Dict, Iterable, Optional
from typing import Dict, Iterable, Optional, List

import torchx
import torchx.specs as specs
@@ -146,6 +146,7 @@ def ddp(
max_retries: int = 0,
rdzv_backend: str = "c10d",
rdzv_endpoint: Optional[str] = None,
mounts: Optional[List[str]] = None,
) -> specs.AppDef:
"""
Distributed data parallel style application (one role, multi-replica).
Expand All @@ -171,6 +172,7 @@ def ddp(
max_retries: the number of scheduler retries allowed
rdzv_backend: rendezvous backend (only matters when nnodes > 1)
rdzv_endpoint: rendezvous server endpoint (only matters when nnodes > 1), defaults to rank0 host for schedulers that support it
mounts: the list of mounts to bind mount into the worker environment/container (ex. type=bind,src=/host,dst=/job[,readonly])
"""

if (script is None) == (m is None):
@@ -244,6 +246,7 @@ def ddp(
"c10d": 29500,
},
max_retries=max_retries,
mounts=specs.parse_mounts(mounts) if mounts else [],
)
],
)
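For reference, a rough sketch of passing the new parameter programmatically; it mirrors the component test below, and the docstring's comma-joined CLI syntax corresponds to the split list shown here:

from torchx.components import dist

app = dist.ddp(
    script="train.py",
    mounts=["type=bind", "src=/mnt/checkpoints", "dst=/checkpoints", "readonly"],
)
assert len(app.roles[0].mounts) == 1  # parsed into a single BindMount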
6 changes: 6 additions & 0 deletions torchx/components/test/dist_test.py
@@ -11,3 +11,9 @@
class DistributedComponentTest(ComponentTestCase):
def test_ddp(self) -> None:
self.validate(dist, "ddp")

def test_ddp_mounts(self) -> None:
app = dist.ddp(
script="foo.py", mounts=["type=bind", "src=/dst", "dst=/dst", "readonly"]
)
self.assertEqual(len(app.roles[0].mounts), 1)
25 changes: 24 additions & 1 deletion torchx/schedulers/aws_batch_scheduler.py
@@ -91,6 +91,26 @@ def _role_to_node_properties(idx: int, role: Role) -> Dict[str, object]:
if resource.gpu > 0:
reqs.append({"type": "GPU", "value": str(resource.gpu)})

mount_points = []
volumes = []
for i, mount in enumerate(role.mounts):
name = f"mount_{i}"
volumes.append(
{
"name": name,
"host": {
"sourcePath": mount.src_path,
},
}
)
mount_points.append(
{
"containerPath": mount.dst_path,
"readOnly": mount.read_only,
"sourceVolume": name,
}
)

container = {
"command": [role.entrypoint] + role.args,
"image": role.image,
@@ -99,6 +119,8 @@ def _role_to_node_properties(idx: int, role: Role) -> Dict[str, object]:
"logConfiguration": {
"logDriver": "awslogs",
},
"mountPoints": mount_points,
"volumes": volumes,
}

return {
@@ -165,7 +187,8 @@ class AWSBatchScheduler(Scheduler, DockerWorkspace):
describe: |
Partial support. AWSBatchScheduler will return job and replica
status but does not provide the complete original AppSpec.
workspaces: false
workspaces: true
mounts: true
"""

def __init__(
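For illustration, matching the dryrun test further down, a single BindMount(src_path="/src", dst_path="/dst", read_only=True) is translated into these Batch container properties:

# volumes and mountPoints emitted by _role_to_node_properties for one bind mount
volumes = [{"name": "mount_0", "host": {"sourcePath": "/src"}}]
mount_points = [{"containerPath": "/dst", "readOnly": True, "sourceVolume": "mount_0"}]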
15 changes: 14 additions & 1 deletion torchx/schedulers/docker_scheduler.py
@@ -115,6 +115,7 @@ class DockerScheduler(Scheduler, DockerWorkspace):
Partial support. DockerScheduler will return job and replica
status but does not provide the complete original AppSpec.
workspaces: true
mounts: true
"""

def __init__(self, session_name: str) -> None:
@@ -171,7 +172,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DockerJob]) -> str:
def _submit_dryrun(
self, app: AppDef, cfg: Mapping[str, CfgVal]
) -> AppDryRunInfo[DockerJob]:
from docker.types import DeviceRequest
from docker.types import DeviceRequest, Mount

default_env = {}
copy_env = cfg.get("copy_env")
@@ -189,6 +190,17 @@ def _submit_dryrun(
req = DockerJob(app_id=app_id, containers=[])
rank0_name = f"{app_id}-{app.roles[0].name}-0"
for role in app.roles:
mounts = []
for mount in role.mounts:
mounts.append(
Mount(
target=mount.dst_path,
source=mount.src_path,
read_only=mount.read_only,
type="bind",
)
)

for replica_id in range(role.num_replicas):
values = macros.Values(
img_root="",
@@ -220,6 +232,7 @@ def _submit_dryrun(
},
"hostname": name,
"network": NETWORK,
"mounts": mounts,
},
)
if replica_role.max_retries > 0:
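A small sketch of the per-mount translation above, using the docker-py Mount type; the values are illustrative and mirror the docker scheduler test below:

from docker.types import Mount
from torchx import specs

bind = specs.BindMount(src_path="/tmp", dst_path="/tmp", read_only=True)
docker_mount = Mount(
    target=bind.dst_path,     # path inside the container
    source=bind.src_path,     # path on the host
    read_only=bind.read_only,
    type="bind",
)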
27 changes: 27 additions & 0 deletions torchx/schedulers/kubernetes_scheduler.py
@@ -166,6 +166,9 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
V1ResourceRequirements,
V1ContainerPort,
V1ObjectMeta,
V1VolumeMount,
V1Volume,
V1HostPathVolumeSource,
)

requests = {}
@@ -183,6 +186,26 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
requests=requests,
)

volumes = []
volume_mounts = []
for i, mount in enumerate(role.mounts):
mount_name = f"mount-{i}"
volumes.append(
V1Volume(
name=mount_name,
host_path=V1HostPathVolumeSource(
path=mount.src_path,
),
)
)
volume_mounts.append(
V1VolumeMount(
name=mount_name,
mount_path=mount.dst_path,
read_only=mount.read_only,
)
)

container = V1Container(
command=[role.entrypoint] + role.args,
image=role.image,
@@ -202,12 +225,15 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod
)
for name, port in role.port_map.items()
],
volume_mounts=volume_mounts,
)

return V1Pod(
spec=V1PodSpec(
containers=[container],
restart_policy="Never",
service_account_name=service_account,
volumes=volumes,
),
metadata=V1ObjectMeta(
annotations={
@@ -360,6 +386,7 @@ class KubernetesScheduler(Scheduler, DockerWorkspace):
Partial support. KubernetesScheduler will return job and replica
status but does not provide the complete original AppSpec.
workspaces: true
mounts: true
"""

def __init__(
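Correspondingly, a sketch of the per-mount Kubernetes objects produced by role_to_pod, using the official Python client models; the paths are illustrative:

from kubernetes.client.models import V1HostPathVolumeSource, V1Volume, V1VolumeMount
from torchx import specs

bind = specs.BindMount(src_path="/mnt/data", dst_path="/data", read_only=True)
volume = V1Volume(name="mount-0", host_path=V1HostPathVolumeSource(path=bind.src_path))
volume_mount = V1VolumeMount(name="mount-0", mount_path=bind.dst_path, read_only=bind.read_only)
# the volume is attached to the pod spec, the volume mount to the container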
1 change: 1 addition & 0 deletions torchx/schedulers/local_scheduler.py
@@ -551,6 +551,7 @@ class LocalScheduler(Scheduler):
workspaces: |
Partial support. LocalScheduler runs the app from a local
directory but does not support programmatic workspaces.
mounts: false
"""

def __init__(
22 changes: 22 additions & 0 deletions torchx/schedulers/ray_scheduler.py
@@ -108,6 +108,28 @@ class RayJob:
actors: List[RayActor] = field(default_factory=list)

class RayScheduler(Scheduler):
"""
**Config Options**

.. runopts::
class: torchx.schedulers.ray_scheduler.RayScheduler

**Compatibility**

.. compatibility::
type: scheduler
features:
cancel: true
logs: true
distributed: true
describe: |
Partial support. RayScheduler will return job status but
does not provide the complete original AppSpec.
workspaces: false
mounts: false

"""

def __init__(self, session_name: str) -> None:
super().__init__("ray", session_name)

1 change: 1 addition & 0 deletions torchx/schedulers/slurm_scheduler.py
@@ -272,6 +272,7 @@ class SlurmScheduler(Scheduler, DirWorkspace):
workspaces: |
If ``job_dir`` is specified the DirWorkspace will create a new
isolated directory with a snapshot of the workspace.
mounts: false
"""

def __init__(self, session_name: str) -> None:
33 changes: 33 additions & 0 deletions torchx/schedulers/test/aws_batch_scheduler_test.py
@@ -39,6 +39,9 @@ def _test_app() -> specs.AppDef:
port_map={"foo": 1234},
num_replicas=2,
max_retries=3,
mounts=[
specs.BindMount(src_path="/src", dst_path="/dst", read_only=True),
],
)

return specs.AppDef("test", roles=[trainer_role])
@@ -109,6 +112,21 @@ def test_submit_dryrun(self) -> None:
{"type": "GPU", "value": "4"},
],
"logConfiguration": {"logDriver": "awslogs"},
"mountPoints": [
{
"containerPath": "/dst",
"readOnly": True,
"sourceVolume": "mount_0",
}
],
"volumes": [
{
"name": "mount_0",
"host": {
"sourcePath": "/src",
},
}
],
},
},
{
@@ -136,6 +154,21 @@ def test_submit_dryrun(self) -> None:
{"type": "GPU", "value": "4"},
],
"logConfiguration": {"logDriver": "awslogs"},
"mountPoints": [
{
"containerPath": "/dst",
"readOnly": True,
"sourceVolume": "mount_0",
}
],
"volumes": [
{
"name": "mount_0",
"host": {
"sourcePath": "/src",
},
}
],
},
},
],
13 changes: 12 additions & 1 deletion torchx/schedulers/test/docker_scheduler_test.py
@@ -11,7 +11,7 @@
from unittest.mock import patch

import fsspec
from docker.types import DeviceRequest
from docker.types import DeviceRequest, Mount
from torchx import specs
from torchx.components.dist import ddp
from torchx.schedulers.api import Stream
@@ -48,6 +48,9 @@ def _test_app() -> specs.AppDef:
port_map={"foo": 1234},
num_replicas=1,
max_retries=3,
mounts=[
specs.BindMount(src_path="/tmp", dst_path="/tmp", read_only=True),
],
)

return specs.AppDef("test", roles=[trainer_role])
@@ -105,6 +108,14 @@ def test_submit_dryrun(self) -> None:
"MaximumRetryCount": 3,
},
"network": "torchx",
"mounts": [
Mount(
target="/tmp",
source="/tmp",
read_only=True,
type="bind",
)
],
},
)
],