Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions dagster-nomad/dagster_nomad/run_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, ClassVar, Optional

import httpx
from dagster import Field, StringSource
from dagster import Field, StringSource, BoolSource
from dagster import _check as check
from dagster._core.instance import T_DagsterInstance
from dagster._core.launcher import LaunchRunContext, RunLauncher
Expand Down Expand Up @@ -107,13 +107,14 @@ def __init__(
inst_data: Optional[ConfigurableClassData] = None,
*,
docker_image: str | None = None,
prefix_by_code_location_name: bool = False,
job_id: str,
url: str,
token: str | None = None,
namespace: str | None = None,
):
self._inst_data = inst_data

self.prefix_by_code_location_name = prefix_by_code_location_name
self.docker_image = docker_image
self.nomad_job_id = job_id
self.nomad_client = NomadClient(url, token, namespace)
Expand All @@ -135,6 +136,11 @@ def config_type(cls) -> dict[str, Field]:
is_required=False,
description="The docker image to be used if the repository does not specify one.",
),
"prefix_by_code_location_name": Field(
BoolSource,
is_required=False,
description="If true, Nomad job name will be prefixed by code location name",
),
"job_id": Field(
StringSource,
is_required=True,
Expand Down Expand Up @@ -189,7 +195,14 @@ def launch_run(self, context: LaunchRunContext) -> None:
payload = "\n".join(command)

meta = {"IMAGE": docker_image}
dispatched_job_id = self.nomad_client.dispatch_job(self.nomad_job_id, payload=payload, meta=meta)

job_id = self.nomad_job_id
if self.prefix_by_code_location_name:
job_id = "{}-{}".format(
context.dagster_run.remote_job_origin.repository_origin.code_location_origin.location_name,
self.nomad_job_id,
)
dispatched_job_id = self.nomad_client.dispatch_job(job_id, payload=payload, meta=meta)

self._instance.report_engine_event(
message=f"Dispatched a new run for job `{self.nomad_job_id}` with dispatched_job_id `{dispatched_job_id}`",
Expand Down Expand Up @@ -269,8 +282,7 @@ def resume_run(self, context: ResumeRunContext) -> None:

self._instance.report_engine_event(
message=(
f"Dispatched a new resume_run for job `{self.nomad_job_id}`"
f"with dispatched_job_id `{dispatched_job_id}`"
f"Dispatched a new resume_run for job `{self.nomad_job_id}`with dispatched_job_id `{dispatched_job_id}`"
),
dagster_run=run,
cls=self.__class__,
Expand Down
1 change: 1 addition & 0 deletions dagster-nomad/dagster_nomad_test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Most of the code is adapted from dagster-aws
See: dagster-aws/dagster_aws_tests/ecs_tests/launcher_tests/conftest.py
"""

from typing import Iterator

import pytest
Expand Down
2 changes: 1 addition & 1 deletion dagster-nomad/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dagster-nomad"
version = "0.0.1"
version = "0.1.0"
description = "Dagster integration library for Nomad"
readme = "README.md"
authors = [
Expand Down
3 changes: 3 additions & 0 deletions dagster/dagster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ run_launcher:
module: dagster_nomad
class: NomadRunLauncher
config:
# if true, code location name will used as prefix, followed by a -
prefix_by_code_location_name: true
job_id:
env: DAGSTER_NOMAD_JOB_ID
url:
env: DAGSTER_NOMAD_URL


compute_logs:
module: dagster_aws.s3.compute_log_manager
class: S3ComputeLogManager
Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ setup-dev:
uv sync

quality-format:
uv run ruff format --fix-only --exit-zero .
uv run ruff format
uv run black .

quality-check:
Expand Down
2 changes: 1 addition & 1 deletion tf/dagster-executor.hcl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
job "dagster-executor" {
job "${name}" {
datacenters = ["dc1"]
type = "batch"

Expand Down
17 changes: 13 additions & 4 deletions tf/job.tf
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
resource "nomad_job" "dagster-executor" {
hcl2 {
enabled = true
}
resource "nomad_job" "main-dagster-executor" {
jobspec = templatefile(
"dagster-executor.hcl",
{
name = "main-dagster-executor",
docker_registry = local.docker_registry,
custom = local.custom
}
)
}

resource "nomad_job" "demo-dagster-executor" {
jobspec = templatefile(
"dagster-executor.hcl",
{
name = "demo-dagster-executor",
docker_registry = local.docker_registry,
custom = local.custom
}
Expand Down
34 changes: 34 additions & 0 deletions tf/main-dagster-executor.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
job "main-dagster-executor" {
datacenters = ["dc1"]
type = "batch"

parameterized {
payload = "required"
meta_required = ["IMAGE"]
}

task "server" {
driver = "docker"

dispatch_payload {
file = "input.txt"
}

config {
image = "$${NOMAD_META_IMAGE}"
entrypoint = [ "/dagster_job.sh" ]
args = [ "$${NOMAD_TASK_DIR}/input.txt" ]
}

env {
%{~ for key, value in custom }
${key} = "${value}"
%{ endfor ~}
}

resources {
cpu = 1000 # MHz
memory = 2048 # MB
}
}
}
2 changes: 2 additions & 0 deletions user_code/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster import Definitions, load_assets_from_modules

from . import assets
from .assets import sample_job

S3_IO_MANAGER_CONFIG = {
"s3_bucket": {"env": "DAGSTER_S3_BUCKET"},
Expand All @@ -18,6 +19,7 @@

defs = Definitions(
assets=currency_assets,
jobs=[sample_job],
resources={
"io_manager": s3_pickle_io_manager.configured(S3_IO_MANAGER_CONFIG),
"s3": s3_resource.configured(S3_SESSION_CONFIG),
Expand Down
Empty file added user_code_demo/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions user_code_demo/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import io

import httpx
import pandas as pd
from dagster import asset, Output, define_asset_job


EURO_FX_REF_CSV_FILE_URL = "https://www.ecb.europa.eu/stats/eurofxref/eurofxref-hist.zip"


@asset
def demo_euro_fx_ref_csv_file() -> Output[bytes]:
res = httpx.get(EURO_FX_REF_CSV_FILE_URL)
res.raise_for_status()
return Output(res.content)


@asset
def demo_currencies_rates(demo_euro_fx_ref_csv_file: bytes) -> Output[pd.DataFrame]:
df = pd.read_csv(io.BytesIO(demo_euro_fx_ref_csv_file), sep=",", compression="zip")
df.drop(columns=[c for c in df.columns if c.startswith("Unnamed")], inplace=True)
return Output(df)


sample_job = define_asset_job(
"demo_sample_job", selection=[demo_euro_fx_ref_csv_file, demo_currencies_rates], tags={"executor": "sample"}
)
27 changes: 27 additions & 0 deletions user_code_demo/defs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

from dagster import Definitions, load_assets_from_modules

from . import assets
from .assets import sample_job

S3_IO_MANAGER_CONFIG = {
"s3_bucket": {"env": "DAGSTER_S3_BUCKET"},
"s3_prefix": "io_manager",
}
S3_SESSION_CONFIG = {
"endpoint_url": {"env": "DAGSTER_S3_ENDPOINT_URL"},
}

currency_assets = load_assets_from_modules([assets])

defs = Definitions(
assets=currency_assets,
jobs=[sample_job],
resources={
"io_manager": s3_pickle_io_manager.configured(S3_IO_MANAGER_CONFIG),
"s3": s3_resource.configured(S3_SESSION_CONFIG),
},
)
6 changes: 4 additions & 2 deletions workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ load_from:
- python_module:
module_name: user_code.defs
location_name: main

- python_module:
module_name: user_code_demo.defs
location_name: demo
# load_from:
# - grpc_server:
# host: localhost
# port: 4266
# location_name: "main"
# location_name: "main"