Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 21, 2025
1 parent 7946349 commit 5708bee
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ def _get_threadpool_executor(instance: DagsterInstance):
),
)
.start_asset_daemon() # starts the daemon again for non-sensor tests
.with_current_time_advanced(seconds=30)
.evaluate_tick()
.assert_requested_runs( # next tick should request the remaining runs from the stopped tick since the cursor was reset
run_request(
asset_keys=["C", "D"], partition_key=day_partition_key(state.current_time, delta=1)
)
run_request(["A", "B"], partition_key=hour_partition_key(state.current_time, delta=24)),
),
),
]
Expand Down Expand Up @@ -337,7 +337,6 @@ def test_asset_daemon_with_threadpool_without_sensor(
)
@pytest.mark.parametrize("num_threads", [0, 4])
def test_asset_daemon_with_sensor(scenario: AssetDaemonScenario, num_threads: int) -> None:
# test_asset_daemon_with_sensor[0-cursor_reset_correctly]
with get_daemon_instance(
extra_overrides={
"auto_materialize": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, NamedTuple, Optional, cast
from unittest import mock

import dagster._check as check
from dagster import AssetKey, DagsterInstance, RunRequest, RunsFilter
Expand Down Expand Up @@ -38,7 +39,6 @@
from dagster._core.scheduler.instigation import SensorInstigatorData, TickStatus
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._core.test_utils import freeze_time, wait_for_futures
from dagster._core.utils import InheritContextThreadPoolExecutor
from dagster._daemon.asset_daemon import (
_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
Expand Down Expand Up @@ -202,21 +202,6 @@ def _evaluate_tick_daemon(
# start sensor if it hasn't started already
self.instance.start_sensor(sensor)

def _stop_sensor():
if sensor:
self.instance.stop_sensor(
sensor.get_remote_origin_id(), sensor.selector_id, sensor
)

def _stop_amp():
set_auto_materialize_paused(instance=self.instance, paused=True)

use_auto_materialize_sensors = self.instance.auto_materialize_use_sensors
if use_auto_materialize_sensors:
_stop_tick = _stop_sensor
else:
_stop_tick = _stop_amp

def _run_daemon():
amp_tick_futures = {}

Expand All @@ -236,14 +221,11 @@ def _run_daemon():
wait_for_futures(amp_tick_futures)

if stop_mid_iteration:
test_futures = {}
sensor_controller_threadpool = InheritContextThreadPoolExecutor(
max_workers=2,
thread_name_prefix="unit_test_worker",
)
test_futures["daemon"] = sensor_controller_threadpool.submit(_run_daemon)
test_futures["stop"] = sensor_controller_threadpool.submit(_stop_tick)
wait_for_futures(test_futures)
with mock.patch(
"dagster._daemon.asset_daemon.AssetDaemon._sensor_is_enabled"
) as sensor_enabled_mock:
sensor_enabled_mock.return_value = False
_run_daemon()
else:
_run_daemon()

Expand Down

0 comments on commit 5708bee

Please sign in to comment.