Skip to content

Commit

Permalink
Convert job ID to UUID
Browse files Browse the repository at this point in the history
In some CI runs it was observed that unexpected results were being
returned for middleware jobs. This commit converts our job ids from
monotonically incrementing integers to proper UUIDs so that the job
id that the client is trying to track is guaranteed to uniquely identify
it regardless of which HA node the client is connected to.

This commit also has the benefit of making it much harder to guess
a job id when using the public download endpoints for job results.
  • Loading branch information
anodos325 committed Jan 28, 2025
1 parent 7d0fed2 commit 678bf0b
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 25 deletions.
9 changes: 7 additions & 2 deletions src/middlewared/middlewared/apps/file_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from middlewared.service_exception import CallError
from truenas_api_client import json
from uuid import UUID

__all__ = ("FileApplication",)

Expand Down Expand Up @@ -47,12 +48,16 @@ async def _cleanup_job(self, job_id):

async def download(self, request):
path = request.path.split("/")
if not request.path[-1].isdigit():
try:
UUID(path[-1])
except ValueError:
self.middleware.logger.error('XXX: failed to parse %s', request.path, exc_info=True)
# The job id should be a valid UUID
resp = web.Response()
resp.set_status(404)
return resp

job_id = int(path[-1])
job_id = path[-1]

qs = parse_qs(request.query_string)
denied = False
Expand Down
11 changes: 3 additions & 8 deletions src/middlewared/middlewared/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import contextlib
from collections import OrderedDict
import copy
import enum
import errno
Expand All @@ -17,6 +16,7 @@
from middlewared.pipe import Pipes
from middlewared.utils.privilege import credential_is_limited_to_own_jobs, credential_has_full_admin
from middlewared.utils.time_utils import utc_now
from uuid import uuid4


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -225,8 +225,7 @@ class JobsDeque:

def __init__(self, maxlen=1000):
    """Hold up to *maxlen* jobs, keyed by job id.

    A plain dict preserves insertion order (Python 3.7+), so eviction
    of completed jobs can iterate in insertion order directly; the old
    OrderedDict and the `self.count` integer-id counter are gone now
    that job ids are UUIDs generated by the Job itself.
    """
    self.maxlen = maxlen
    self.__dict = {}
    # Log files from a previous middleware run are stale; remove them
    # best-effort (the directory may not exist on first boot).
    with contextlib.suppress(FileNotFoundError):
        shutil.rmtree(LOGS_DIR)

Expand All @@ -244,7 +243,6 @@ def _get_next_id(self):
return self.count

def add(self, job):
job.set_id(self._get_next_id())
if len(self.__dict) > self.maxlen:
for old_job_id, old_job in self.__dict.items():
if old_job.state in (State.SUCCESS, State.FAILED, State.ABORTED):
Expand Down Expand Up @@ -291,7 +289,7 @@ def __init__(self, middleware, method_name, serviceobj, method, args, options, p
self.app = app
self.audit_callback = audit_callback

self.id = None
self.id = str(uuid4())
self.lock = None
self.result = None
self.error = None
Expand Down Expand Up @@ -377,9 +375,6 @@ def get_lock_name(self):
errno.EINVAL)
return lock_name

def set_id(self, id_):
    """Assign the externally generated identifier *id_* to this job."""
    self.id = id_

def set_result(self, result):
    """Record *result* as the job's final return value."""
    self.result = result

Expand Down
1 change: 1 addition & 0 deletions src/middlewared/middlewared/plugins/rdma/rdma.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,5 @@ async def capable_protocols(self):
if is_ent and 'MINI' not in await self.middleware.call('truenas.get_chassis_hardware'):
if await self.middleware.call('rdma.get_link_choices', True):
result.extend([RDMAprotocols.NFS.value, RDMAprotocols.ISER.value])

return result
10 changes: 5 additions & 5 deletions src/middlewared/middlewared/service/core_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __job_by_credential_and_id(self, credential, job_id, access):
@filterable
@filterable_returns(Dict(
'job',
Int('id'),
Str('id'),
Str('method'),
List('arguments'),
Bool('transient'),
Expand Down Expand Up @@ -175,7 +175,7 @@ def get_jobs(self, app, filters, options):
return jobs

@no_authz_required
@accepts(Int('id'), Str('filename'), Bool('buffered', default=False))
@accepts(Str('id'), Str('filename'), Bool('buffered', default=False))
@pass_app(rest=True)
async def job_download_logs(self, app, id_, filename, buffered):
"""
Expand All @@ -192,15 +192,15 @@ async def job_download_logs(self, app, id_, filename, buffered):
return (await self._download(app, 'filesystem.get', [job.logs_path], filename, buffered))[1]

@no_authz_required
@accepts(Int('id'))
@accepts(Str('id'))
@job()
async def job_wait(self, job, id_):
target_job = self.__job_by_credential_and_id(job.credentials, id_, JobAccess.READ)

return await job.wrap(target_job)

@private
@accepts(Int('id'), Dict(
@accepts(Str('id'), Dict(
'job-update',
Dict('progress', additional_attrs=True),
))
Expand Down Expand Up @@ -234,7 +234,7 @@ def notify_postinit(self):
self.middleware._setup_periodic_tasks()

@no_authz_required
@accepts(Int('id'))
@accepts(Str('id'))
@pass_app(rest=True)
def job_abort(self, app, id_):
job = self._job_by_app_and_id(app, id_, JobAccess.ABORT)
Expand Down
2 changes: 1 addition & 1 deletion tests/api2/test_006_pool_and_sysds.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_002_create_permanent_zpool(request, ws_client):
try:
sysdataset_update = ws_client.call('core.get_jobs', [
['method', '=', 'systemdataset.update']
], {'order_by': ['-id'], 'get': True})
], {'order_by': ['-time_started'], 'get': True})
except Exception:
fail('Failed to get status of systemdataset update')

Expand Down
4 changes: 2 additions & 2 deletions tests/api2/test_011_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def test_037_move_homedir_to_new_directory(request):
)

filters = [['method', '=', 'user.do_home_copy']]
opts = {'get': True, 'order_by': ['-id']}
opts = {'get': True, 'order_by': ['-time_started']}
move_job_timeout = 300 # 5 mins
move_job1 = call('core.get_jobs', filters, opts)
assert move_job1
Expand Down Expand Up @@ -518,7 +518,7 @@ def test_038_change_homedir_to_existing_path(request):
{'home': new_home}
)
filters = [['method', '=', 'user.do_home_copy']]
opts = {'get': True, 'order_by': ['-id']}
opts = {'get': True, 'order_by': ['-time_started']}
move_job_timeout = 300 # 5 mins
home_move_job = call('core.get_jobs', filters, opts)
rv = wait_on_job(home_move_job['id'], move_job_timeout)
Expand Down
2 changes: 1 addition & 1 deletion tests/api2/test_040_ad_user_group_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def do_ad_connection(request):
cache_fill_job = call(
'core.get_jobs',
[['method', '=', 'directoryservices.cache.refresh_impl']],
{'order_by': ['-id'], 'get': True}
{'order_by': ['-time_started'], 'get': True}
)
if cache_fill_job['state'] == 'RUNNING':
call('core.job_wait', cache_fill_job['id'], job=True)
Expand Down
3 changes: 1 addition & 2 deletions tests/api2/test_110_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ def test_certificate():
certificate_id = results["certificate"]["id"]

# successful delete
results = call("certificate.delete", certificate_id, True)
job_id = int(results)
job_id = call("certificate.delete", certificate_id, True)

# failed delete
while True:
Expand Down
4 changes: 2 additions & 2 deletions tests/api2/test_cloud_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def test_snapshot(s3_credential):
def test_script_shebang(cloud_backup_task, expected):
ssh(f"touch /mnt/{cloud_backup_task.local_dataset}/blob")
run_task(cloud_backup_task.task)
job = call("core.get_jobs", [["method", "=", "cloud_backup.sync"]], {"order_by": ["-id"], "get": True})
job = call("core.get_jobs", [["method", "=", "cloud_backup.sync"]], {"order_by": ["-time_started"], "get": True})
assert job["logs_excerpt"].strip().split("\n")[-2] == expected


Expand All @@ -454,5 +454,5 @@ def test_pre_script_failure(cloud_backup_task, error, expected):

assert ve.value.error == error

job = call("core.get_jobs", [["method", "=", "cloud_backup.sync"]], {"order_by": ["-id"], "get": True})
job = call("core.get_jobs", [["method", "=", "cloud_backup.sync"]], {"order_by": ["-time_started"], "get": True})
assert job["logs_excerpt"].strip() == expected
4 changes: 2 additions & 2 deletions tests/api2/test_cloud_sync_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_pre_script_failure():

assert ve.value.error == "[EFAULT] Pre-script failed with exit code 123"

job = call("core.get_jobs", [["method", "=", "cloudsync.sync"]], {"order_by": ["-id"], "get": True})
job = call("core.get_jobs", [["method", "=", "cloudsync.sync"]], {"order_by": ["-time_started"], "get": True})
assert job["logs_excerpt"] == "[Pre-script] Custom error\n"


Expand Down Expand Up @@ -59,5 +59,5 @@ def test_script_shebang():
}) as task:
run_task(task)

job = call("core.get_jobs", [["method", "=", "cloudsync.sync"]], {"order_by": ["-id"], "get": True})
job = call("core.get_jobs", [["method", "=", "cloudsync.sync"]], {"order_by": ["-time_started"], "get": True})
assert job["logs_excerpt"].endswith("[Post-script] TestTest\n")

0 comments on commit 678bf0b

Please sign in to comment.