Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 199 additions & 59 deletions confluent/docker_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,88 +4,152 @@

import boto3
import docker
from compose.config.config import ConfigDetails, ConfigFile, load
from compose.container import Container
from compose.project import Project
from compose.service import ImageType
from compose.cli.docker_client import docker_client
from compose.config.environment import Environment
from .compose import (
ComposeConfig, ComposeProject, ComposeContainer,
create_docker_client, ContainerStatus, DockerStateKeys,
FileConstants, Separators
)


# Docker Testing Constants
class DockerTestingLabels:
"""Docker testing label constants."""
TESTING_LABEL = "io.confluent.docker.testing"
TRUE_VALUE = "true"


# AWS ECR Constants
class ECRKeys:
"""AWS ECR service keys."""
ECR_SERVICE = "ecr"
AUTH_DATA = "authorizationData"
AUTH_TOKEN = "authorizationToken"
PROXY_ENDPOINT = "proxyEndpoint"


# Command and Shell Constants
class CommandStrings:
"""Command and shell constants."""
BASH_C = "bash -c"
SUCCESS_TEXT = "success"
SUCCESS_BYTES = b"success"
BUSYBOX_IMAGE = "busybox"
HOST_NETWORK = "host"
TMP_VOLUME = "/tmp:/tmp"


# Environment Variable Constants
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these redundant comment as it is clear from the class name itself.

class EnvVarPatterns:
"""Environment variable patterns."""
DOCKER_PREFIX = "DOCKER_"
REGISTRY_SUFFIX = "REGISTRY"
TAG_SUFFIX = "TAG"
DEFAULT_TAG = "latest"
UPSTREAM_SCOPE = "UPSTREAM"
TEST_SCOPE = "TEST"
SCOPE_SEPARATOR = "_"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem like a coherent set of clubbing variables.
Here these are not simple pattern (e.g. regexes) but env variables used at different places in the entire variable to construct the final env variable. Enum doesn't seem to fit well here



# Container Configuration Keys
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these redundant comment as it is clear from the class name itself.

class ContainerConfigKeys:
"""Container configuration keys."""
IMAGE = "image"
COMMAND = "command"
LABELS = "labels"
HOST_CONFIG = "host_config"
NETWORK_MODE = "NetworkMode"
BINDS = "Binds"
DETACH = "detach"
NETWORK_MODE_KEY = "network_mode"
VOLUMES = "volumes"


# Text Encoding Constants
class EncodingConstants:
"""Text encoding constants."""
UTF8 = "utf-8"
IGNORE_ERRORS = "ignore"
STREAM_KEY = "stream"


def api_client():
return docker.from_env().api
"""Get Docker client compatible with both legacy and new usage."""
return docker.from_env()


def ecr_login():
# see docker/docker-py#1677
ecr = boto3.client('ecr')
ecr = boto3.client(ECRKeys.ECR_SERVICE)
login = ecr.get_authorization_token()
b64token = login['authorizationData'][0]['authorizationToken'].encode('utf-8')
username, password = base64.b64decode(b64token).decode('utf-8').split(':')
registry = login['authorizationData'][0]['proxyEndpoint']
b64token = login[ECRKeys.AUTH_DATA][0][ECRKeys.AUTH_TOKEN].encode(EncodingConstants.UTF8)
username, password = base64.b64decode(b64token).decode(EncodingConstants.UTF8).split(Separators.COLON)
registry = login[ECRKeys.AUTH_DATA][0][ECRKeys.PROXY_ENDPOINT]
client = docker.from_env()
client.login(username, password, registry=registry)


def build_image(image_name, dockerfile_dir):
print("Building image %s from %s" % (image_name, dockerfile_dir))
client = api_client()
output = client.build(dockerfile_dir, rm=True, tag=image_name)
response = "".join([" %s" % (line,) for line in output])
image, build_logs = client.images.build(path=dockerfile_dir, rm=True, tag=image_name)
response = "".join([" %s" % (line.get(EncodingConstants.STREAM_KEY, '')) for line in build_logs if EncodingConstants.STREAM_KEY in line])
print(response)


def image_exists(image_name):
client = api_client()
tags = [t for image in client.images() for t in image['RepoTags'] or []]
return image_name in tags
try:
client.images.get(image_name)
return True
except docker.errors.ImageNotFound:
return False


def pull_image(image_name):
client = api_client()
if not image_exists(image_name):
client.pull(image_name)
client.images.pull(image_name)


def run_docker_command(timeout=None, **kwargs):
pull_image(kwargs["image"])
pull_image(kwargs[ContainerConfigKeys.IMAGE])
client = api_client()
kwargs["labels"] = {"io.confluent.docker.testing": "true"}
kwargs[ContainerConfigKeys.LABELS] = {DockerTestingLabels.TESTING_LABEL: DockerTestingLabels.TRUE_VALUE}
container = TestContainer.create(client, **kwargs)
container.start()
container.wait(timeout)
logs = container.logs()
print("Running command %s: %s" % (kwargs["command"], logs))
print("Running command %s: %s" % (kwargs[ContainerConfigKeys.COMMAND], logs))
container.shutdown()
return logs


def path_exists_in_image(image, path):
print("Checking for %s in %s" % (path, image))
cmd = "bash -c '[ ! -e %s ] || echo success' " % (path,)
cmd = f"{CommandStrings.BASH_C} '[ ! -e {path} ] || echo {CommandStrings.SUCCESS_TEXT}' "
output = run_docker_command(image=image, command=cmd)
return b"success" in output
return CommandStrings.SUCCESS_BYTES in output


def executable_exists_in_image(image, path):
print("Checking for %s in %s" % (path, image))
cmd = "bash -c '[ ! -x %s ] || echo success' " % (path,)
cmd = f"{CommandStrings.BASH_C} '[ ! -x {path} ] || echo {CommandStrings.SUCCESS_TEXT}' "
output = run_docker_command(image=image, command=cmd)
return b"success" in output
return CommandStrings.SUCCESS_BYTES in output


def run_command_on_host(command):
logs = run_docker_command(
image="busybox",
image=CommandStrings.BUSYBOX_IMAGE,
command=command,
host_config={'NetworkMode': 'host', 'Binds': ['/tmp:/tmp']})
host_config={ContainerConfigKeys.NETWORK_MODE: CommandStrings.HOST_NETWORK, ContainerConfigKeys.BINDS: [CommandStrings.TMP_VOLUME]})
print("Running command %s: %s" % (command, logs))
return logs


def run_cmd(command):
if command.startswith('"'):
cmd = "bash -c %s" % command
cmd = "%s %s" % (CommandStrings.BASH_C, command)
else:
cmd = command

Expand All @@ -107,98 +171,174 @@ def add_registry_and_tag(image, scope=""):
"""

if scope:
scope += "_"
scope += EnvVarPatterns.SCOPE_SEPARATOR

return "{0}{1}:{2}".format(os.environ.get("DOCKER_{0}REGISTRY".format(scope), ""),
return "{0}{1}:{2}".format(os.environ.get(f"{EnvVarPatterns.DOCKER_PREFIX}{scope}{EnvVarPatterns.REGISTRY_SUFFIX}", ""),
image,
os.environ.get("DOCKER_{0}TAG".format(scope), "latest")
os.environ.get(f"{EnvVarPatterns.DOCKER_PREFIX}{scope}{EnvVarPatterns.TAG_SUFFIX}", EnvVarPatterns.DEFAULT_TAG)
)


class TestContainer(Container):

class TestContainer(ComposeContainer):
"""Extended container class for testing purposes."""

def __init__(self, container):
super().__init__(container)

@classmethod
def create(cls, client, **kwargs):
"""Create a new container using Docker SDK."""
# Extract Docker SDK compatible parameters
image = kwargs.get(ContainerConfigKeys.IMAGE)
command = kwargs.get(ContainerConfigKeys.COMMAND)
labels = kwargs.get(ContainerConfigKeys.LABELS, {})
host_config = kwargs.get(ContainerConfigKeys.HOST_CONFIG, {})

# Create container configuration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these redundant comment as it is clear from the code.

container_config = {
ContainerConfigKeys.IMAGE: image,
ContainerConfigKeys.COMMAND: command,
ContainerConfigKeys.LABELS: labels,
ContainerConfigKeys.DETACH: True,
}

# Add host configuration if provided
if host_config:
if ContainerConfigKeys.NETWORK_MODE in host_config:
container_config[ContainerConfigKeys.NETWORK_MODE_KEY] = host_config[ContainerConfigKeys.NETWORK_MODE]
if ContainerConfigKeys.BINDS in host_config:
volumes = {}
for bind in host_config[ContainerConfigKeys.BINDS]:
host_path, container_path = bind.split(Separators.COLON)
volumes[host_path] = {FileConstants.BIND_MODE: container_path, 'mode': FileConstants.READ_WRITE_MODE}
container_config[ContainerConfigKeys.VOLUMES] = volumes

# Create the container
docker_container = client.containers.create(**container_config)

# Return wrapped container
return cls(docker_container)

def start(self):
"""Start the container."""
self.container.start()

def state(self):
return self.inspect_container["State"]
"""Get container state information."""
self.container.reload()
return self.container.attrs[DockerStateKeys.STATE]

def status(self):
return self.state()["Status"]
"""Get container status."""
return self.state()[DockerStateKeys.STATUS]

def shutdown(self):
"""Stop and remove the container."""
self.stop()
self.remove()

def execute(self, command):
eid = self.create_exec(command)
return self.start_exec(eid)
"""Execute a command in the container."""
result = self.container.exec_run(command)
return result.output

def wait(self, timeout):
return self.client.wait(self.id, timeout)
"""Wait for the container to stop."""
return self.container.wait(timeout=timeout)


class TestCluster():
"""Test cluster management using modern Docker SDK."""

def __init__(self, name, working_dir, config_file):
config_file_path = os.path.join(working_dir, config_file)
cfg_file = ConfigFile.from_filename(config_file_path)
c = ConfigDetails(working_dir, [cfg_file],)
self.cd = load(c)
self.name = name
self.config = ComposeConfig(working_dir, config_file)
self._project = None

def get_project(self):
# Dont reuse the client to fix this bug : https://github.com/docker/compose/issues/1275
client = docker_client(Environment())
project = Project.from_config(self.name, self.cd, client)
return project
"""Get the compose project, creating a new client each time to avoid issues."""
# Create a new client each time to avoid reuse issues
client = create_docker_client()
self._project = ComposeProject(self.name, self.config, client)
return self._project

def start(self):
"""Start all services in the cluster."""
self.shutdown()
self.get_project().up()

def is_running(self):
state = [container.is_running for container in self.get_project().containers()]
return all(state) and len(state) > 0
"""Check if all services in the cluster are running."""
containers = self.get_project().containers()
if not containers:
return False
return all(container.is_running for container in containers)

def is_service_running(self, service_name):
return self.get_container(service_name).is_running
"""Check if a specific service is running."""
try:
return self.get_container(service_name).is_running
except RuntimeError:
return False

def shutdown(self):
"""Shutdown all services in the cluster."""
project = self.get_project()
project.down(ImageType.none, True, True)
project.down(remove_volumes=True, remove_orphans=True)
project.remove_stopped()

def get_container(self, service_name, stopped=False):
"""Get a container for a specific service."""
if stopped:
containers = self.get_project().containers([service_name], stopped=True)
if containers:
return containers[0]
raise RuntimeError(f"No container found for service '{service_name}'")
return self.get_project().get_service(service_name).get_container()

def exit_code(self, service_name):
"""Get the exit code of a service container."""
containers = self.get_project().containers([service_name], stopped=True)
return containers[0].exit_code
if containers:
return containers[0].exit_code
return None

def wait(self, service_name, timeout):
container = self.get_project().containers([service_name], stopped=True)
if container[0].is_running:
return self.get_project().client.wait(container[0].id, timeout)
"""Wait for a service container to stop."""
containers = self.get_project().containers([service_name], stopped=True)
if containers and containers[0].is_running:
return containers[0].wait(timeout)

def run_command_on_service(self, service_name, command):
"""Run a command on a specific service container."""
return self.run_command(command, self.get_container(service_name))

def service_logs(self, service_name, stopped=False):
"""Get logs from a service container."""
if stopped:
containers = self.get_project().containers([service_name], stopped=True)
print(containers[0].logs())
return containers[0].logs()
if containers:
logs = containers[0].logs()
print(logs)
return logs
return b''
else:
return self.get_container(service_name).logs()

def run_command(self, command, container):
print("Running %s on %s :" % (command, container))
eid = container.create_exec(command)
output = container.start_exec(eid)
print("\n%s " % output)
"""Run a command on a container."""
print("Running %s on %s :" % (command, container.name))
result = container.container.exec_run(command)
output = result.output
if isinstance(output, bytes):
print("\n%s " % output.decode(EncodingConstants.UTF8, errors=EncodingConstants.IGNORE_ERRORS))
else:
print("\n%s " % output)
return output

def run_command_on_all(self, command):
"""Run a command on all containers in the cluster."""
results = {}
for container in self.get_project().containers():
results[container.name_without_project] = self.run_command(command, container)

return results
Loading