Skip to content

Commit

Permalink
feat: graphql add Run.hasRunMetricsEnabled field
Browse files Browse the repository at this point in the history
  • Loading branch information
mlarose committed Sep 23, 2024
1 parent c3ea316 commit ec2435b
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
RunRecord,
RunsFilter,
)
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, RUN_METRIC_TAGS, TagType, get_tag_type
from dagster._core.workspace.permissions import Permissions
from dagster._utils.tags import get_boolean_tag_value
from dagster._utils.yaml_utils import dump_run_config_yaml

from dagster_graphql.implementation.events import from_event_record, iterate_metadata_entries
Expand Down Expand Up @@ -80,6 +81,11 @@
if TYPE_CHECKING:
from dagster_graphql.schema.partition_sets import GrapheneJobSelectionPartition

UNSTARTED_STATUSES = [
DagsterRunStatus.QUEUED,
DagsterRunStatus.NOT_STARTED,
DagsterRunStatus.STARTING,
]

STARTED_STATUSES = {
DagsterRunStatus.STARTED,
Expand Down Expand Up @@ -385,6 +391,7 @@ class GrapheneRun(graphene.ObjectType):
hasConcurrencyKeySlots = graphene.NonNull(graphene.Boolean)
rootConcurrencyKeys = graphene.List(graphene.NonNull(graphene.String))
hasUnconstrainedRootNodes = graphene.NonNull(graphene.Boolean)
hasRunMetricsEnabled = graphene.NonNull(graphene.Boolean)

class Meta:
interfaces = (GraphenePipelineRun, GrapheneRunsFeedEntry)
Expand Down Expand Up @@ -622,6 +629,16 @@ def resolve_rootConcurrencyKeys(self, graphene_info: ResolveInfo):
root_concurrency_keys.extend([concurrency_key] * count)
return root_concurrency_keys

def resolve_hasRunMetricsEnabled(self, graphene_info: ResolveInfo):
if self.dagster_run.status in UNSTARTED_STATUSES:
return False

run_tags = self.dagster_run.tags
for tag in RUN_METRIC_TAGS:
if get_boolean_tag_value(run_tags.get(tag)):
return True
return False


class GrapheneIPipelineSnapshotMixin:
# Mixin this class to implement IPipelineSnapshot
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import tempfile

import pytest
import yaml
from dagster import AssetMaterialization, Output, job, op, repository
from dagster._core.definitions.job_base import InMemoryJob
Expand Down Expand Up @@ -273,6 +274,22 @@
}
"""

RUN_METRICS_QUERY = """
query RunQuery($runId: ID!) {
pipelineRunOrError(runId: $runId) {
__typename
... on Run {
runId
tags {
key
value
}
hasRunMetricsEnabled
}
}
}
"""


def _get_runs_data(result, run_id):
for run_data in result.data["pipelineOrError"]["runs"]:
Expand Down Expand Up @@ -931,3 +948,47 @@ def test_run_has_concurrency_slots():
assert result.data["pipelineRunsOrError"]["results"][0]["runId"] == run_id
assert result.data["pipelineRunsOrError"]["results"][0]["hasConcurrencyKeySlots"]
assert result.data["pipelineRunsOrError"]["results"][0]["rootConcurrencyKeys"]


@pytest.mark.parametrize(
"run_tag, run_tag_value, run_metrics_enabled, failure_message",
[
(None, None, False, "run_metrics tag not present should result to false"),
(".dagster/run_metrics", "true", True, "run_metrics tag set to true should result to true"),
(
".dagster/run_metrics",
"false",
False,
"run_metrics tag set to falsy value should result to false",
),
(
"dagster/run_metrics",
"true",
True,
"public run_metrics tag set to true should result to true",
),
],
)
def test_run_has_run_metrics_enabled(run_tag, run_tag_value, run_metrics_enabled, failure_message):
with instance_for_test() as instance:
repo = get_repo_at_time_1()
tags = (
{
run_tag: run_tag_value,
}
if run_tag
else {}
)
run = instance.create_run_for_job(
repo.get_job("foo_job"), status=DagsterRunStatus.STARTED, tags=tags
)

with define_out_of_process_context(__file__, "get_repo_at_time_1", instance) as context:
result = execute_dagster_graphql(
context,
RUN_METRICS_QUERY,
variables={"runId": run.run_id},
)
assert result.data
has_run_metrics_enabled = result.data["pipelineRunOrError"]["hasRunMetricsEnabled"]
assert has_run_metrics_enabled == run_metrics_enabled, failure_message
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
]

# Supports for the public tag is deprecated
RUN_METRIC_TAGS = [
f"{HIDDEN_TAG_PREFIX}run_metrics",
f"{SYSTEM_TAG_PREFIX}run_metrics",
]


class TagType(Enum):
# Custom tag provided by a user
Expand Down

0 comments on commit ec2435b

Please sign in to comment.