diff --git a/dagster-nomad/dagster_nomad/run_launcher.py b/dagster-nomad/dagster_nomad/run_launcher.py index 88fd10a..875f749 100644 --- a/dagster-nomad/dagster_nomad/run_launcher.py +++ b/dagster-nomad/dagster_nomad/run_launcher.py @@ -5,7 +5,7 @@ from typing import Any, ClassVar, Optional import httpx -from dagster import Field, StringSource +from dagster import BoolSource, Field, StringSource from dagster import _check as check from dagster._core.instance import T_DagsterInstance from dagster._core.launcher import LaunchRunContext, RunLauncher @@ -107,13 +107,14 @@ def __init__( inst_data: Optional[ConfigurableClassData] = None, *, docker_image: str | None = None, + prefix_by_code_location_name: bool = False, job_id: str, url: str, token: str | None = None, namespace: str | None = None, ): self._inst_data = inst_data - + self.prefix_by_code_location_name = prefix_by_code_location_name self.docker_image = docker_image self.nomad_job_id = job_id self.nomad_client = NomadClient(url, token, namespace) @@ -135,6 +136,11 @@ def config_type(cls) -> dict[str, Field]: is_required=False, description="The docker image to be used if the repository does not specify one.", ), + "prefix_by_code_location_name": Field( + BoolSource, + is_required=False, + description="If true, Nomad job name will be prefixed by code location name", + ), "job_id": Field( StringSource, is_required=True, @@ -189,7 +195,14 @@ def launch_run(self, context: LaunchRunContext) -> None: payload = "\n".join(command) meta = {"IMAGE": docker_image} - dispatched_job_id = self.nomad_client.dispatch_job(self.nomad_job_id, payload=payload, meta=meta) + + job_id = self.nomad_job_id + if self.prefix_by_code_location_name: + # A run without a remote job origin has no code location to prefix with; fail with a clear check error. + job_origin = check.not_none(context.dagster_run.remote_job_origin) + location_name = job_origin.repository_origin.code_location_origin.location_name + job_id = f"{location_name}-{self.nomad_job_id}" + dispatched_job_id = self.nomad_client.dispatch_job(job_id, payload=payload, meta=meta) self._instance.report_engine_event( message=f"Dispatched a 
new run for job `{self.nomad_job_id}` with dispatched_job_id `{dispatched_job_id}`", @@ -269,8 +282,7 @@ def resume_run(self, context: ResumeRunContext) -> None: self._instance.report_engine_event( message=( - f"Dispatched a new resume_run for job `{self.nomad_job_id}`" - f"with dispatched_job_id `{dispatched_job_id}`" + f"Dispatched a new resume_run for job `{self.nomad_job_id}` with dispatched_job_id `{dispatched_job_id}`" ), dagster_run=run, cls=self.__class__, diff --git a/dagster-nomad/dagster_nomad_test/conftest.py b/dagster-nomad/dagster_nomad_test/conftest.py index ea2dcf9..6c3ff0f 100644 --- a/dagster-nomad/dagster_nomad_test/conftest.py +++ b/dagster-nomad/dagster_nomad_test/conftest.py @@ -4,6 +4,7 @@ Most of the code is adapted from dagster-aws See: dagster-aws/dagster_aws_tests/ecs_tests/launcher_tests/conftest.py """ + from typing import Iterator import pytest diff --git a/dagster-nomad/pyproject.toml b/dagster-nomad/pyproject.toml index 836ee90..e528cbf 100644 --- a/dagster-nomad/pyproject.toml +++ b/dagster-nomad/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dagster-nomad" -version = "0.0.1" +version = "0.1.0" description = "Dagster integration library for Nomad" readme = "README.md" authors = [ diff --git a/dagster/dagster.yaml b/dagster/dagster.yaml index 6ed0b01..5eeb442 100644 --- a/dagster/dagster.yaml +++ b/dagster/dagster.yaml @@ -7,11 +7,14 @@ run_launcher: module: dagster_nomad class: NomadRunLauncher config: + # if true, the code location name is used as a prefix of the Nomad job id, followed by a "-" + prefix_by_code_location_name: true job_id: env: DAGSTER_NOMAD_JOB_ID url: env: DAGSTER_NOMAD_URL + compute_logs: module: dagster_aws.s3.compute_log_manager class: S3ComputeLogManager diff --git a/justfile b/justfile index 26ceffc..d8b3b72 100644 --- a/justfile +++ b/justfile @@ -15,7 +15,7 @@ setup-dev: uv sync quality-format: - uv run ruff format --fix-only --exit-zero . + uv run ruff format uv run black . 
quality-check: diff --git a/tf/dagster-executor.hcl b/tf/dagster-executor.hcl index 800cce1..4f62d9a 100644 --- a/tf/dagster-executor.hcl +++ b/tf/dagster-executor.hcl @@ -1,4 +1,4 @@ -job "dagster-executor" { +job "${name}" { datacenters = ["dc1"] type = "batch" diff --git a/tf/job.tf b/tf/job.tf index 266c204..e69dcb1 100644 --- a/tf/job.tf +++ b/tf/job.tf @@ -1,10 +1,19 @@ -resource "nomad_job" "dagster-executor" { - hcl2 { - enabled = true - } +resource "nomad_job" "main-dagster-executor" { jobspec = templatefile( "dagster-executor.hcl", { + name = "main-dagster-executor", + docker_registry = local.docker_registry, + custom = local.custom + } + ) +} + +resource "nomad_job" "demo-dagster-executor" { + jobspec = templatefile( + "dagster-executor.hcl", + { + name = "demo-dagster-executor", docker_registry = local.docker_registry, custom = local.custom } diff --git a/tf/main-dagster-executor.hcl b/tf/main-dagster-executor.hcl new file mode 100644 index 0000000..d7848a2 --- /dev/null +++ b/tf/main-dagster-executor.hcl @@ -0,0 +1,34 @@ +job "main-dagster-executor" { + datacenters = ["dc1"] + type = "batch" + + parameterized { + payload = "required" + meta_required = ["IMAGE"] + } + + task "server" { + driver = "docker" + + dispatch_payload { + file = "input.txt" + } + + config { + image = "$${NOMAD_META_IMAGE}" + entrypoint = [ "/dagster_job.sh" ] + args = [ "$${NOMAD_TASK_DIR}/input.txt" ] + } + + env { + %{~ for key, value in custom } + ${key} = "${value}" + %{ endfor ~} + } + + resources { + cpu = 1000 # MHz + memory = 2048 # MB + } + } +} diff --git a/user_code/defs.py b/user_code/defs.py index bda82ea..5b25edc 100644 --- a/user_code/defs.py +++ b/user_code/defs.py @@ -5,6 +5,7 @@ from dagster import Definitions, load_assets_from_modules from . 
import assets +from .assets import sample_job S3_IO_MANAGER_CONFIG = { "s3_bucket": {"env": "DAGSTER_S3_BUCKET"}, @@ -18,6 +19,7 @@ defs = Definitions( assets=currency_assets, + jobs=[sample_job], resources={ "io_manager": s3_pickle_io_manager.configured(S3_IO_MANAGER_CONFIG), "s3": s3_resource.configured(S3_SESSION_CONFIG), diff --git a/user_code_demo/__init__.py b/user_code_demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/user_code_demo/assets.py b/user_code_demo/assets.py new file mode 100644 index 0000000..0564241 --- /dev/null +++ b/user_code_demo/assets.py @@ -0,0 +1,31 @@ +import io + +import httpx +import pandas as pd +from dagster import asset, Output, define_asset_job + + +EURO_FX_REF_CSV_FILE_URL = "https://www.ecb.europa.eu/stats/eurofxref/eurofxref-hist.zip" + + +# Downloads the zip archive at EURO_FX_REF_CSV_FILE_URL and emits its raw bytes; HTTP error statuses raise. +@asset +def demo_euro_fx_ref_csv_file() -> Output[bytes]: + response = httpx.get(EURO_FX_REF_CSV_FILE_URL) + response.raise_for_status() + return Output(response.content) + + +# Parses the zipped CSV bytes into a DataFrame and discards every column whose name starts with "Unnamed". +@asset +def demo_currencies_rates(demo_euro_fx_ref_csv_file: bytes) -> Output[pd.DataFrame]: + rates = pd.read_csv(io.BytesIO(demo_euro_fx_ref_csv_file), sep=",", compression="zip") + unnamed_columns = [name for name in rates.columns if name.startswith("Unnamed")] + rates = rates.drop(columns=unnamed_columns) + return Output(rates) + + +# Asset job selecting both demo assets, tagged with executor=sample. +sample_job = define_asset_job( + "demo_sample_job", selection=[demo_euro_fx_ref_csv_file, demo_currencies_rates], tags={"executor": "sample"} +) diff --git a/user_code_demo/defs.py b/user_code_demo/defs.py new file mode 100644 index 0000000..5b25edc --- /dev/null +++ b/user_code_demo/defs.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from dagster_aws.s3 import s3_pickle_io_manager, s3_resource + +from dagster import Definitions, load_assets_from_modules + +from . 
import assets +from .assets import sample_job + +S3_IO_MANAGER_CONFIG = { + "s3_bucket": {"env": "DAGSTER_S3_BUCKET"}, + "s3_prefix": "io_manager", +} +S3_SESSION_CONFIG = { + "endpoint_url": {"env": "DAGSTER_S3_ENDPOINT_URL"}, +} + +currency_assets = load_assets_from_modules([assets]) + +defs = Definitions( + assets=currency_assets, + jobs=[sample_job], + resources={ + "io_manager": s3_pickle_io_manager.configured(S3_IO_MANAGER_CONFIG), + "s3": s3_resource.configured(S3_SESSION_CONFIG), + }, +) diff --git a/workspace.yaml b/workspace.yaml index 518b8f3..923071b 100644 --- a/workspace.yaml +++ b/workspace.yaml @@ -2,9 +2,11 @@ load_from: - python_module: module_name: user_code.defs location_name: main - + - python_module: + module_name: user_code_demo.defs + location_name: demo # load_from: # - grpc_server: # host: localhost # port: 4266 -# location_name: "main" \ No newline at end of file +# location_name: "main"