Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ pub struct OntapS3ExistenceCacheSpec {
pub backend: Box<ExperimentalOntapS3Spec>,
}

#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy, PartialEq)]
#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StoreDirection {
/// The store operates normally and all get and put operations are
Expand Down
22 changes: 10 additions & 12 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,17 @@ impl SimpleScheduler {
.await;
for action_state in &actions {
let name = action_state.stage.name();
match oldest_actions_in_state.get_mut(&name) {
Some(values) => {
values.insert(action_state.clone());
if values.len() > max_items {
values.pop_first();
}
}
None => {
let mut values = BTreeSet::new();
values.insert(action_state.clone());
oldest_actions_in_state
.insert(name, values);
if let Some(values) =
oldest_actions_in_state.get_mut(&name)
{
values.insert(action_state.clone());
if values.len() > max_items {
values.pop_first();
}
} else {
let mut values = BTreeSet::new();
values.insert(action_state.clone());
oldest_actions_in_state.insert(name, values);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use nativelink_util::shutdown_guard::ShutdownGuard;
use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{Worker, WorkerTimestamp};

/// WorkerScheduler interface is responsible for interactions between the scheduler
/// `WorkerScheduler` interface is responsible for interactions between the scheduler
/// and worker related operations.
#[async_trait]
pub trait WorkerScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static {
Expand Down
36 changes: 15 additions & 21 deletions nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use fred::bytes_utils::string::Str;
use fred::clients::SubscriberClient;
use fred::error::{Error as RedisError, ErrorKind as RedisErrorKind};
use fred::mocks::{MockCommand, Mocks};
use fred::prelude::{Builder, Pool as RedisPool};
use fred::prelude::Builder;
use fred::types::Value as RedisValue;
use fred::types::config::{Config as RedisConfig, PerformanceConfig};
use fred::types::config::Config as RedisConfig;
use futures::StreamExt;
use mock_instant::global::SystemTime as MockSystemTime;
use nativelink_config::schedulers::SimpleSpec;
Expand All @@ -46,7 +46,7 @@ use nativelink_scheduler::simple_scheduler::SimpleScheduler;
use nativelink_scheduler::store_awaited_action_db::StoreAwaitedActionDb;
use nativelink_scheduler::worker::Worker;
use nativelink_scheduler::worker_scheduler::WorkerScheduler;
use nativelink_store::redis_store::{RedisStore, RedisSubscriptionManager};
use nativelink_store::redis_store::{RecoverablePool, RedisStore, RedisSubscriptionManager};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId, WorkerId,
};
Expand Down Expand Up @@ -393,7 +393,7 @@ fn make_redis_store(sub_channel: &str, mocks: Arc<impl Mocks>) -> Arc<RedisStore
mocks: Some(mocks),
..Default::default()
});
let (client_pool, subscriber_client) = make_clients(builder);
let (client_pool, subscriber_client) = make_clients(&builder);
Arc::new(
RedisStore::new_from_builder_and_parts(
client_pool,
Expand All @@ -410,15 +410,9 @@ fn make_redis_store(sub_channel: &str, mocks: Arc<impl Mocks>) -> Arc<RedisStore
)
}

fn make_clients(mut builder: Builder) -> (RedisPool, SubscriberClient) {
fn make_clients(builder: &Builder) -> (RecoverablePool, SubscriberClient) {
const CONNECTION_POOL_SIZE: usize = 1;
let client_pool = builder
.set_performance_config(PerformanceConfig {
broadcast_channel_capacity: 4096,
..Default::default()
})
.build_pool(CONNECTION_POOL_SIZE)
.unwrap();
let client_pool = RecoverablePool::new(builder.clone(), CONNECTION_POOL_SIZE).unwrap();

let subscriber_client = builder.build_subscriber_client().unwrap();
(client_pool, subscriber_client)
Expand Down Expand Up @@ -521,23 +515,23 @@ async fn add_action_smoke_test() -> Result<(), Error> {
mocks
.expect(
MockCommand {
cmd: Str::from_static("FT.AGGREGATE"),
cmd: Str::from_static("SUBSCRIBE"),
subcommand: None,
args: ft_aggregate_args.clone(),
args: vec![SUB_CHANNEL.as_bytes().into()],
},
Err(RedisError::new(
RedisErrorKind::NotFound,
String::new(),
)),
Ok(RedisValue::Integer(0)),
None,
)
.expect(
MockCommand {
cmd: Str::from_static("SUBSCRIBE"),
cmd: Str::from_static("FT.AGGREGATE"),
subcommand: None,
args: vec![SUB_CHANNEL.as_bytes().into()],
args: ft_aggregate_args.clone(),
},
Ok(RedisValue::Integer(0)),
Err(RedisError::new(
RedisErrorKind::NotFound,
String::new(),
)),
None,
)
.expect(
Expand Down
2 changes: 1 addition & 1 deletion nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ impl StoreDriver for FastSlowStore {
Ok(())
}

/// FastSlowStore has optimizations for dealing with files.
/// `FastSlowStore` has optimizations for dealing with files.
fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
optimization == StoreOptimizations::FileUpdates
}
Expand Down
4 changes: 3 additions & 1 deletion nativelink-store/src/noop_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ impl StoreDriver for NoopStore {
_keys: &[StoreKey<'_>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
results.iter_mut().for_each(|r| *r = None);
for result in results.iter_mut() {
*result = None;
}
Ok(())
}

Expand Down
Loading
Loading