Skip to content

Commit 903b575

Browse files
Merge branch 'main' into implement-remote-execution-metrics
2 parents dbae9a5 + ea0e0ae commit 903b575

16 files changed

+268
-168
lines changed

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 45 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ use nativelink_metric::{
2727
use nativelink_util::action_messages::{OperationId, WorkerId};
2828
use nativelink_util::operation_state_manager::{UpdateOperationType, WorkerStateManager};
2929
use nativelink_util::platform_properties::PlatformProperties;
30+
use nativelink_util::shutdown_guard::ShutdownGuard;
3031
use nativelink_util::spawn;
3132
use nativelink_util::task::JoinHandleDropGuard;
3233
use tokio::sync::Notify;
@@ -240,6 +241,7 @@ impl ApiWorkerSchedulerImpl {
240241
UpdateOperationType::UpdateWithError(err) => {
241242
(true, err.code == Code::ResourceExhausted)
242243
}
244+
UpdateOperationType::UpdateWithDisconnect => (true, false),
243245
};
244246

245247
// Update the operation in the worker state manager.
@@ -354,21 +356,30 @@ impl ApiWorkerSchedulerImpl {
354356
.notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
355357
.await;
356358

357-
if notify_worker_result.is_err() {
359+
if let Err(notify_worker_result) = notify_worker_result {
358360
warn!(
359361
?worker_id,
360362
?action_info,
361363
?notify_worker_result,
362364
"Worker command failed, removing worker",
363365
);
364366

367+
// A slightly nasty way of figuring out that the worker disconnected
368+
// from send_msg_to_worker without introducing complexity to the
369+
// code path from here to there.
370+
let is_disconnect = notify_worker_result.code == Code::Internal
371+
&& notify_worker_result.messages.len() == 1
372+
&& notify_worker_result.messages[0] == "Worker Disconnected";
373+
365374
let err = make_err!(
366375
Code::Internal,
367376
"Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
368377
);
369378

370-
return Result::<(), _>::Err(err.clone())
371-
.merge(self.immediate_evict_worker(&worker_id, err).await);
379+
return Result::<(), _>::Err(err.clone()).merge(
380+
self.immediate_evict_worker(&worker_id, err, is_disconnect)
381+
.await,
382+
);
372383
}
373384
} else {
374385
warn!(
@@ -386,19 +397,21 @@ impl ApiWorkerSchedulerImpl {
386397
&mut self,
387398
worker_id: &WorkerId,
388399
err: Error,
400+
is_disconnect: bool,
389401
) -> Result<(), Error> {
390402
let mut result = Ok(());
391403
if let Some(mut worker) = self.remove_worker(worker_id) {
392404
// We don't care if we fail to send message to worker, this is only a best attempt.
393405
drop(worker.notify_update(WorkerUpdate::Disconnect).await);
406+
let update = if is_disconnect {
407+
UpdateOperationType::UpdateWithDisconnect
408+
} else {
409+
UpdateOperationType::UpdateWithError(err)
410+
};
394411
for (operation_id, _) in worker.running_action_infos.drain() {
395412
result = result.merge(
396413
self.worker_state_manager
397-
.update_operation(
398-
&operation_id,
399-
worker_id,
400-
UpdateOperationType::UpdateWithError(err.clone()),
401-
)
414+
.update_operation(&operation_id, worker_id, update.clone())
402415
.await,
403416
);
404417
}
@@ -536,7 +549,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
536549
.err_tip(|| "Error while adding worker, removing from pool");
537550
if let Err(err) = result {
538551
return Result::<(), _>::Err(err.clone())
539-
.merge(inner.immediate_evict_worker(&worker_id, err).await);
552+
.merge(inner.immediate_evict_worker(&worker_id, err, false).await);
540553
}
541554
Ok(())
542555
}
@@ -577,10 +590,32 @@ impl WorkerScheduler for ApiWorkerScheduler {
577590
.immediate_evict_worker(
578591
worker_id,
579592
make_err!(Code::Internal, "Received request to remove worker"),
593+
false,
580594
)
581595
.await
582596
}
583597

598+
async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
599+
let mut inner = self.inner.lock().await;
600+
while let Some(worker_id) = inner
601+
.workers
602+
.peek_lru()
603+
.map(|(worker_id, _worker)| worker_id.clone())
604+
{
605+
if let Err(err) = inner
606+
.immediate_evict_worker(
607+
&worker_id,
608+
make_err!(Code::Internal, "Scheduler shutdown"),
609+
true,
610+
)
611+
.await
612+
{
613+
error!(?err, "Error evicting worker on shutdown.");
614+
}
615+
}
616+
drop(shutdown_guard);
617+
}
618+
584619
async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
585620
let mut inner = self.inner.lock().await;
586621

@@ -609,6 +644,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
609644
Code::Internal,
610645
"Worker {worker_id} timed out, removing from pool"
611646
),
647+
false,
612648
)
613649
.await,
614650
);

nativelink-scheduler/src/awaited_action_db/awaited_action.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -33,15 +33,17 @@ use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
3333
/// This number will always increment by one each time
3434
/// the action is updated.
3535
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
36-
struct AwaitedActionVersion(u64);
36+
struct AwaitedActionVersion(i64);
3737

3838
impl MetricsComponent for AwaitedActionVersion {
3939
fn publish(
4040
&self,
4141
_kind: MetricKind,
4242
_field_metadata: MetricFieldData,
4343
) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
44-
Ok(MetricPublishKnownKindData::Counter(self.0))
44+
Ok(MetricPublishKnownKindData::Counter(u64::from_ne_bytes(
45+
self.0.to_ne_bytes(),
46+
)))
4547
}
4648
}
4749

@@ -136,11 +138,11 @@ impl AwaitedAction {
136138
}
137139
}
138140

139-
pub(crate) const fn version(&self) -> u64 {
141+
pub(crate) const fn version(&self) -> i64 {
140142
self.version.0
141143
}
142144

143-
pub(crate) const fn set_version(&mut self, version: u64) {
145+
pub(crate) const fn set_version(&mut self, version: i64) {
144146
self.version = AwaitedActionVersion(version);
145147
}
146148

nativelink-scheduler/src/awaited_action_db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -186,5 +186,6 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + Unpin + 'static {
186186
&self,
187187
client_operation_id: OperationId,
188188
action_info: Arc<ActionInfo>,
189+
no_event_action_timeout: Duration,
189190
) -> impl Future<Output = Result<Self::Subscriber, Error>> + Send;
190191
}

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use core::ops::{Bound, RangeBounds};
16+
use core::time::Duration;
1617
use std::collections::hash_map::Entry;
1718
use std::collections::{BTreeMap, BTreeSet, HashMap};
1819
use std::sync::Arc;
@@ -1080,6 +1081,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
10801081
&self,
10811082
client_operation_id: OperationId,
10821083
action_info: Arc<ActionInfo>,
1084+
_no_event_action_timeout: Duration,
10831085
) -> Result<Self::Subscriber, Error> {
10841086
let subscriber = self
10851087
.inner

nativelink-scheduler/src/simple_scheduler.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ use nativelink_util::operation_state_manager::{
2929
OperationFilter, OperationStageFlags, OrderDirection, UpdateOperationType,
3030
};
3131
use nativelink_util::origin_event::OriginMetadata;
32+
use nativelink_util::shutdown_guard::ShutdownGuard;
3233
use nativelink_util::spawn;
3334
use nativelink_util::task::JoinHandleDropGuard;
3435
use opentelemetry::KeyValue;
@@ -541,6 +542,10 @@ impl WorkerScheduler for SimpleScheduler {
541542
self.worker_scheduler.remove_worker(worker_id).await
542543
}
543544

545+
async fn shutdown(&self, shutdown_guard: ShutdownGuard) {
546+
self.worker_scheduler.shutdown(shutdown_guard).await;
547+
}
548+
544549
async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error> {
545550
self.worker_scheduler
546551
.remove_timedout_workers(now_timestamp)

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 11 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@ use core::ops::Bound;
1616
use core::time::Duration;
1717
use std::string::ToString;
1818
use std::sync::{Arc, Weak};
19-
use std::time::SystemTime;
2019

2120
use async_lock::Mutex;
2221
use async_trait::async_trait;
@@ -439,19 +438,16 @@ where
439438
return Ok(());
440439
}
441440

442-
let last_worker_updated = awaited_action
441+
let worker_should_update_before = awaited_action
443442
.last_worker_updated_timestamp()
444-
.duration_since(SystemTime::UNIX_EPOCH)
445-
.map_err(|e| {
443+
.checked_add(self.no_event_action_timeout)
444+
.ok_or_else(|| {
446445
make_err!(
447446
Code::Internal,
448-
"Failed to convert last_worker_updated to duration since epoch {e:?}"
447+
"Timestamp overflow for operation {operation_id} in SimpleSchedulerStateManager::timeout_operation_id"
449448
)
450449
})?;
451-
let worker_should_update_before = last_worker_updated
452-
.checked_add(self.no_event_action_timeout)
453-
.err_tip(|| "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id")?;
454-
if worker_should_update_before < (self.now_fn)().elapsed() {
450+
if worker_should_update_before >= (self.now_fn)().now() {
455451
// The action was updated recently, we should not timeout the action.
456452
// This is to prevent timing out actions that have recently been updated
457453
// (like multiple clients timeout the same action at the same time).
@@ -566,6 +562,7 @@ where
566562
ActionStage::Queued
567563
}
568564
}
565+
UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
569566
};
570567
let now = (self.now_fn)().now();
571568
if matches!(stage, ActionStage::Queued) {
@@ -619,7 +616,11 @@ where
619616
action_info: Arc<ActionInfo>,
620617
) -> Result<T::Subscriber, Error> {
621618
self.action_db
622-
.add_action(new_client_operation_id, action_info)
619+
.add_action(
620+
new_client_operation_id,
621+
action_info,
622+
self.no_event_action_timeout,
623+
)
623624
.await
624625
.err_tip(|| "In SimpleSchedulerStateManager::add_operation")
625626
}

0 commit comments

Comments
 (0)