-
Notifications
You must be signed in to change notification settings - Fork 11
Support ML async job cancellation, fail jobs on redis errors #1162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 40 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
b60eab0
merge
carlos-irreverentlabs 644927f
Merge remote-tracking branch 'upstream/main'
carlosgjs 218f7aa
Merge remote-tracking branch 'upstream/main'
carlosgjs 90da389
Merge remote-tracking branch 'upstream/main'
carlosgjs 8618d3c
Merge remote-tracking branch 'upstream/main'
carlosgjs bd1be5f
Merge remote-tracking branch 'upstream/main'
carlosgjs b102ae1
Merge remote-tracking branch 'upstream/main'
carlosgjs bc908aa
fix: PSv2 follow-up fixes from integration tests (#1135)
mihow 4c3802a
PSv2: Improve task fetching & web worker concurrency configuration (#…
carlosgjs b717e80
fix: include pipeline_slug in MinimalJobSerializer (#1148)
mihow 883c4f8
Merge remote-tracking branch 'upstream/main'
carlosgjs 8df89be
Avoid redis based locking by using atomic updates
carlosgjs e26f3c6
Merge remote-tracking branch 'upstream/main'
carlosgjs 1096fd9
Merge branch 'main' into carlosg/redisatomic
carlosgjs 30c8db3
Test concurrency
carlosgjs deea095
Increase max ack pending
carlosgjs 20c0fbd
update comment
carlosgjs e84421e
CR feedback
carlosgjs cbb2d7f
Cancel jobs if Redis state is missing
carlosgjs 3861190
Add chaos monkey
carlosgjs d591bd6
CR feedback
carlosgjs 4720bb6
CR 2
carlosgjs f0cd403
fix: OrderedEnum comparisons now override str MRO in subclasses
mihow e3134a1
fix: correct misleading error log about NATS redelivery
mihow 41b1232
Merge branch 'carlosg/redisatomic' of github.com:uw-ssec/antenna into…
carlosgjs 94e1bbb
Use job.logger
carlosgjs dcf57fe
Use job.logger
carlosgjs 4a25e54
Integrate cancellation support
carlosgjs 654593b
Merge branch 'carlosg/redisatomic' into carlos/redisfail
carlosgjs 5d38d67
merge, update tests
carlosgjs ac90c2f
Remove pause support in monkey
carlosgjs 4eb763a
fix: cancel async jobs by cleaning up NATS/Redis and stopping task de…
mihow 8671214
fix(ui): hide Retry button while job is in CANCELING state
mihow b1146cc
fix: downgrade Redis-missing log to warning for canceled jobs
mihow dccaceb
docs: add async job monitoring reference
mihow d63be48
fix: update tests for active_states() guard on /tasks endpoint
mihow 4ef7a24
Merge branch 'RolnickLab:main' into main
mihow c389e90
Merge remote-tracking branch 'upstream/main'
carlosgjs 33a6425
Merge branch 'main' of github.com:uw-ssec/antenna
carlosgjs 934db1d
Merge branch 'main' into carlos/redisfail
carlosgjs f4d88ff
fix: improve job cancel ordering, fail status sync, and log handler s…
mihow 20e4ec2
fix: restore timeout on _stream_exists and use settings for NATS_URL
mihow cf18987
fix(ui): block retry button while job is in RETRY state
mihow f1bed5e
docs: clarify _stream_exists timeout propagation design
mihow a16fc05
docs: add language tag to fenced code block in monitoring guide
mihow File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| """ | ||
| Fault injection utility for manual chaos testing of ML async jobs. | ||
|
|
||
| Use alongside `test_ml_job_e2e` to verify job behaviour when Redis or NATS | ||
| becomes unavailable or loses state mid-processing. | ||
|
|
||
| Usage examples: | ||
|
|
||
| # Flush all Redis state immediately (simulates FLUSHDB mid-job) | ||
| python manage.py chaos_monkey flush redis | ||
|
|
||
| # Flush all NATS JetStream streams (simulates broker state loss) | ||
| python manage.py chaos_monkey flush nats | ||
| """ | ||
|
|
||
| from asgiref.sync import async_to_sync | ||
| from django.core.management.base import BaseCommand, CommandError | ||
| from django_redis import get_redis_connection | ||
|
|
||
| NATS_URL = "nats://ami_local_nats:4222" | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Inject faults into Redis or NATS for chaos/resilience testing" | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument( | ||
| "action", | ||
| choices=["flush"], | ||
| help="flush: wipe all state.", | ||
| ) | ||
| parser.add_argument( | ||
| "service", | ||
| choices=["redis", "nats"], | ||
| help="Target service to fault.", | ||
| ) | ||
|
|
||
| def handle(self, *args, **options): | ||
| action = options["action"] | ||
| service = options["service"] | ||
|
|
||
| if action == "flush" and service == "redis": | ||
| self._flush_redis() | ||
| elif action == "flush" and service == "nats": | ||
| self._flush_nats() | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Redis | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| def _flush_redis(self): | ||
| self.stdout.write("Flushing Redis database (FLUSHDB)...") | ||
| try: | ||
| redis = get_redis_connection("default") | ||
| redis.flushdb() | ||
| self.stdout.write(self.style.SUCCESS("Redis flushed.")) | ||
| except Exception as e: | ||
| raise CommandError(f"Failed to flush Redis: {e}") from e | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # NATS | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| def _flush_nats(self): | ||
| """Delete all JetStream streams via the NATS Python client.""" | ||
| self.stdout.write("Flushing all NATS JetStream streams...") | ||
|
|
||
| async def _delete_all_streams(): | ||
| import nats | ||
|
|
||
| nc = await nats.connect(NATS_URL, connect_timeout=5, allow_reconnect=False) | ||
| js = nc.jetstream() | ||
| try: | ||
| streams = await js.streams_info() | ||
| if not streams: | ||
| return [] | ||
| deleted = [] | ||
| for stream in streams: | ||
| name = stream.config.name | ||
| await js.delete_stream(name) | ||
| deleted.append(name) | ||
| return deleted | ||
| finally: | ||
| await nc.close() | ||
|
|
||
| try: | ||
| deleted = async_to_sync(_delete_all_streams)() | ||
| except Exception as e: | ||
| raise CommandError(f"Failed to flush NATS: {e}") from e | ||
|
|
||
| if deleted: | ||
| for name in deleted: | ||
| self.stdout.write(f" Deleted stream: {name}") | ||
| self.stdout.write(self.style.SUCCESS(f"Deleted {len(deleted)} stream(s).")) | ||
| else: | ||
| self.stdout.write("No streams found — NATS already empty.") | ||
mihow marked this conversation as resolved.
Show resolved
Hide resolved
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.