Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: concurrency problems at local metadata store #4175

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 101 additions & 50 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -945,56 +945,107 @@ jobs:
path: diagnostics*.gz
retention-days: 1

# FIXME: This is disabled until we fix the problem with the test when creating a new topic
# local_upgrade_test:
# name: Upgrade local cluster test on (${{ matrix.run }})
# needs: build_primary_binaries
# runs-on: ${{ matrix.os }}
# strategy:
# # fail-fast: false
# matrix:
# os: [ubuntu-latest]
# rust-target: [x86_64-unknown-linux-musl]
# run: [r1]
# steps:
# - uses: actions/checkout@v4
# # Download artifacts
# - name: Download artifact - fluvio
# uses: actions/download-artifact@v4
# with:
# name: fluvio-${{ matrix.rust-target }}
# path: ~/bin
# - name: Download artifact - fluvio-run
# uses: actions/download-artifact@v4
# with:
# name: fluvio-run-${{ matrix.rust-target }}
# path: ~/bin
# - name: Mark executable
# run: |
# chmod +x ~/bin/fluvio-run
# chmod +x ~/bin/fluvio && ~/bin/fluvio version
# echo "${HOME}/bin" >> $GITHUB_PATH
#
# - name: Run upgrade test with CI artifacts
# timeout-minutes: 10
# env:
# TEST_DATA_BYTES: 10000
# run: |
# date
# make FLUVIO_MODE=local FLUVIO_BIN=~/bin/fluvio upgrade-test
#
# - name: Run diagnostics
# if: ${{ !success() }}
# timeout-minutes: 5
# run: fluvio cluster diagnostics
# - name: Upload logs
# timeout-minutes: 5
# if: ${{ !success() }}
# uses: actions/upload-artifact@v4
# with:
# name: local_upgrade_${{ matrix.run }}_log
# path: diagnostics*.gz
# retention-days: 1
local_upgrade_test:
name: Upgrade local cluster test on (${{ matrix.run }})
needs: build_primary_binaries
runs-on: ${{ matrix.os }}
strategy:
# fail-fast: false
matrix:
os: [ubuntu-latest]
rust-target: [x86_64-unknown-linux-musl]
run: [r1]
steps:
- uses: actions/checkout@v4
# Download artifacts
- name: Download artifact - fluvio
uses: actions/download-artifact@v4
with:
name: fluvio-${{ matrix.rust-target }}
path: ~/bin
- name: Download artifact - fluvio-run
uses: actions/download-artifact@v4
with:
name: fluvio-run-${{ matrix.rust-target }}
path: ~/bin
- name: Mark executable
run: |
chmod +x ~/bin/fluvio-run
chmod +x ~/bin/fluvio && ~/bin/fluvio version
echo "${HOME}/bin" >> $GITHUB_PATH

- name: Run upgrade test with CI artifacts
timeout-minutes: 10
env:
TEST_DATA_BYTES: 10000
run: |
date
make FLUVIO_MODE=local FLUVIO_BIN=~/bin/fluvio upgrade-test

- name: Run diagnostics
if: ${{ !success() }}
timeout-minutes: 5
run: fluvio cluster diagnostics
- name: Upload logs
timeout-minutes: 5
if: ${{ !success() }}
uses: actions/upload-artifact@v4
with:
name: local_upgrade_${{ matrix.run }}_log
path: diagnostics*.gz
retention-days: 1

local_resume_test:
name: Resume local cluster test on (${{ matrix.run }})
needs: build_primary_binaries
runs-on: ${{ matrix.os }}
strategy:
# fail-fast: false
matrix:
os: [ubuntu-latest]
rust-target: [x86_64-unknown-linux-musl]
run: [r1]
steps:
- uses: actions/checkout@v4
# Download artifacts
- name: Download artifact - fluvio
uses: actions/download-artifact@v4
with:
name: fluvio-${{ matrix.rust-target }}
path: ~/bin
- name: Download artifact - fluvio-run
uses: actions/download-artifact@v4
with:
name: fluvio-run-${{ matrix.rust-target }}
path: ~/bin
- name: Mark executable
run: |
chmod +x ~/bin/fluvio-run
chmod +x ~/bin/fluvio && ~/bin/fluvio version
echo "${HOME}/bin" >> $GITHUB_PATH
- name: Install Parallel
run: sudo apt-get install -y parallel

- name: Run resume test with CI artifacts
timeout-minutes: 10
env:
TEST_DATA_BYTES: 10000
run: |
date
make FLUVIO_BIN=~/bin/fluvio resume-test

- name: Run diagnostics
if: ${{ !success() }}
timeout-minutes: 5
run: fluvio cluster diagnostics
- name: Upload logs
timeout-minutes: 5
if: ${{ !success() }}
uses: actions/upload-artifact@v4
with:
name: local_upgrade_${{ matrix.run }}_log
path: diagnostics*.gz
retention-days: 1

# Smoke test across different version of fluvio
cli_smoke:
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -115,6 +115,7 @@ mimalloc = "0.1.39"
mime = "0.3"
nix = { version = "0.29.0", default-features = false }
once_cell = "1.7.2"
parking_lot = { version = "0.12.3", default-features = false }
pin-project = "1.1.0"
portpicker = "0.1.1"
proc-macro2 = "1.0"
3 changes: 2 additions & 1 deletion crates/fluvio-stream-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ name = "fluvio_stream_dispatcher"
path = "src/lib.rs"

[features]
local = ["fluvio-stream-model/use_serde", "fluvio-stream-model/k8", "serde_yaml"]
local = ["fluvio-stream-model/use_serde", "fluvio-stream-model/k8", "serde_yaml", "parking_lot"]
k8 = ["fluvio-stream-model/k8", "k8-client", "serde_json"]

[dependencies]
@@ -26,6 +26,7 @@ once_cell = { workspace = true }
serde = { workspace = true, features = ['derive'] }
serde_json = { workspace = true, optional = true }
serde_yaml = { workspace = true, optional = true }
parking_lot = { workspace = true, features = ["send_guard"], optional = true }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true }
tempfile = { workspace = true }
96 changes: 54 additions & 42 deletions crates/fluvio-stream-dispatcher/src/metadata/local.rs
Original file line number Diff line number Diff line change
@@ -102,7 +102,7 @@ cfg_if::cfg_if! {

use anyhow::{Result, anyhow, Context};
use async_channel::{Sender, Receiver, bounded};
use async_lock::{RwLock, RwLockUpgradableReadGuard};
use parking_lot::RwLock;
use futures_util::{stream::BoxStream, StreamExt};
use serde::{de::DeserializeOwned};
use tracing::{warn, debug, trace};
@@ -133,7 +133,7 @@ cfg_if::cfg_if! {
where
S: K8ExtendedSpec,
{
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
store.retrieve_items().await
}

@@ -142,7 +142,7 @@ cfg_if::cfg_if! {
S: K8ExtendedSpec,
{
trace!(?metadata, "delete item");
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
if let Some(item) = store.try_retrieve_item::<S>(&metadata).await? {
if let Some(owner) = item.ctx().item().owner() {
self.unlink_parent::<S>(owner, item.ctx().item()).await?;
@@ -166,7 +166,7 @@ cfg_if::cfg_if! {
<S as Spec>::Owner: K8ExtendedSpec,
{
trace!(?value, "apply");
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
value.ctx_mut().item_mut().id = value.key().to_string();
if let Some(owner) = value.ctx().item().owner() {
self.link_parent::<S>(owner, value.ctx().item()).await?;
@@ -181,7 +181,7 @@ cfg_if::cfg_if! {
use std::str::FromStr;

trace!(?metadata, ?spec, "update spec");
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
let item = match store.try_retrieve_item::<S>(&metadata).await? {
Some(mut item) => {
item.ctx_mut().set_item(metadata);
@@ -214,7 +214,7 @@ cfg_if::cfg_if! {
id: key.to_string(),
..Default::default()
};
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
let item = match store.try_retrieve_item::<S>(&metadata).await? {
Some(mut item) => {
item.set_spec(spec);
@@ -235,7 +235,7 @@ cfg_if::cfg_if! {
S: K8ExtendedSpec,
{
trace!(?metadata, ?status, "update status");
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
let mut item = store.retrieve_item::<S>(&metadata).await?;
item.ctx_mut().set_item(metadata.clone());
item.set_status(status);
@@ -252,12 +252,11 @@ cfg_if::cfg_if! {
S: K8ExtendedSpec,
{
trace!(label = S::LABEL, ?resource_version, "watch stream");
futures_util::stream::once(self.get_store::<S>())
.flat_map(move |store| match store {
Ok(store) => store.watch_stream_since(resource_version.as_ref()),
Err(err) => futures_util::stream::once(async { Result::<_>::Err(err) }).boxed(),
})
.boxed()
let store = self.get_store::<S>();
match store {
Ok(store) => store.watch_stream_since(resource_version.as_ref()),
Err(err) => futures_util::stream::once(async { Result::<_>::Err(err) }).boxed(),
}
}

async fn patch_status<S>(
@@ -270,7 +269,7 @@ cfg_if::cfg_if! {
S: K8ExtendedSpec,
{
trace!(?metadata, ?status, "patch status");
let store = self.get_store::<S>().await?;
let store = self.get_store::<S>()?;
let mut item = store.retrieve_item::<S>(&metadata).await?;
item.ctx_mut().set_item(metadata.clone());
item.set_status(status);
@@ -309,15 +308,17 @@ cfg_if::cfg_if! {
Self { path, stores }
}

async fn get_store<S: Spec + DeserializeOwned>(&self) -> Result<Arc<SpecStore>> {
fn get_store<S: Spec + DeserializeOwned>(&self) -> Result<Arc<SpecStore>> {
let key = S::LABEL;
let read = self.stores.upgradable_read().await;
let read = self.stores.read();
Ok(match read.get(key) {
Some(store) => store.clone(),
None => {
let mut write = RwLockUpgradableReadGuard::upgrade(read).await;
let store = Arc::new(SpecStore::load::<S, _>(self.path.join(key)).await?);
drop(read);
let mut write = self.stores.write();
let store = Arc::new(SpecStore::load::<S, _>(self.path.join(key))?);
write.insert(key, store.clone());
drop(write);
store
}
})
@@ -342,7 +343,7 @@ cfg_if::cfg_if! {
child: &LocalMetadataItem,
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>().await?;
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
@@ -360,7 +361,7 @@ cfg_if::cfg_if! {
child: &LocalMetadataItem,
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>().await?;
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
@@ -375,15 +376,14 @@ cfg_if::cfg_if! {
async fn get_store_by_key(&self, key: &str) -> Result<Arc<SpecStore>> {
self.stores
.read()
.await
.get(key)
.cloned()
.ok_or_else(|| anyhow!("store not found for key {key}"))
}
}

impl SpecStore {
async fn load<S: Spec, P: AsRef<Path>>(path: P) -> Result<Self> {
fn load<S: Spec, P: AsRef<Path>>(path: P) -> Result<Self> {
std::fs::create_dir_all(&path)?;
let version = Default::default();
let mut data: HashMap<String, SpecPointer> = Default::default();
@@ -423,7 +423,7 @@ cfg_if::cfg_if! {
.version
.load(std::sync::atomic::Ordering::SeqCst)
.to_string();
let read = self.data.read().await;
let read = self.data.read();
let items: Vec<LocalStoreObject<S>> = read
.values()
.map(SpecPointer::downcast)
@@ -439,7 +439,7 @@ cfg_if::cfg_if! {
where
S: Spec,
{
let read = self.data.read().await;
let read = self.data.read();
read.get(metadata.uid())
.map(SpecPointer::downcast)
.transpose()
@@ -455,10 +455,18 @@ cfg_if::cfg_if! {
}

async fn delete_item(&self, metadata: &LocalMetadataItem) {
let mut write = self.data.write().await;
if let Some(removed) = write.remove(metadata.uid()) {
removed.delete();
drop(write);
let removed = {
let mut write = self.data.write();
if let Some(removed) = write.remove(metadata.uid()) {
removed.delete();
drop(write);
Some(removed)
} else {
None
}
};

if let Some(removed) = removed {
self.send_update(SpecUpdate::Delete(removed)).await;
}
}
@@ -468,20 +476,24 @@ cfg_if::cfg_if! {
S: Spec + Serialize,
{
let id = value.ctx().item().uid().to_owned();
let mut write = self.data.write().await;
if let Some(prev) = write.get(&id) {
let prev_meta = prev.downcast_ref::<S>()?.ctx().item();
let prev_rev = prev_meta.revision;
if prev_meta.is_newer(value.ctx().item()) {
let new_rev = value.ctx().item().revision;
anyhow::bail!("attempt to update by stale value: current version: {prev_rev}, proposed: {new_rev}");
}
value.ctx_mut().item_mut().revision = prev_rev + 1;
let pointer =
{
let mut write = self.data.write();
if let Some(prev) = write.get(&id) {
let prev_meta = prev.downcast_ref::<S>()?.ctx().item();
let prev_rev = prev_meta.revision;
if prev_meta.is_newer(value.ctx().item()) {
let new_rev = value.ctx().item().revision;
anyhow::bail!("attempt to update by stale value: current version: {prev_rev}, proposed: {new_rev}");
}
value.ctx_mut().item_mut().revision = prev_rev + 1;
};
let pointer = SpecPointer::new(self.spec_file_name(&id), value);
write.insert(id, pointer.clone());
pointer.flush::<S>()?;
drop(write);
pointer
};
let pointer = SpecPointer::new(self.spec_file_name(&id), value);
write.insert(id, pointer.clone());
pointer.flush::<S>()?;
drop(write);
self.send_update(SpecUpdate::Mod(pointer)).await;
Ok(())
}
@@ -522,7 +534,7 @@ cfg_if::cfg_if! {
where
F: Fn(&mut LocalStoreObject<S>),
{
if let Some(spec) = self.data.write().await.get_mut(key) {
if let Some(spec) = self.data.write().get_mut(key) {
let mut obj = spec.downcast::<S>()?;
func(&mut obj);
spec.set(obj);
3 changes: 3 additions & 0 deletions makefiles/test.mk
Original file line number Diff line number Diff line change
@@ -181,6 +181,9 @@ ifeq (${CI},true)
# In CI, we expect all artifacts to already be built and loaded for the script
upgrade-test:
./tests/upgrade-test.sh
else ifeq (${FLUVIO_MODE},local)
upgrade-test: build-cli
./tests/upgrade-test.sh
else
# When not in CI (i.e. development), load the dev k8 image before running test
upgrade-test: build-cli build_k8_image
4 changes: 0 additions & 4 deletions tests/upgrade-test.sh
Original file line number Diff line number Diff line change
@@ -52,11 +52,7 @@ function cleanup() {
# If we're in CI, we want to slow down execution
# to give CPU some time to rest, so we don't time out
function ci_check() {
if [[ "$FLUVIO_MODE" == "local" ]]; then
sleep $CI_SLEEP
else
:
fi
}

# This function is intended to be run second after the Stable-1 validation