Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 799f998

Browse files
committedSep 11, 2024·
chore: use parking_lot for local metadata locks
1 parent babdebf commit 799f998

File tree

4 files changed

+60
-43
lines changed

4 files changed

+60
-43
lines changed
 

‎Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ mimalloc = "0.1.39"
115115
mime = "0.3"
116116
nix = { version = "0.29.0", default-features = false }
117117
once_cell = "1.7.2"
118+
parking_lot = { version = "0.12.3", default-features = false }
118119
pin-project = "1.1.0"
119120
portpicker = "0.1.1"
120121
proc-macro2 = "1.0"

‎crates/fluvio-stream-dispatcher/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ once_cell = { workspace = true }
2626
serde = { workspace = true, features = ['derive'] }
2727
serde_json = { workspace = true, optional = true }
2828
serde_yaml = { workspace = true, optional = true }
29+
parking_lot = { workspace = true, features = ["send_guard"] }
2930
tokio = { workspace = true, features = ["macros"] }
3031
tracing = { workspace = true }
3132
tempfile = { workspace = true }

‎crates/fluvio-stream-dispatcher/src/metadata/local.rs

+57-43
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ cfg_if::cfg_if! {
102102

103103
use anyhow::{Result, anyhow, Context};
104104
use async_channel::{Sender, Receiver, bounded};
105-
use async_lock::{RwLock, RwLockUpgradableReadGuard};
105+
use parking_lot::RwLock;
106106
use futures_util::{stream::BoxStream, StreamExt};
107107
use serde::{de::DeserializeOwned};
108108
use tracing::{warn, debug, trace};
@@ -133,7 +133,7 @@ cfg_if::cfg_if! {
133133
where
134134
S: K8ExtendedSpec,
135135
{
136-
let store = self.get_store::<S>().await?;
136+
let store = self.get_store::<S>()?;
137137
store.retrieve_items().await
138138
}
139139

@@ -142,7 +142,7 @@ cfg_if::cfg_if! {
142142
S: K8ExtendedSpec,
143143
{
144144
trace!(?metadata, "delete item");
145-
let store = self.get_store::<S>().await?;
145+
let store = self.get_store::<S>()?;
146146
if let Some(item) = store.try_retrieve_item::<S>(&metadata).await? {
147147
if let Some(owner) = item.ctx().item().owner() {
148148
self.unlink_parent::<S>(owner, item.ctx().item()).await?;
@@ -166,7 +166,7 @@ cfg_if::cfg_if! {
166166
<S as Spec>::Owner: K8ExtendedSpec,
167167
{
168168
trace!(?value, "apply");
169-
let store = self.get_store::<S>().await?;
169+
let store = self.get_store::<S>()?;
170170
value.ctx_mut().item_mut().id = value.key().to_string();
171171
if let Some(owner) = value.ctx().item().owner() {
172172
self.link_parent::<S>(owner, value.ctx().item()).await?;
@@ -181,7 +181,7 @@ cfg_if::cfg_if! {
181181
use std::str::FromStr;
182182

183183
trace!(?metadata, ?spec, "update spec");
184-
let store = self.get_store::<S>().await?;
184+
let store = self.get_store::<S>()?;
185185
let item = match store.try_retrieve_item::<S>(&metadata).await? {
186186
Some(mut item) => {
187187
item.ctx_mut().set_item(metadata);
@@ -214,7 +214,7 @@ cfg_if::cfg_if! {
214214
id: key.to_string(),
215215
..Default::default()
216216
};
217-
let store = self.get_store::<S>().await?;
217+
let store = self.get_store::<S>()?;
218218
let item = match store.try_retrieve_item::<S>(&metadata).await? {
219219
Some(mut item) => {
220220
item.set_spec(spec);
@@ -235,7 +235,7 @@ cfg_if::cfg_if! {
235235
S: K8ExtendedSpec,
236236
{
237237
trace!(?metadata, ?status, "update status");
238-
let store = self.get_store::<S>().await?;
238+
let store = self.get_store::<S>()?;
239239
let mut item = store.retrieve_item::<S>(&metadata).await?;
240240
item.ctx_mut().set_item(metadata.clone());
241241
item.set_status(status);
@@ -252,12 +252,11 @@ cfg_if::cfg_if! {
252252
S: K8ExtendedSpec,
253253
{
254254
trace!(label = S::LABEL, ?resource_version, "watch stream");
255-
futures_util::stream::once(self.get_store::<S>())
256-
.flat_map(move |store| match store {
257-
Ok(store) => store.watch_stream_since(resource_version.as_ref()),
258-
Err(err) => futures_util::stream::once(async { Result::<_>::Err(err) }).boxed(),
259-
})
260-
.boxed()
255+
let store = self.get_store::<S>();
256+
match store {
257+
Ok(store) => store.watch_stream_since(resource_version.as_ref()),
258+
Err(err) => futures_util::stream::once(async { Result::<_>::Err(err) }).boxed(),
259+
}
261260
}
262261

263262
async fn patch_status<S>(
@@ -270,7 +269,7 @@ cfg_if::cfg_if! {
270269
S: K8ExtendedSpec,
271270
{
272271
trace!(?metadata, ?status, "patch status");
273-
let store = self.get_store::<S>().await?;
272+
let store = self.get_store::<S>()?;
274273
let mut item = store.retrieve_item::<S>(&metadata).await?;
275274
item.ctx_mut().set_item(metadata.clone());
276275
item.set_status(status);
@@ -309,15 +308,17 @@ cfg_if::cfg_if! {
309308
Self { path, stores }
310309
}
311310

312-
async fn get_store<S: Spec + DeserializeOwned>(&self) -> Result<Arc<SpecStore>> {
311+
fn get_store<S: Spec + DeserializeOwned>(&self) -> Result<Arc<SpecStore>> {
313312
let key = S::LABEL;
314-
let read = self.stores.upgradable_read().await;
313+
let read = self.stores.read();
315314
Ok(match read.get(key) {
316315
Some(store) => store.clone(),
317316
None => {
318-
let mut write = RwLockUpgradableReadGuard::upgrade(read).await;
319-
let store = Arc::new(SpecStore::load::<S, _>(self.path.join(key)).await?);
317+
drop(read);
318+
let mut write = self.stores.write();
319+
let store = Arc::new(SpecStore::load::<S, _>(self.path.join(key))?);
320320
write.insert(key, store.clone());
321+
drop(write);
321322
store
322323
}
323324
})
@@ -342,7 +343,7 @@ cfg_if::cfg_if! {
342343
child: &LocalMetadataItem,
343344
) -> Result<()> {
344345
trace!(?parent, ?child, "link parent");
345-
let parent_store = self.get_store::<S::Owner>().await?;
346+
let parent_store = self.get_store::<S::Owner>()?;
346347
parent_store
347348
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
348349
parent_obj
@@ -360,7 +361,7 @@ cfg_if::cfg_if! {
360361
child: &LocalMetadataItem,
361362
) -> Result<()> {
362363
trace!(?parent, ?child, "link parent");
363-
let parent_store = self.get_store::<S::Owner>().await?;
364+
let parent_store = self.get_store::<S::Owner>()?;
364365
parent_store
365366
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
366367
parent_obj
@@ -375,15 +376,14 @@ cfg_if::cfg_if! {
375376
async fn get_store_by_key(&self, key: &str) -> Result<Arc<SpecStore>> {
376377
self.stores
377378
.read()
378-
.await
379379
.get(key)
380380
.cloned()
381381
.ok_or_else(|| anyhow!("store not found for key {key}"))
382382
}
383383
}
384384

385385
impl SpecStore {
386-
async fn load<S: Spec, P: AsRef<Path>>(path: P) -> Result<Self> {
386+
fn load<S: Spec, P: AsRef<Path>>(path: P) -> Result<Self> {
387387
std::fs::create_dir_all(&path)?;
388388
let version = Default::default();
389389
let mut data: HashMap<String, SpecPointer> = Default::default();
@@ -423,7 +423,7 @@ cfg_if::cfg_if! {
423423
.version
424424
.load(std::sync::atomic::Ordering::SeqCst)
425425
.to_string();
426-
let read = self.data.read().await;
426+
let read = self.data.read();
427427
let items: Vec<LocalStoreObject<S>> = read
428428
.values()
429429
.map(SpecPointer::downcast)
@@ -439,7 +439,7 @@ cfg_if::cfg_if! {
439439
where
440440
S: Spec,
441441
{
442-
let read = self.data.read().await;
442+
let read = self.data.read();
443443
read.get(metadata.uid())
444444
.map(SpecPointer::downcast)
445445
.transpose()
@@ -455,10 +455,18 @@ cfg_if::cfg_if! {
455455
}
456456

457457
async fn delete_item(&self, metadata: &LocalMetadataItem) {
458-
let mut write = self.data.write().await;
459-
if let Some(removed) = write.remove(metadata.uid()) {
460-
removed.delete();
461-
drop(write);
458+
let removed = {
459+
let mut write = self.data.write();
460+
if let Some(removed) = write.remove(metadata.uid()) {
461+
removed.delete();
462+
drop(write);
463+
Some(removed)
464+
} else {
465+
None
466+
}
467+
};
468+
469+
if let Some(removed) = removed {
462470
self.send_update(SpecUpdate::Delete(removed)).await;
463471
}
464472
}
@@ -468,20 +476,24 @@ cfg_if::cfg_if! {
468476
S: Spec + Serialize,
469477
{
470478
let id = value.ctx().item().uid().to_owned();
471-
let mut write = self.data.write().await;
472-
if let Some(prev) = write.get(&id) {
473-
let prev_meta = prev.downcast_ref::<S>()?.ctx().item();
474-
let prev_rev = prev_meta.revision;
475-
if prev_meta.is_newer(value.ctx().item()) {
476-
let new_rev = value.ctx().item().revision;
477-
anyhow::bail!("attempt to update by stale value: current version: {prev_rev}, proposed: {new_rev}");
478-
}
479-
value.ctx_mut().item_mut().revision = prev_rev + 1;
479+
let pointer =
480+
{
481+
let mut write = self.data.write();
482+
if let Some(prev) = write.get(&id) {
483+
let prev_meta = prev.downcast_ref::<S>()?.ctx().item();
484+
let prev_rev = prev_meta.revision;
485+
if prev_meta.is_newer(value.ctx().item()) {
486+
let new_rev = value.ctx().item().revision;
487+
anyhow::bail!("attempt to update by stale value: current version: {prev_rev}, proposed: {new_rev}");
488+
}
489+
value.ctx_mut().item_mut().revision = prev_rev + 1;
490+
};
491+
let pointer = SpecPointer::new(self.spec_file_name(&id), value);
492+
write.insert(id, pointer.clone());
493+
pointer.flush::<S>()?;
494+
drop(write);
495+
pointer
480496
};
481-
let pointer = SpecPointer::new(self.spec_file_name(&id), value);
482-
write.insert(id, pointer.clone());
483-
pointer.flush::<S>()?;
484-
drop(write);
485497
self.send_update(SpecUpdate::Mod(pointer)).await;
486498
Ok(())
487499
}
@@ -522,7 +534,7 @@ cfg_if::cfg_if! {
522534
where
523535
F: Fn(&mut LocalStoreObject<S>),
524536
{
525-
if let Some(spec) = self.data.write().await.get_mut(key) {
537+
if let Some(spec) = self.data.write().get_mut(key) {
526538
let mut obj = spec.downcast::<S>()?;
527539
func(&mut obj);
528540
spec.set(obj);
@@ -585,7 +597,9 @@ cfg_if::cfg_if! {
585597

586598
fn flush<S: Spec>(&self) -> Result<()> {
587599
let storage: VersionedSpecStorage<S> = self.try_into()?;
588-
serde_yaml::to_writer(std::fs::File::create(&self.path)?, &storage)?;
600+
let file = std::fs::File::create(&self.path)?;
601+
serde_yaml::to_writer(&file, &storage)?;
602+
file.sync_all()?;
589603
Ok(())
590604
}
591605

0 commit comments

Comments
 (0)
Please sign in to comment.