Skip to content

Commit 7546e4f

Browse files
Merge branch 'main' into fix/stackdriver-read-run-id
2 parents 1c619fe + 452fed8 commit 7546e4f

9 files changed

Lines changed: 90 additions & 64 deletions

File tree

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3226,7 +3226,7 @@ state_store:
32263226
description: |
32273227
Number of days to retain task state rows after their last update.
32283228
Rows older than this are removed when cleanup is triggered.
3229-
This config does not affect asset_state rows.
3229+
This config does not affect asset_state_store rows.
32303230
Set to 0 to disable time-based cleanup entirely.
32313231
version_added: 3.3.0
32323232
type: integer
@@ -3235,7 +3235,7 @@ state_store:
32353235
state_cleanup_batch_size:
32363236
description: |
32373237
Number of rows deleted per batch during cleanup. Defaults to 0 (no batching).
3238-
Tune this on deployments with large task_state tables to improve performance per transaction.
3238+
Tune this on deployments with large task_state_store tables to improve performance per transaction.
32393239
version_added: 3.3.0
32403240
type: integer
32413241
example: "10000"

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state_store.py

Lines changed: 18 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ def _create_asset(session) -> AssetModel:
4242
return asset
4343

4444

45-
def _create_asset_state(session, asset_id: int, key: str, value: str) -> None:
45+
def _create_asset_state_store_row(session, asset_id: int, key: str, value: str) -> None:
4646
row = AssetStateStoreModel(asset_id=asset_id, key=key, value=json.dumps(value))
4747
session.add(row)
4848
session.flush()
@@ -91,8 +91,8 @@ def test_returns_empty_list_when_no_state(self, test_client):
9191
assert response.json() == {"asset_state_store": [], "total_entries": 0}
9292

9393
def test_returns_all_keys(self, test_client):
94-
_create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01")
95-
_create_asset_state(self._session, self.asset.id, "file_count", "42")
94+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "2026-05-01")
95+
_create_asset_state_store_row(self._session, self.asset.id, "file_count", "42")
9696
self._session.commit()
9797

9898
response = test_client.get(self._base_url)
@@ -103,7 +103,7 @@ def test_returns_all_keys(self, test_client):
103103
assert keys == {"watermark": "2026-05-01", "file_count": "42"}
104104

105105
def test_returns_metadata_fields(self, test_client):
106-
_create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01")
106+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "2026-05-01")
107107
self._session.commit()
108108

109109
item = test_client.get(self._base_url).json()["asset_state_store"][0]
@@ -134,7 +134,7 @@ def test_last_updated_by_returned_when_set(self, test_client, create_task_instan
134134

135135
def test_pagination_limit(self, test_client):
136136
for k in ("watermark", "file_count", "last_run"):
137-
_create_asset_state(self._session, self.asset.id, k, "v")
137+
_create_asset_state_store_row(self._session, self.asset.id, k, "v")
138138
self._session.commit()
139139

140140
response = test_client.get(f"{self._base_url}?limit=2")
@@ -144,7 +144,7 @@ def test_pagination_limit(self, test_client):
144144

145145
def test_pagination_offset(self, test_client):
146146
for k in ("watermark", "file_count", "last_run"):
147-
_create_asset_state(self._session, self.asset.id, k, "v")
147+
_create_asset_state_store_row(self._session, self.asset.id, k, "v")
148148
self._session.commit()
149149

150150
response = test_client.get(f"{self._base_url}?limit=2&offset=2")
@@ -158,7 +158,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
158158

159159
class TestGetAssetState(TestAssetStateEndpoint):
160160
def test_returns_value(self, test_client):
161-
_create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01")
161+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "2026-05-01")
162162
self._session.commit()
163163

164164
response = test_client.get(f"{self._base_url}/watermark")
@@ -196,7 +196,7 @@ def test_missing_key_returns_404(self, test_client):
196196

197197
def test_key_with_slash_is_supported(self, test_client):
198198
"""Keys containing slashes must work — route uses {key:path}."""
199-
_create_asset_state(self._session, self.asset.id, "partition/date", "2026-05-01")
199+
_create_asset_state_store_row(self._session, self.asset.id, "partition/date", "2026-05-01")
200200
self._session.commit()
201201

202202
response = test_client.get(f"{self._base_url}/partition/date")
@@ -274,7 +274,7 @@ def test_core_api_write_read_roundtrip(self, test_client, value):
274274
@pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], "hello"])
275275
def test_worker_write_core_api_read_roundtrip(self, test_client, value):
276276
"""Worker write (json.dumps in DB) then Core API read returns native value."""
277-
_create_asset_state(self._session, self.asset.id, "k", value)
277+
_create_asset_state_store_row(self._session, self.asset.id, "k", value)
278278
self._session.commit()
279279
assert test_client.get(f"{self._base_url}/k").json()["value"] == value
280280

@@ -292,7 +292,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
292292

293293
class TestDeleteAssetState(TestAssetStateEndpoint):
294294
def test_deletes_key(self, test_client):
295-
_create_asset_state(self._session, self.asset.id, "watermark", "2026-05-01")
295+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "2026-05-01")
296296
self._session.commit()
297297

298298
assert test_client.delete(f"{self._base_url}/watermark").status_code == 204
@@ -302,8 +302,8 @@ def test_delete_noop_for_missing_key(self, test_client):
302302
assert test_client.delete(f"{self._base_url}/nonexistent").status_code == 204
303303

304304
def test_only_deletes_target_key(self, test_client):
305-
_create_asset_state(self._session, self.asset.id, "watermark", "a")
306-
_create_asset_state(self._session, self.asset.id, "file_count", "b")
305+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "a")
306+
_create_asset_state_store_row(self._session, self.asset.id, "file_count", "b")
307307
self._session.commit()
308308

309309
test_client.delete(f"{self._base_url}/watermark")
@@ -312,7 +312,7 @@ def test_only_deletes_target_key(self, test_client):
312312
assert test_client.get(f"{self._base_url}/file_count").json()["value"] == "b"
313313

314314
def test_key_with_slash_is_supported(self, test_client):
315-
_create_asset_state(self._session, self.asset.id, "partition/date", "v")
315+
_create_asset_state_store_row(self._session, self.asset.id, "partition/date", "v")
316316
self._session.commit()
317317

318318
assert test_client.delete(f"{self._base_url}/partition/date").status_code == 204
@@ -325,7 +325,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
325325
class TestClearAssetState(TestAssetStateEndpoint):
326326
def test_clears_all_keys(self, test_client):
327327
for k, v in [("watermark", "a"), ("file_count", "b"), ("last_run", "c")]:
328-
_create_asset_state(self._session, self.asset.id, k, v)
328+
_create_asset_state_store_row(self._session, self.asset.id, k, v)
329329
self._session.commit()
330330

331331
assert test_client.delete(self._base_url).status_code == 204
@@ -338,8 +338,8 @@ def test_clear_does_not_affect_other_assets(self, test_client):
338338
other_asset = AssetModel(uri="s3://other/asset", name="other_asset", group="test")
339339
self._session.add(other_asset)
340340
self._session.flush()
341-
_create_asset_state(self._session, self.asset.id, "watermark", "mine")
342-
_create_asset_state(self._session, other_asset.id, "watermark", "theirs")
341+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "mine")
342+
_create_asset_state_store_row(self._session, other_asset.id, "watermark", "theirs")
343343
self._session.commit()
344344

345345
test_client.delete(self._base_url)
@@ -348,7 +348,7 @@ def test_clear_does_not_affect_other_assets(self, test_client):
348348
assert test_client.get(f"{other_url}/watermark").json()["value"] == "theirs"
349349

350350
def test_clears_slash_keyed_entries(self, test_client):
351-
_create_asset_state(self._session, self.asset.id, "partition/date", "v")
351+
_create_asset_state_store_row(self._session, self.asset.id, "partition/date", "v")
352352
self._session.commit()
353353

354354
assert test_client.delete(self._base_url).status_code == 204
@@ -372,7 +372,7 @@ class TestRoutesNeverCallCustomBackend(TestAssetStateEndpoint):
372372
],
373373
)
374374
def test_route_never_calls_get_state_backend(self, test_client, method, path_suffix, kwargs):
375-
_create_asset_state(self._session, self.asset.id, "watermark", "v1")
375+
_create_asset_state_store_row(self._session, self.asset.id, "watermark", "v1")
376376
self._session.commit()
377377

378378
with patch("airflow.state.get_state_backend") as mock_get_backend:

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state_store.py

Lines changed: 20 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ def _create_dag_run(dag_maker, session):
5555
session.commit()
5656

5757

58-
def _create_task_state(session, key: str, value: str, dag_run: DagRun) -> None:
58+
def _create_task_state_store_row(session, key: str, value: str, dag_run: DagRun) -> None:
5959
row = TaskStateStoreModel(
6060
dag_run_id=dag_run.id,
6161
dag_id=DAG_ID,
@@ -94,8 +94,8 @@ def test_returns_empty_list_when_no_state(self, test_client):
9494
assert response.json() == {"task_state_store": [], "total_entries": 0}
9595

9696
def test_returns_all_keys(self, test_client):
97-
_create_task_state(self._session, "job_id", "spark_001", self.dag_run)
98-
_create_task_state(self._session, "checkpoint", "step_3", self.dag_run)
97+
_create_task_state_store_row(self._session, "job_id", "spark_001", self.dag_run)
98+
_create_task_state_store_row(self._session, "checkpoint", "step_3", self.dag_run)
9999
self._session.commit()
100100

101101
response = test_client.get(BASE_URL)
@@ -106,7 +106,7 @@ def test_returns_all_keys(self, test_client):
106106
assert keys == {"job_id": "spark_001", "checkpoint": "step_3"}
107107

108108
def test_returns_state_metadata_fields(self, test_client):
109-
_create_task_state(self._session, "job_id", "spark_001", self.dag_run)
109+
_create_task_state_store_row(self._session, "job_id", "spark_001", self.dag_run)
110110
self._session.commit()
111111

112112
response = test_client.get(BASE_URL)
@@ -133,7 +133,7 @@ def test_map_index_isolation(self, test_client):
133133

134134
def test_pagination_limit(self, test_client):
135135
for k in ("a", "b", "c"):
136-
_create_task_state(self._session, k, "v", self.dag_run)
136+
_create_task_state_store_row(self._session, k, "v", self.dag_run)
137137
self._session.commit()
138138

139139
response = test_client.get(f"{BASE_URL}?limit=2")
@@ -143,7 +143,7 @@ def test_pagination_limit(self, test_client):
143143

144144
def test_pagination_offset(self, test_client):
145145
for k in ("a", "b", "c"):
146-
_create_task_state(self._session, k, "v", self.dag_run)
146+
_create_task_state_store_row(self._session, k, "v", self.dag_run)
147147
self._session.commit()
148148

149149
response = test_client.get(f"{BASE_URL}?limit=2&offset=2")
@@ -157,7 +157,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
157157

158158
class TestGetTaskState(TestTaskStateEndpoint):
159159
def test_returns_value(self, test_client):
160-
_create_task_state(self._session, "job_id", "spark_001", self.dag_run)
160+
_create_task_state_store_row(self._session, "job_id", "spark_001", self.dag_run)
161161
self._session.commit()
162162

163163
response = test_client.get(f"{BASE_URL}/job_id")
@@ -172,7 +172,7 @@ def test_missing_key_returns_404(self, test_client):
172172

173173
def test_key_with_slash_is_supported(self, test_client):
174174
"""Keys containing slashes must work — route uses {key:path}."""
175-
_create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
175+
_create_task_state_store_row(self._session, "workflow/step_1", "v", self.dag_run)
176176
self._session.commit()
177177

178178
response = test_client.get(f"{BASE_URL}/workflow/step_1")
@@ -254,7 +254,7 @@ def test_core_api_write_read_roundtrip(self, test_client, value):
254254
@pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], "hello"])
255255
def test_worker_write_core_api_read_roundtrip(self, test_client, value):
256256
"""Worker write (json.dumps in DB) then Core API read returns native value."""
257-
_create_task_state(self._session, "k", value, self.dag_run)
257+
_create_task_state_store_row(self._session, "k", value, self.dag_run)
258258
self._session.commit()
259259
assert test_client.get(f"{BASE_URL}/k").json()["value"] == value
260260

@@ -314,7 +314,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
314314

315315
class TestPatchTaskState(TestTaskStateEndpoint):
316316
def test_patch_updates_value(self, test_client):
317-
_create_task_state(self._session, "job_id", "v1", self.dag_run)
317+
_create_task_state_store_row(self._session, "job_id", "v1", self.dag_run)
318318
self._session.commit()
319319

320320
assert test_client.patch(f"{BASE_URL}/job_id", json={"value": "v2"}).status_code == 200
@@ -332,12 +332,12 @@ def test_patch_missing_key_returns_404(self, test_client):
332332
assert test_client.patch(f"{BASE_URL}/nonexistent", json={"value": "v"}).status_code == 404
333333

334334
def test_patch_empty_body_returns_422(self, test_client):
335-
_create_task_state(self._session, "job_id", "v", self.dag_run)
335+
_create_task_state_store_row(self._session, "job_id", "v", self.dag_run)
336336
self._session.commit()
337337
assert test_client.patch(f"{BASE_URL}/job_id", json={}).status_code == 422
338338

339339
def test_patch_null_value_returns_422(self, test_client):
340-
_create_task_state(self._session, "job_id", "v", self.dag_run)
340+
_create_task_state_store_row(self._session, "job_id", "v", self.dag_run)
341341
self._session.commit()
342342
assert test_client.patch(f"{BASE_URL}/job_id", json={"value": None}).status_code == 422
343343

@@ -356,7 +356,7 @@ def test_patch_non_finite_float_rejected_by_validator(self, bad_value):
356356
],
357357
)
358358
def test_patch_stores_json_encoded_value(self, test_client, value, expected_db):
359-
_create_task_state(self._session, "job_id", "initial", self.dag_run)
359+
_create_task_state_store_row(self._session, "job_id", "initial", self.dag_run)
360360
self._session.commit()
361361
test_client.patch(f"{BASE_URL}/job_id", json={"value": value})
362362
row = self._session.scalar(
@@ -376,7 +376,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
376376

377377
class TestDeleteTaskState(TestTaskStateEndpoint):
378378
def test_deletes_key(self, test_client):
379-
_create_task_state(self._session, "job_id", "spark_001", self.dag_run)
379+
_create_task_state_store_row(self._session, "job_id", "spark_001", self.dag_run)
380380
self._session.commit()
381381

382382
assert test_client.delete(f"{BASE_URL}/job_id").status_code == 204
@@ -386,8 +386,8 @@ def test_delete_noop_for_missing_key(self, test_client):
386386
assert test_client.delete(f"{BASE_URL}/nonexistent").status_code == 204
387387

388388
def test_only_deletes_target_key(self, test_client):
389-
_create_task_state(self._session, "job_id", "a", self.dag_run)
390-
_create_task_state(self._session, "checkpoint", "b", self.dag_run)
389+
_create_task_state_store_row(self._session, "job_id", "a", self.dag_run)
390+
_create_task_state_store_row(self._session, "checkpoint", "b", self.dag_run)
391391
self._session.commit()
392392

393393
test_client.delete(f"{BASE_URL}/job_id")
@@ -396,7 +396,7 @@ def test_only_deletes_target_key(self, test_client):
396396
assert test_client.get(f"{BASE_URL}/checkpoint").json()["value"] == "b"
397397

398398
def test_key_with_slash_is_supported(self, test_client):
399-
_create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
399+
_create_task_state_store_row(self._session, "workflow/step_1", "v", self.dag_run)
400400
self._session.commit()
401401

402402
assert test_client.delete(f"{BASE_URL}/workflow/step_1").status_code == 204
@@ -409,7 +409,7 @@ def test_unauthorized_returns_401(self, unauthenticated_test_client):
409409
class TestClearTaskState(TestTaskStateEndpoint):
410410
def test_clears_all_keys(self, test_client):
411411
for k, v in [("job_id", "a"), ("checkpoint", "b"), ("retry_count", "c")]:
412-
_create_task_state(self._session, k, v, self.dag_run)
412+
_create_task_state_store_row(self._session, k, v, self.dag_run)
413413
self._session.commit()
414414

415415
assert test_client.delete(BASE_URL).status_code == 204
@@ -442,7 +442,7 @@ def test_all_map_indices_clears_across_mapped_instances(self, test_client):
442442
assert test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 0
443443

444444
def test_key_with_slash_is_supported(self, test_client):
445-
_create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
445+
_create_task_state_store_row(self._session, "workflow/step_1", "v", self.dag_run)
446446
self._session.commit()
447447

448448
assert test_client.delete(BASE_URL).status_code == 204
@@ -467,7 +467,7 @@ class TestRoutesNeverCallCustomBackend(TestTaskStateEndpoint):
467467
],
468468
)
469469
def test_route_never_calls_get_state_backend(self, test_client, method, path, kwargs):
470-
_create_task_state(self._session, "job_id", "v1", self.dag_run)
470+
_create_task_state_store_row(self._session, "job_id", "v1", self.dag_run)
471471
self._session.commit()
472472

473473
with patch("airflow.state.get_state_backend") as mock_get_backend:

providers/apache/spark/docs/operators.rst

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -190,7 +190,7 @@ independently on the cluster. If the Airflow worker dies while the Spark job is
190190
Airflow loses track of it, and submitting a brand new job would waste
191191
the compute already done or even cause conflicts if the Spark job itself is not designed to be idempotent.
192192

193-
Now, the ``SparkSubmitOperator`` solves this by persisting the driver ID to ``task_state`` immediately after
193+
Now, the ``SparkSubmitOperator`` solves this by persisting the driver ID to ``task_state_store`` immediately after
194194
submission. On retry, it reads the ID back and reconnects to the already-running driver instead of
195195
resubmitting.
196196

@@ -212,7 +212,7 @@ The reconnection polling calls the Spark standalone REST API
212212
See :doc:`connections/spark-submit` for how to configure these fields.
213213

214214
.. note::
215-
Crash recovery in cluster mode requires Airflow 3.3+ (``task_state`` support). On earlier
215+
Crash recovery in cluster mode requires Airflow 3.3+ (``task_state_store`` support). On earlier
216216
versions the operator falls back to the previous behavior of always submitting fresh.
217217

218218
Tracking driver status via Kubernetes API

providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@
3737
# ResumableJobMixin does not exist in Airflow 2, so we need to add a stub to make it
3838
# behave as before
3939
class ResumableJobMixin: # type: ignore[no-redef]
40-
"""Airflow 2 stub — no task_state, always submits fresh."""
40+
"""Airflow 2 stub — no task_state_store, always submits fresh."""
4141

4242
external_id_key: str = "remote_job_id"
4343

@@ -264,7 +264,7 @@ def execute(self, context: Context) -> None:
264264
if hook._should_track_driver_status:
265265
if self.reconnect_on_retry:
266266
return self.execute_resumable(context)
267-
# reconnect_on_retry=False: still submit-and-poll, just skip task_state persistence.
267+
# reconnect_on_retry=False: still submit-and-poll, just skip task_state_store persistence.
268268
driver_id = self.submit_job(context)
269269
self.poll_until_complete(driver_id, context)
270270
return self.get_job_result(driver_id, context)
@@ -284,7 +284,7 @@ def execute(self, context: Context) -> None:
284284
hook._validate_yarn_track_via_rm_api_config()
285285
if self.reconnect_on_retry:
286286
return self.execute_resumable(context)
287-
# reconnect_on_retry=False: still submit-and-poll, just skip task_state persistence.
287+
# reconnect_on_retry=False: still submit-and-poll, just skip task_state_store persistence.
288288
driver_id = self.submit_job(context)
289289
self.poll_until_complete(driver_id, context)
290290
return self.get_job_result(driver_id, context)

0 commit comments

Comments
 (0)