Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/746.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for the `cylc validate-reinstall` command.
149 changes: 124 additions & 25 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
DEVNULL,
PIPE,
Popen,
TimeoutExpired,
)
from textwrap import indent
from time import time
Expand Down Expand Up @@ -216,31 +217,34 @@ class Services:
# log file stream lag
CAT_LOG_SLEEP = 1

# command timeout for commands which start schedulers
START_TIMEOUT = 120

@staticmethod
def _error(message: Union[Exception, str]):
"""Format error case response."""
return [
return (
False,
str(message)
]
)

@staticmethod
def _return(message: str):
"""Format success case response."""
return [
return (
True,
message
]
)

@classmethod
async def clean(
cls,
workflows_mgr: 'WorkflowsManager',
workflows: Iterable['Tokens'],
args: dict,
workflows_mgr: 'WorkflowsManager',
executor: 'Executor',
log: 'Logger'
):
) -> tuple[bool, str]:
"""Calls `cylc clean`"""
# Convert Schema options → cylc.flow.workflow_files.init_clean opts:
opts = _schema_opts_to_api_opts(args, schema=CleanOptions)
Expand Down Expand Up @@ -273,25 +277,51 @@ async def scan(
cls,
args: dict,
workflows_mgr: 'WorkflowsManager',
):
) -> tuple[bool, str]:
await workflows_mgr.scan()
return cls._return("Scan requested")

@classmethod
async def play(
async def run_command(
cls,
command: Iterable[str],
workflows: Iterable[Tokens],
args: Dict[str, Any],
workflows_mgr: 'WorkflowsManager',
log: 'Logger',
) -> List[Union[bool, str]]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed from list[bool | str] to tuple[bool, str] as this is safer for the way we are trying to use it (('msg', False) would have been acceptable before).

"""Calls `cylc play`."""
timeout: int,
success_msg: str = 'Command succeeded',
fail_msg: str = 'Command failed',
) -> tuple[bool, str]:
"""Calls the specified Cylc command.

Args:
command:
The Cylc subcommand to run.
e.g ["play"] or ["cat-log", "-m", "p"].
workflows:
The workflows to run this command against.
args:
CLI arguments to be provided to this command.
e.g {'color': 'never'} would result in "--color=never".
log:
The application log, used to record this command invocation.
timeout:
Length of time to wait for the command to complete.
The command will be killed if the timeout elapses.
success_msg:
Message to be used in the response if the command succeeds.
fail_msg:
Message to be used in the response if the command fails.

Returns:

"""
cylc_version = args.pop('cylc_version', None)
results: Dict[str, str] = {}
failed = False
for tokens in workflows:
try:
cmd = _build_cmd(['cylc', 'play', '--color=never'], args)
cmd = _build_cmd(['cylc', *command, '--color=never'], args)

if tokens['user'] and tokens['user'] != getuser():
return cls._error(
Expand All @@ -313,7 +343,7 @@ async def play(
env.pop('CYLC_ENV_NAME', None)
env['CYLC_VERSION'] = cylc_version

# run cylc play
# run command
proc = Popen(
cmd,
env=env,
Expand All @@ -322,11 +352,21 @@ async def play(
stderr=PIPE,
text=True
)
ret_code = proc.wait(timeout=120)

if ret_code:
msg = f"Command failed ({ret_code}): {cmd_repr}"
try:
ret_code = proc.wait(timeout=timeout)
except TimeoutExpired as exc:
proc.kill()
ret_code = 124 # mimic `timeout` command error code
# NOTE: preserve any stderr that the command produced this
# far as this may help with debugging
out, err = proc.communicate()
err = str(exc) + (('\n' + err) if err else '')
Comment on lines +356 to +364
Copy link
Member Author

@oliver-sanders oliver-sanders Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it turns out this timeout logic was bunk. You have to actually .kill() to process if you want it to stop.

https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate

Additionally, this will now log the command's stderr which might be useful in the event of a timeout.

else:
out, err = proc.communicate()

if ret_code:
msg = f"{fail_msg} ({ret_code}): {cmd_repr}"
results[wflow] = err.strip() or out.strip() or msg
log.error(
f"{msg}\n"
Expand All @@ -335,26 +375,79 @@ async def play(
)
failed = True
else:
results[wflow] = 'started'
results[wflow] = success_msg

except Exception as exc: # unexpected error
log.exception(exc)
return cls._error(exc)

if failed:
if len(results) == 1:
# all commands failed
return cls._error(results.popitem()[1])
# else log each workflow result on separate lines

# some commands failed
return cls._error(
# log each workflow result on separate lines
"\n\n" + "\n\n".join(
f"{wflow}: {msg}" for wflow, msg in results.items()
)
)

# all commands succeeded
return cls._return(f'Workflow(s) {success_msg}')

@classmethod
async def play(
cls,
workflows_mgr: 'WorkflowsManager',
workflows: Iterable[Tokens],
args: dict,
log,
**kwargs,
) -> tuple[bool, str]:
"""Calls `cylc play`."""
ret = await cls.run_command(
('play',),
workflows,
args,
log,
cls.START_TIMEOUT,
**kwargs,
success_msg='started',
)

# trigger a re-scan
await workflows_mgr.scan()

# return results
return ret

@classmethod
async def validate_reinstall(
cls,
workflows_mgr: 'WorkflowsManager',
workflows: Iterable[Tokens],
args: dict,
log,
**kwargs,
) -> tuple[bool, str]:
"""Calls `cylc validate-reinstall`."""
ret = await cls.run_command(
('validate-reinstall', '--yes'),
workflows,
args,
log,
cls.START_TIMEOUT,
**kwargs,
success_msg='reinstalled',
)

# trigger a re-scan
await workflows_mgr.scan()
# send a success message
return cls._return('Workflow(s) started')

# return results
return ret

@staticmethod
async def enqueue(stream, queue):
Expand Down Expand Up @@ -581,8 +674,7 @@ async def service(
command: str,
workflows: Iterable['Tokens'],
kwargs: Dict[str, Any],
) -> List[Union[bool, str]]:

) -> tuple[bool, str]:
# GraphQL v3 includes all variables that are set, even if set to null.
kwargs = {
k: v
Expand All @@ -592,19 +684,26 @@ async def service(

if command == 'clean': # noqa: SIM116
return await Services.clean(
self.workflows_mgr,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched arg order for consistency.

workflows,
kwargs,
self.workflows_mgr,
log=self.log,
executor=self.executor
)
elif command == 'play':
elif command == 'play': # noqa: SIM116
return await Services.play(
self.workflows_mgr,
workflows,
kwargs,
self.workflows_mgr,
log=self.log
)
elif command == 'validate_reinstall':
return await Services.validate_reinstall(
self.workflows_mgr,
workflows,
kwargs,
log=self.log,
)
elif command == 'scan':
return await Services.scan(
kwargs,
Expand Down
43 changes: 43 additions & 0 deletions cylc/uiserver/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,48 @@ class Arguments:
result = GenericScalar()


class ValidateReinstall(graphene.Mutation):
class Meta:
description = sstrip('''
Validate, reinstall, then reload or restart as appropriate.

This command updates a workflow to reflect any new changes made in
the workflow source directory since it was installed.

The workflow will be reinstalled, then either:
* Reloaded (if the workflow is running),
* or restarted (if it is stopped).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* or restarted (if it is stopped).
* or restarted (if it is stopped).
Valid for: stopped, paused, running workflows.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Omitting this line makes it valid for all states.

I guess doing this explicitly prevents it from being called on stopping workflows which is worthwhile.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Ah right, I thought it would be disabled if this line was not specified at all)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you suggestion is probably good though as it omits stopping.

''')
resolver = partial(mutator, command='validate_reinstall')

class Arguments:
Copy link
Member

@MetRonnie MetRonnie Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are more arguments that this shares with play - worth extracting out into a class that can be inherited by both Arguments classes?

Edit: e.g.
image

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was purposefully avoiding implementing all opts in favour of getting something simple working quickly.

My main worry is that the options valid for the command vary according to workflow state (i.e, you can only specify play opts if the workflow is stopped) and that inheriting both (along with defaults) could cause issues.

But we need to cross this bridge at some point, I'll see how hard it is...

workflows = graphene.List(WorkflowID, required=True)
cylc_version = CylcVersion(
description=sstrip('''
Set the Cylc version that the workflow starts with.
''')
)
set = graphene.List( # noqa: A003 (graphql field name)
graphene.String,
description=sstrip('''
Set the value of a Jinja2 template variable in the workflow
definition. Values should be valid Python literals so strings
must be quoted e.g. `STR="string"`, `INT=43`, `BOOL=True`.
This option can be used multiple times on the command line.
NOTE: these settings persist across workflow restarts, but can
be set again on the `cylc play` command line if they need to be
overridden.
''')
)
reload_global = graphene.Boolean(
default_value=False,
required=False,
description="Reload global config as well as the workflow config"
)

result = GenericScalar()


class Clean(graphene.Mutation):
class Meta:
description = sstrip('''
Expand Down Expand Up @@ -894,6 +936,7 @@ class Logs(graphene.ObjectType):

class UISMutations(Mutations):
play = _mut_field(Play)
validate_reinstall = _mut_field(ValidateReinstall)
clean = _mut_field(Clean)
scan = _mut_field(Scan)

Expand Down
23 changes: 3 additions & 20 deletions cylc/uiserver/tests/test_authorise.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,13 @@
"set_verbosity",
"stop",
"trigger",
"validate_reinstall",
]

ALL_OPS = [
"clean",
*CONTROL_OPS,
"read",
"broadcast",
"ext_trigger",
"hold",
"kill",
"message",
"pause",
"play",
"poll",
"release",
"release_hold_point",
"reload",
"remove",
"resume",
"scan",
"set",
"set_graph_window_extent",
"set_hold_point",
"set_verbosity",
"stop",
"trigger",
]

FAKE_SITE_CONF = {
Expand Down Expand Up @@ -154,6 +136,7 @@
"pause",
"set",
"release",
"validate_reinstall",
},
id="user in * and groups, owner in * and groups",
),
Expand Down
Loading