[APMSP-1874] Add fork support by dropping runtime #1056
Conversation
Benchmarks Comparison

Benchmark execution time: 2025-06-10 13:28:35. Comparing candidate commit 87d96ad in PR branch. Found 2 performance improvements and 0 performance regressions! Performance is the same for 50 metrics, 2 unstable metrics.

scenario: normalization/normalize_service/normalize_service/[empty string]

(Candidate benchmark details for Groups 1–13 and baseline omitted due to size.)
feat(data-pipeline): Rename stop to pause
fix(data_pipeline): Remove on_pause in tests
fix(data-pipeline): Use result in test
fix(data-pipeline): Remove unit return type
Codecov Report

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1056      +/-   ##
==========================================
- Coverage   71.05%   70.97%   -0.09%
==========================================
  Files         323      324       +1
  Lines       49503    49723     +220
==========================================
+ Hits        35176    35291     +115
- Misses      14327    14432     +105
```
Codecov Report

```
@@            Coverage Diff             @@
##             main    #1056      +/-   ##
==========================================
- Coverage   71.09%   71.03%   -0.06%
==========================================
  Files         334      335       +1
  Lines       50889    51081     +192
==========================================
+ Hits        36178    36287     +109
- Misses      14711    14794      +83
```
ddtelemetry/src/worker/mod.rs
Outdated
```rust
#[allow(dead_code)]
#[derive(Debug)]
struct TelemetryWorker<'a> {
    flavor: &'a TelemetryWorkerFlavor,
    config: &'a Config,
    mailbox: &'a mpsc::Receiver<TelemetryActions>,
    cancellation_token: &'a CancellationToken,
    seq_id: &'a AtomicU64,
    runtime_id: &'a String,
    deadlines: &'a scheduler::Scheduler<LifecycleAction>,
    data: &'a TelemetryWorkerData,
}
let Self {
    flavor,
    config,
    mailbox,
    cancellation_token,
    seq_id,
    runtime_id,
    client: _,
    deadlines,
    data,
} = self;
Debug::fmt(
    &TelemetryWorker {
        flavor,
        config,
        mailbox,
        cancellation_token,
        seq_id,
        runtime_id,
        deadlines,
        data,
    },
    f,
)
```
nit: this could be simpler
https://doc.rust-lang.org/std/fmt/struct.Formatter.html#method.debug_struct
Suggested change:

```rust
f.debug_struct("TelemetryWorker")
    .field("flavor", self.flavor)
    .field(...)
    .finish()
```
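For completeness, here is a self-contained sketch of the `debug_struct` pattern the linked docs describe; the struct and its fields are placeholders, not the real `TelemetryWorker`:

```rust
use std::fmt;

// Placeholder struct standing in for TelemetryWorker; the real type's
// fields would be listed the same way with additional .field(...) calls.
struct Worker {
    seq_id: u64,
    runtime_id: String,
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // debug_struct builds the `Name { field: value, .. }` output
        // without needing a borrowed mirror struct or destructuring.
        f.debug_struct("Worker")
            .field("seq_id", &self.seq_id)
            .field("runtime_id", &self.runtime_id)
            .finish()
    }
}

fn main() {
    let w = Worker { seq_id: 1, runtime_id: "abc".to_string() };
    // prints: Worker { seq_id: 1, runtime_id: "abc" }
    println!("{:?}", w);
}
```

This avoids both the mirror struct and the `let Self { .. } = self` destructuring in the original.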
data-pipeline/src/pausable_worker.rs
Outdated
```rust
// SPDX-License-Identifier: Apache-2.0

//! Defines a pausable worker to be able to stop background processes before forks
use anyhow::{anyhow, Result};
```
Shouldn't we use the internal errors?
+1
data-pipeline/src/pausable_worker.rs
Outdated
```rust
    fn run(&mut self) -> impl std::future::Future<Output = ()> + Send;
}

impl Worker for StatsExporter {
```
These impls should probably live with the respective workers and not here in the trait definition?
data-pipeline/src/pausable_worker.rs
Outdated
```rust
    }
}

/// A pausable worker which can be paused and restarded on forks.
```
Suggested change:

```diff
-/// A pausable worker which can be paused and restarded on forks.
+/// A pausable worker which can be paused and restarted on forks.
```
```rust
        Self::Paused { worker }
    }

    /// Start the worker on the given runtime.
```
For the public functions, should we have more thorough rustdoc comments with examples? Or perhaps, just more thorough rustdoc comment for the module or enum that explains why this is necessary and when it should be used? If someone outside of our team needs to implement a worker in the future, do they have enough information to do so independently?
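As an illustration of the kind of rustdoc being requested, a sketch follows; the type, methods, and example inside the doc comment are hypothetical, not the crate's real API:

```rust
/// Illustrative sketch of a more thorough rustdoc comment. Everything
/// named in the example below is hypothetical.
///
/// A `PausableWorker` wraps a background task so it can be paused before
/// a `fork()` (letting the tokio runtime be dropped, which stops its
/// threads) and restarted afterwards.
///
/// # Examples
///
/// ```ignore
/// let mut worker = PausableWorker::new(trace_exporter);
/// worker.start(&runtime)?;
/// worker.pause().await?;   // worker state saved; safe point to fork
/// worker.start(&runtime)?; // resume after the fork
/// ```
///
/// # Errors
///
/// Returns an error if the worker is in an invalid state, e.g. after a
/// panicked or aborted task.
pub fn doc_style_example() -> &'static str {
    "see rustdoc above"
}

fn main() {
    println!("{}", doc_style_example());
}
```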
```rust
    pub async fn pause(&mut self) -> Result<()> {
        match self {
            PausableWorker::Running { handle, stop_token } => {
                stop_token.cancel();
```
Is there a possible race condition here? If the task is already shutting down and pause is called can we wind up in an InvalidState when we don't want to?
I think we're safe since pause takes a mutable reference. handle.await() only returns an error if the tokio task has been aborted.
> I think we're safe since pause takes a mutable reference

Yes, which is why I wasn't concerned with pause() being called multiple times. I was more concerned with a potential race condition when pause() is called at the same time that a worker is shutting down outside of the pause workflow. If the task shuts down gracefully in this situation we don't want to be in InvalidState because of timing.

> handle.await() only returns an error if the tokio task has been aborted.

Looking at this some more, I agree that we are probably ok. handle.await() is going to return Ok in the scenario I described as long as it's a graceful shutdown. We'll be in an InvalidState with a panic or abort...but that is what we want anyway.
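For intuition on those join semantics, std::thread behaves analogously: join() returns Ok on a graceful exit and Err only when the thread panicked (tokio's JoinHandle adds abort as a second Err case). A std-only sketch:

```rust
use std::thread;

fn main() {
    // Silence the default panic message so the demo output stays clean.
    std::panic::set_hook(Box::new(|_| {}));

    // Graceful exit: join() returns Ok with the thread's result,
    // like handle.await returning the worker after a clean shutdown.
    let graceful = thread::spawn(|| 42).join();
    assert!(graceful.is_ok());

    // Panicked "worker": join() returns Err, mirroring a tokio
    // JoinHandle that only errors on panic or abort.
    let panicked = thread::spawn(|| panic!("boom")).join();
    assert!(panicked.is_err());

    println!("graceful: {}, panicked errored: {}", graceful.unwrap(), panicked.is_err());
    // prints: graceful: 42, panicked errored: true
}
```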
```rust
    InvalidState,
}

impl<T: Worker + Send + Sync + 'static> PausableWorker<T> {
```
Do you need to impl Drop? What happens if PausableWorker goes out of scope? Are tokio tasks going to continue to run in the background? Should you be canceling the stop_token?
The task is going to continue in the background until the runtime is dropped. I think it is fine to not cancel the token. Either way this should be described in the doc.
I don't have a very strong opinion, but why is it ok for the task to continue? Is it not likely that workers can go out of scope while the runtime sticks around?
I don't have a strong opinion either. The way I see it the PausableWorker is more of a handle to the worker which is running in the runtime.
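If drop-time cancellation were ever wanted, the handle could signal on Drop. A std-only sketch where an Arc<AtomicBool> stands in for the real cancellation token (all names here are hypothetical):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical handle; the shared AtomicBool plays the role of the
// stop_token shared with the background task.
struct WorkerHandle {
    stop_flag: Arc<AtomicBool>,
}

impl Drop for WorkerHandle {
    fn drop(&mut self) {
        // Signal the background task when the handle goes out of scope,
        // rather than letting it run until the runtime is dropped.
        self.stop_flag.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    {
        let _handle = WorkerHandle { stop_flag: Arc::clone(&flag) };
        assert!(!flag.load(Ordering::SeqCst));
    } // _handle dropped here, Drop fires
    assert!(flag.load(Ordering::SeqCst));
    println!("stop flag set on drop: {}", flag.load(Ordering::SeqCst));
}
```

Whether this is desirable depends on the "handle vs. owner" question above; as written in the PR, the task runs until the runtime is dropped.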
data-pipeline/src/pausable_worker.rs
Outdated
```rust
        if let Self::Running { .. } = self {
            Ok(())
        } else if let Self::Paused { mut worker } = std::mem::replace(self, Self::InvalidState) {
            // Worker is temporarly in an invalid state, but since this block is failsafe it will
```
Suggested change:

```diff
-// Worker is temporarly in an invalid state, but since this block is failsafe it will
+// Worker is temporarily in an invalid state, but since this block is failsafe it will
```
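The std::mem::replace ownership trick in that block can be shown in isolation. A simplified stand-in for the real enum (field names and types are illustrative):

```rust
use std::mem;

// Simplified stand-in for the real PausableWorker enum.
#[derive(Debug, PartialEq)]
enum PausableWorker {
    Running { task_id: u32 },
    Paused { saved_state: u32 },
    InvalidState,
}

impl PausableWorker {
    fn start(&mut self) -> Result<(), String> {
        if let PausableWorker::Running { .. } = self {
            return Ok(());
        }
        // Leave InvalidState behind temporarily so the paused state can
        // be moved out through &mut self; on success we overwrite it.
        if let PausableWorker::Paused { saved_state } =
            mem::replace(self, PausableWorker::InvalidState)
        {
            *self = PausableWorker::Running { task_id: saved_state };
            Ok(())
        } else {
            Err("worker is in an invalid state".to_string())
        }
    }
}

fn main() {
    let mut w = PausableWorker::Paused { saved_state: 7 };
    assert!(w.start().is_ok());
    assert_eq!(w, PausableWorker::Running { task_id: 7 });
    println!("{:?}", w);
    // prints: Running { task_id: 7 }
}
```

The InvalidState placeholder is only observable if the transition fails partway, which matches the failsafe comment in the diff.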
```rust
        match self {
            PausableWorker::Running { handle, stop_token } => {
                stop_token.cancel();
                if let Ok(worker) = handle.await {
```
What happens if we have workers that don't frequently yield, or wind up deadlocked? Are we going to await forever? Or, if not forever, long enough where it causes problems?
Good catch, I'll add a warning in doc and a timeout in stop_worker.
Actually, tokio's timeout won't work here, as it is only checked when the task yields, so we'd have the same issue. I think we can keep the warning for now and add a timeout as a follow-up item.
For what it's worth, tokio supports "cancellation tokens", which is how in profiling we stop tokio if folks e.g. ctrl+c and the profiler is uploading a profile on a slow connection.
The pausable worker uses a cancellation token internally, but we still have to wait for the worker to yield. I don't think there's a way to force the worker to yield earlier though.
> I don't think there's a way to force the worker to yield earlier though.

I think it's possible, but it would require a big refactor and add significant complexity to our workers. I don't think it's worth pursuing until we see evidence this solution isn't sufficient.

I think the only real problem is high-frequency forking apps. We need to support that situation. I guess the only way to do that is to ensure the workers are as efficient as possible and yield appropriately? I don't think there is anything that can be practically done in pause().
LGTM on the approach. Just some suggested doc edits, request for more docs, and suggestion about errors.
Artifact Size Benchmark Report

Targets measured: aarch64-alpine-linux-musl, aarch64-unknown-linux-gnu, libdatadog-x64-windows, libdatadog-x86-windows, x86_64-alpine-linux-musl, x86_64-unknown-linux-gnu. (Per-target size data omitted.)
What does this PR do?

Add a PausableWorker to allow background workers using the tokio runtime to be paused, saving their state so they can be restarted later.

Motivation

To support forks in Python, we need to drop the tokio runtime before the fork to stop all of its threads.

Additional Notes

Some changes have been made to the ddtelemetry crate to expose the internal worker.

How to test the change?

Tests have been added for the PausableWorker. The changes can also be tested using the NativeWriter in dd-trace-py !13071 by building ddtrace-native against this branch.