Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
33286f5
orchestrators/factory: collapse container enabled+launch into contain…
atnair-amd May 29, 2026
b31b343
runtimes: drop launch short-circuit, add image_sha_status
atnair-amd May 29, 2026
fa6ec07
orchestrators/container: branch setup/teardown on container.lifetime
atnair-amd May 29, 2026
c14b914
input/cluster_file: use container.lifetime in cluster_container.json …
atnair-amd May 29, 2026
ab74309
core/unittests: cover container.lifetime resolution and per-lifetime …
atnair-amd May 29, 2026
7c8be5c
docs: document container.lifetime schema (replace enabled/launch)
atnair-amd May 29, 2026
0ddd336
orchestrators/scripts: add default container provisioning script
atnair-amd May 30, 2026
778b39b
orchestrators/factory: resolve and validate container.setup_script
atnair-amd May 30, 2026
f3d2b66
orchestrators/container: provision launched containers via setup_script
atnair-amd May 30, 2026
11218e3
orchestrators/container: fix setup_sshd pgrep precheck matching its p…
atnair-amd May 30, 2026
dadc82b
input/cluster_file: document and sample container.setup_script
atnair-amd May 30, 2026
8fcfec9
docs: document container.setup_script in cluster-file references
atnair-amd May 30, 2026
87b9065
orchestrators/unittests: cover container.setup_script resolution
atnair-amd May 30, 2026
aed4df1
orchestrators/unittests: cover container provisioning and sshd pgrep …
atnair-amd May 30, 2026
6d182a3
orchestrators/runtimes: harden container persistent lifetime
atnair-amd May 31, 2026
f950082
orchestrators: apply ruff format to satisfy the fmt-check gate
atnair-amd May 31, 2026
fa4ca32
orchestrators: rename container lifetime 'external' to 'no_launch'
atnair-amd Jun 1, 2026
ec87e4f
orchestrators: replace container enabled/launch with container.lifetime
atnair-amd Jun 9, 2026
cd23180
address review: trim normalize comment, carry lifetime in runtime tes…
atnair-amd Jun 10, 2026
a643858
tests: drop redundant persistent-idempotent test, strip dead warning …
atnair-amd Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 170 additions & 84 deletions cvs/core/orchestrators/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def __init__(self, log, config, stop_on_errors=False):
if not self.container_config:
raise ValueError("ContainerOrchestrator requires 'container' config in OrchestratorConfig")

self.container_enabled = True
self.container_id = None # Track running container ID

# Initialize container runtime
Expand Down Expand Up @@ -252,6 +251,35 @@ def is_privileged(self):
runtime_args = runtime_config.get('args', {})
return runtime_args.get('privileged', DEFAULT_CONTAINER_ARGS['privileged'])

def _partition_by_status(self, status):
    """Partition the expected hosts into (running, absent, probe_failed).

    The single source of truth for interpreting a runtime is_running() result.
    Both the persistent setup branch and verify_containers_running consume this
    so they cannot drift apart on what counts as running vs unreachable.

    running      : probe succeeded (exit_code == 0) and 'running' is truthy.
    absent       : probe succeeded (exit_code == 0) and 'running' is falsy.
    probe_failed : the probe cannot be trusted -- exit_code is anything other
                   than 0 (including missing), OR the host has no entry in
                   status at all. Either way the container's true state is
                   unknown.

    Iterates the expected host set (self.hosts), not status.keys(), so a host
    that dropped out of the probe is surfaced as probe_failed rather than
    silently ignored.

    Args:
        status: Mapping of host -> result dict as returned by the runtime's
            is_running(); each result dict is read for the 'exit_code' and
            'running' keys. None or an empty mapping is accepted and means
            no host produced a usable probe result.

    Returns:
        tuple: (running, absent, probe_failed), three lists of hosts, each in
        self.hosts order. Every expected host appears in exactly one list.
    """
    # A runtime that handed back nothing at all tells us nothing about any
    # host; classify every expected host as probe_failed instead of crashing
    # on None.get().
    status = status or {}

    running, absent, probe_failed = [], [], []
    for host in self.hosts:
        info = status.get(host)
        if info is None or info.get('exit_code') != 0:
            probe_failed.append(host)
        elif info.get('running'):
            running.append(host)
        else:
            absent.append(host)
    return running, absent, probe_failed

def setup_containers(
self,
volumes=None,
Expand All @@ -263,12 +291,15 @@ def setup_containers(
ulimits=None,
):
"""
Set up containers based on configuration.
Set up containers according to the configured container.lifetime policy.

This method should be called explicitly by tests when they need containers.
It respects the 'enabled' flag and handles container lifecycle appropriately.
If 'launch' is true, checks that containers are already running and sets container_id.
If containers are not running when expected, fails and informs the user.
Behavior branches on container.lifetime:
- 'no_launch' : verify a container with the configured name is running
and set container_id; never starts anything.
- 'per_run' : start fresh containers on all hosts.
- 'persistent' : attach to a container already running on all hosts,
otherwise start fresh. Idempotent.

Args:
volumes: Optional list of volume mounts (uses standards if not provided)
Expand All @@ -282,67 +313,128 @@ def setup_containers(
Returns:
bool: True if containers were set up successfully or no setup needed
"""
if not self.container_config or not self.container_config.get('enabled', False):
self.log.debug("Container mode not enabled, skipping setup")
return True

if self.container_config.get('launch', False):
# Launch containers
self.log.info("Launching containers...")
container_name = self.get_container_name(self.container_config, self.container_config['image'])
self.container_id = container_name

# Use provided parameters or get standards
volumes = volumes if volumes is not None else self.get_volumes()
devices = devices if devices is not None else self.get_devices()
capabilities = capabilities if capabilities is not None else self.get_capabilities()
security_opts = security_opts if security_opts is not None else self.get_security_opts()
environment = environment if environment is not None else self.get_environment()
groups = groups if groups is not None else self.get_groups()
ulimits = ulimits if ulimits is not None else self.get_ulimits()

# Create a modified container config with standard settings
modified_config = dict(self.container_config)

# Ensure runtime config exists
if 'runtime' not in modified_config:
modified_config['runtime'] = {}
if 'args' not in modified_config['runtime']:
modified_config['runtime']['args'] = {}

# Set standard runtime args if not already set
runtime_args = modified_config['runtime']['args']
if 'network' not in runtime_args:
runtime_args['network'] = self.get_network_mode()
if 'ipc' not in runtime_args:
runtime_args['ipc'] = self.get_ipc_mode()
if 'privileged' not in runtime_args:
runtime_args['privileged'] = self.is_privileged()

# Add InfiniBand device discovery via shell expansion (per-host)
ib_device_expansion = '$(for dev in /dev/infiniband/*; do echo -n "--device $dev:$dev "; done)'

return self.runtime.setup_containers(
modified_config,
container_name,
volumes=volumes,
devices=devices,
capabilities=capabilities,
security_opts=security_opts,
environment=environment,
groups=groups,
ulimits=ulimits,
device_expansion=ib_device_expansion,
)
lifetime = self.container_config.get('lifetime', 'per_run')

# Assume containers are running
image = self.container_config.get('image')
if not image:
self.log.error("Container image not specified in config")
return False

container_name = self.get_container_name(self.container_config, image)
return self.verify_containers_running(container_name)

if lifetime == 'no_launch':
# CVS never launches it: verify only, never start.
return self.verify_containers_running(container_name)

if lifetime == 'persistent':
status = self.runtime.is_running(container_name)
running_hosts, absent_hosts, probe_failed_hosts = self._partition_by_status(status)

if probe_failed_hosts:
# The probe could not be trusted on these hosts (SSH/sudo/docker
# error, timeout, or the host dropped out). We cannot tell "absent"
# from "actually running but unreachable", and treating unreachable
# as absent would let the cold-start path below force-remove a
# container that is in fact running -- destroying its overlay. Refuse.
self.log.error(
f"Cannot determine state of persistent container '{container_name}' on "
f"{probe_failed_hosts} (probe failed: SSH/sudo/docker error, timeout, or "
f"host unreachable). Refusing to act on an untrustworthy probe -- resolve "
f"host/daemon health and rerun."
)
return False

if running_hosts and not absent_hosts:
# Running on every host: attach.
self.container_id = container_name
self.log.info(f"Attaching to running container '{container_name}'")
return True

if running_hosts and absent_hosts:
# Partial: refuse to auto-relaunch. _launch_containers force-removes
# the same-named container on ALL hosts before recreating, which would
# destroy the overlay (installs, clones) on the still-running hosts --
# the opposite of what 'persistent' promises. Fail loudly and let the
# user choose: remove on all hosts and rerun (clean rebuild), or
# restart the container on the missing hosts to reattach.
self.log.error(
f"Persistent container '{container_name}' is running on {running_hosts} "
f"but missing on {absent_hosts}. Refusing to auto-relaunch: that would "
f"force-remove and rebuild the containers on the still-running hosts, "
f"destroying their overlay. Either remove '{container_name}' on all hosts "
f"and rerun, or restart it on {absent_hosts} to reattach."
)
return False

# Genuinely absent on every host: legitimate cold start, launch fresh on all.
self.log.info("Persistent container not running on any host, launching...")
return self._launch_containers(volumes, devices, capabilities, security_opts, environment, groups, ulimits)

# 'per_run' (default): always start fresh.
return self._launch_containers(volumes, devices, capabilities, security_opts, environment, groups, ulimits)

def _launch_containers(
    self,
    volumes=None,
    devices=None,
    capabilities=None,
    security_opts=None,
    environment=None,
    groups=None,
    ulimits=None,
):
    """Start fresh containers on all hosts via the runtime.

    Shared by the 'per_run' path and the 'persistent' path when no container
    is already running. The runtime force-removes any stale same-named
    container before starting.

    Each optional argument falls back to the matching self.get_*() value when
    it is None. The runtime args 'network', 'ipc' and 'privileged' are filled
    from get_network_mode() / get_ipc_mode() / is_privileged() only when the
    configured runtime args do not already contain them. self.container_config
    itself is never modified: defaults are written into a private copy.

    Args:
        volumes: Optional volume mounts (defaults to self.get_volumes()).
        devices: Optional devices (defaults to self.get_devices()).
        capabilities: Optional capabilities (defaults to self.get_capabilities()).
        security_opts: Optional security options (defaults to self.get_security_opts()).
        environment: Optional environment (defaults to self.get_environment()).
        groups: Optional groups (defaults to self.get_groups()).
        ulimits: Optional ulimits (defaults to self.get_ulimits()).

    Returns:
        Whatever self.runtime.setup_containers() returns for the launch.

    Raises:
        KeyError: if the container config has no 'image' entry.
    """
    self.log.info("Launching containers...")
    container_name = self.get_container_name(self.container_config, self.container_config['image'])
    # Recorded before the launch is attempted, regardless of its outcome.
    self.container_id = container_name

    # Use provided parameters or get standards
    volumes = volumes if volumes is not None else self.get_volumes()
    devices = devices if devices is not None else self.get_devices()
    capabilities = capabilities if capabilities is not None else self.get_capabilities()
    security_opts = security_opts if security_opts is not None else self.get_security_opts()
    environment = environment if environment is not None else self.get_environment()
    groups = groups if groups is not None else self.get_groups()
    ulimits = ulimits if ulimits is not None else self.get_ulimits()

    # Build a modified container config with standard settings. dict() is only
    # a shallow copy, so the nested 'runtime' and 'args' dicts are copied too;
    # otherwise filling in the defaults below would write through into the
    # shared self.container_config.
    modified_config = dict(self.container_config)
    runtime_config = dict(modified_config.get('runtime') or {})
    runtime_args = dict(runtime_config.get('args') or {})
    runtime_config['args'] = runtime_args
    modified_config['runtime'] = runtime_config

    # Set standard runtime args if not already set
    if 'network' not in runtime_args:
        runtime_args['network'] = self.get_network_mode()
    if 'ipc' not in runtime_args:
        runtime_args['ipc'] = self.get_ipc_mode()
    if 'privileged' not in runtime_args:
        runtime_args['privileged'] = self.is_privileged()

    # Add InfiniBand device discovery via shell expansion (per-host)
    ib_device_expansion = '$(for dev in /dev/infiniband/*; do echo -n "--device $dev:$dev "; done)'

    return self.runtime.setup_containers(
        modified_config,
        container_name,
        volumes=volumes,
        devices=devices,
        capabilities=capabilities,
        security_opts=security_opts,
        environment=environment,
        groups=groups,
        ulimits=ulimits,
        device_expansion=ib_device_expansion,
    )

def setup_sshd(self):
"""
Expand Down Expand Up @@ -414,21 +506,14 @@ def verify_containers_running(self, container_name):
bool: True if container is running on all hosts, False otherwise
"""
self.log.debug(f"Checking if container '{container_name}' is running on all hosts")
result = self.runtime.is_running(container_name)

# Verify container is running on all hosts
failed_hosts = []
for host, info in result.items():
exit_code = info.get('exit_code')
if exit_code != 0:
failed_hosts.append(f"{host} (exit code {exit_code})")
continue
if not info.get('running'):
running_name = info.get('name', '')
failed_hosts.append(f"{host} (container not running, found: '{running_name}')")

if failed_hosts:
self.log.error(f"Container '{container_name}' not running on hosts: {failed_hosts}")
status = self.runtime.is_running(container_name)
_running, absent_hosts, probe_failed_hosts = self._partition_by_status(status)

if absent_hosts or probe_failed_hosts:
self.log.error(
f"Container '{container_name}' not running on all hosts: "
f"not running on {absent_hosts}, probe failed on {probe_failed_hosts}"
)
return False

self.container_id = container_name
Expand All @@ -437,21 +522,22 @@ def verify_containers_running(self, container_name):

def teardown_containers(self):
"""
Tear down containers if they are running.
Tear down containers according to the configured container.lifetime policy.

This method should be called explicitly by tests for cleanup.
It respects the 'enabled' flag and handles container lifecycle appropriately.
If 'launch' is False, assumes containers should not be stopped (externally managed).
This method should be called explicitly by tests for cleanup. Behavior
branches on container.lifetime:
- 'no_launch' : no-op (CVS does not own a container it did not launch).
- 'persistent' : no-op (left running for the next run; user removes it
explicitly).
- 'per_run' : force-remove the container CVS started.

Returns:
bool: True if containers were torn down successfully or no teardown needed
"""
if not self.container_config or not self.container_config.get('enabled', False):
self.log.debug("Container mode not enabled, skipping teardown")
return True
lifetime = self.container_config.get('lifetime', 'per_run')

if not self.container_config.get('launch', False):
self.log.debug("launch is False, not stopping externally managed containers")
if lifetime in ('no_launch', 'persistent'):
self.log.debug(f"lifetime={lifetime}, leaving containers running")
return True

if not self.container_id:
Expand Down
Loading
Loading