diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9528a583df..2769135812 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -945,56 +945,107 @@ jobs:
           path: diagnostics*.gz
           retention-days: 1
 
-  # FIXME: This is disabled until we fix the problem with the test when creating a new topic
-  # local_upgrade_test:
-  #   name: Upgrade local cluster test on (${{ matrix.run }})
-  #   needs: build_primary_binaries
-  #   runs-on: ${{ matrix.os }}
-  #   strategy:
-  #     # fail-fast: false
-  #     matrix:
-  #       os: [ubuntu-latest]
-  #       rust-target: [x86_64-unknown-linux-musl]
-  #       run: [r1]
-  #   steps:
-  #     - uses: actions/checkout@v4
-  #     # Download artifacts
-  #     - name: Download artifact - fluvio
-  #       uses: actions/download-artifact@v4
-  #       with:
-  #         name: fluvio-${{ matrix.rust-target }}
-  #         path: ~/bin
-  #     - name: Download artifact - fluvio-run
-  #       uses: actions/download-artifact@v4
-  #       with:
-  #         name: fluvio-run-${{ matrix.rust-target }}
-  #         path: ~/bin
-  #     - name: Mark executable
-  #       run: |
-  #         chmod +x ~/bin/fluvio-run
-  #         chmod +x ~/bin/fluvio && ~/bin/fluvio version
-  #         echo "${HOME}/bin" >> $GITHUB_PATH
-  #
-  #     - name: Run upgrade test with CI artifacts
-  #       timeout-minutes: 10
-  #       env:
-  #         TEST_DATA_BYTES: 10000
-  #       run: |
-  #         date
-  #         make FLUVIO_MODE=local FLUVIO_BIN=~/bin/fluvio upgrade-test
-  #
-  #     - name: Run diagnostics
-  #       if: ${{ !success() }}
-  #       timeout-minutes: 5
-  #       run: fluvio cluster diagnostics
-  #     - name: Upload logs
-  #       timeout-minutes: 5
-  #       if: ${{ !success() }}
-  #       uses: actions/upload-artifact@v4
-  #       with:
-  #         name: local_upgrade_${{ matrix.run }}_log
-  #         path: diagnostics*.gz
-  #         retention-days: 1
+  local_upgrade_test:
+    name: Upgrade local cluster test on (${{ matrix.run }})
+    needs: build_primary_binaries
+    runs-on: ${{ matrix.os }}
+    strategy:
+      # fail-fast: false
+      matrix:
+        os: [ubuntu-latest]
+        rust-target: [x86_64-unknown-linux-musl]
+        run: [r1]
+    steps:
+      - uses: actions/checkout@v4
+      # Download artifacts
+      - name: Download
artifact - fluvio
+        uses: actions/download-artifact@v4
+        with:
+          name: fluvio-${{ matrix.rust-target }}
+          path: ~/bin
+      - name: Download artifact - fluvio-run
+        uses: actions/download-artifact@v4
+        with:
+          name: fluvio-run-${{ matrix.rust-target }}
+          path: ~/bin
+      - name: Mark executable
+        run: |
+          chmod +x ~/bin/fluvio-run
+          chmod +x ~/bin/fluvio && ~/bin/fluvio version
+          echo "${HOME}/bin" >> $GITHUB_PATH
+
+      - name: Run upgrade test with CI artifacts
+        timeout-minutes: 10
+        env:
+          TEST_DATA_BYTES: 10000
+        run: |
+          date
+          make FLUVIO_MODE=local FLUVIO_BIN=~/bin/fluvio upgrade-test
+
+      - name: Run diagnostics
+        if: ${{ !success() }}
+        timeout-minutes: 5
+        run: fluvio cluster diagnostics
+      - name: Upload logs
+        timeout-minutes: 5
+        if: ${{ !success() }}
+        uses: actions/upload-artifact@v4
+        with:
+          name: local_upgrade_${{ matrix.run }}_log
+          path: diagnostics*.gz
+          retention-days: 1
+
+  local_resume_test:
+    name: Resume local cluster test on (${{ matrix.run }})
+    needs: build_primary_binaries
+    runs-on: ${{ matrix.os }}
+    strategy:
+      # fail-fast: false
+      matrix:
+        os: [ubuntu-latest]
+        rust-target: [x86_64-unknown-linux-musl]
+        run: [r1]
+    steps:
+      - uses: actions/checkout@v4
+      # Download artifacts
+      - name: Download artifact - fluvio
+        uses: actions/download-artifact@v4
+        with:
+          name: fluvio-${{ matrix.rust-target }}
+          path: ~/bin
+      - name: Download artifact - fluvio-run
+        uses: actions/download-artifact@v4
+        with:
+          name: fluvio-run-${{ matrix.rust-target }}
+          path: ~/bin
+      - name: Mark executable
+        run: |
+          chmod +x ~/bin/fluvio-run
+          chmod +x ~/bin/fluvio && ~/bin/fluvio version
+          echo "${HOME}/bin" >> $GITHUB_PATH
+      - name: Install Parallel
+        run: sudo apt-get update && sudo apt-get install -y parallel  # refresh the index first; the runner image's package list may be stale
+
+      - name: Run resume test with CI artifacts
+        timeout-minutes: 10
+        env:
+          TEST_DATA_BYTES: 10000
+        run: |
+          date
+          make FLUVIO_BIN=~/bin/fluvio resume-test
+
+      - name: Run diagnostics
+        if: ${{ !success() }}
+        timeout-minutes: 5
+        run: fluvio cluster diagnostics
+      - name: Upload
logs
+        timeout-minutes: 5
+        if: ${{ !success() }}
+        uses: actions/upload-artifact@v4
+        with:
+          name: local_resume_${{ matrix.run }}_log  # must differ from local_upgrade_test's artifact: upload-artifact@v4 rejects duplicate names within a run
+          path: diagnostics*.gz
+          retention-days: 1
 
   # Smoke test across different version of fluvio
   cli_smoke:
diff --git a/Cargo.lock b/Cargo.lock
index 3586b17af6..235949fb61 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3144,6 +3144,7 @@ dependencies = [
  "futures-util",
  "k8-client",
  "once_cell",
+ "parking_lot 0.12.3",
  "serde",
  "serde_json",
  "serde_yaml 0.9.34+deprecated",
diff --git a/Cargo.toml b/Cargo.toml
index 6f4e9711bb..d0224f361f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -115,6 +115,7 @@ mimalloc = "0.1.39"
 mime = "0.3"
 nix = { version = "0.29.0", default-features = false }
 once_cell = "1.7.2"
+parking_lot = { version = "0.12.3", default-features = false }
 pin-project = "1.1.0"
 portpicker = "0.1.1"
 proc-macro2 = "1.0"
diff --git a/crates/fluvio-stream-dispatcher/Cargo.toml b/crates/fluvio-stream-dispatcher/Cargo.toml
index f04a59dd33..19d2a817a6 100644
--- a/crates/fluvio-stream-dispatcher/Cargo.toml
+++ b/crates/fluvio-stream-dispatcher/Cargo.toml
@@ -12,7 +12,7 @@ name = "fluvio_stream_dispatcher"
 path = "src/lib.rs"
 
 [features]
-local = ["fluvio-stream-model/use_serde", "fluvio-stream-model/k8", "serde_yaml"]
+local = ["fluvio-stream-model/use_serde", "fluvio-stream-model/k8", "serde_yaml", "parking_lot"]
 k8 = ["fluvio-stream-model/k8", "k8-client", "serde_json"]
 
 [dependencies]
@@ -26,6 +26,7 @@ once_cell = { workspace = true }
 serde = { workspace = true, features = ['derive'] }
 serde_json = { workspace = true, optional = true }
 serde_yaml = { workspace = true, optional = true }
+parking_lot = { workspace = true, features = ["send_guard"], optional = true }
 tokio = { workspace = true, features = ["macros"] }
 tracing = { workspace = true }
 tempfile = { workspace = true }
diff --git a/crates/fluvio-stream-dispatcher/src/metadata/local.rs b/crates/fluvio-stream-dispatcher/src/metadata/local.rs
index 35c2891877..7a2b56230b
100644
--- a/crates/fluvio-stream-dispatcher/src/metadata/local.rs
+++ b/crates/fluvio-stream-dispatcher/src/metadata/local.rs
@@ -102,7 +102,7 @@ cfg_if::cfg_if! {
 
         use anyhow::{Result, anyhow, Context};
         use async_channel::{Sender, Receiver, bounded};
-        use async_lock::{RwLock, RwLockUpgradableReadGuard};
+        use parking_lot::RwLock;
         use futures_util::{stream::BoxStream, StreamExt};
         use serde::{de::DeserializeOwned};
         use tracing::{warn, debug, trace};
@@ -133,7 +133,7 @@ cfg_if::cfg_if! {
             where
                 S: K8ExtendedSpec,
             {
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 store.retrieve_items().await
             }
 
@@ -142,7 +142,7 @@ cfg_if::cfg_if! {
                 S: K8ExtendedSpec,
             {
                 trace!(?metadata, "delete item");
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 if let Some(item) = store.try_retrieve_item::(&metadata).await? {
                     if let Some(owner) = item.ctx().item().owner() {
                         self.unlink_parent::(owner, item.ctx().item()).await?;
@@ -166,7 +166,7 @@ cfg_if::cfg_if! {
                 ::Owner: K8ExtendedSpec,
             {
                 trace!(?value, "apply");
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 value.ctx_mut().item_mut().id = value.key().to_string();
                 if let Some(owner) = value.ctx().item().owner() {
                     self.link_parent::(owner, value.ctx().item()).await?;
@@ -181,7 +181,7 @@ cfg_if::cfg_if! {
                 use std::str::FromStr;
 
                 trace!(?metadata, ?spec, "update spec");
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 let item = match store.try_retrieve_item::(&metadata).await? {
                     Some(mut item) => {
                         item.ctx_mut().set_item(metadata);
@@ -214,7 +214,7 @@ cfg_if::cfg_if! {
                     id: key.to_string(),
                     ..Default::default()
                 };
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 let item = match store.try_retrieve_item::(&metadata).await? {
                     Some(mut item) => {
                         item.set_spec(spec);
@@ -235,7 +235,7 @@ cfg_if::cfg_if!
{
                 S: K8ExtendedSpec,
             {
                 trace!(?metadata, ?status, "update status");
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 let mut item = store.retrieve_item::(&metadata).await?;
                 item.ctx_mut().set_item(metadata.clone());
                 item.set_status(status);
@@ -252,12 +252,11 @@ cfg_if::cfg_if! {
                 S: K8ExtendedSpec,
             {
                 trace!(label = S::LABEL, ?resource_version, "watch stream");
-                futures_util::stream::once(self.get_store::())
-                    .flat_map(move |store| match store {
-                        Ok(store) => store.watch_stream_since(resource_version.as_ref()),
-                        Err(err) => futures_util::stream::once(async { Result::<_>::Err(err) }).boxed(),
-                    })
-                    .boxed()
+                let store = self.get_store::();
+                match store {
+                    Ok(store) => store.watch_stream_since(resource_version.as_ref()),
+                    Err(err) => futures_util::stream::once(async { Result::<_>::Err(err) }).boxed(),
+                }
             }
 
             async fn patch_status(
@@ -270,7 +269,7 @@ cfg_if::cfg_if! {
                 S: K8ExtendedSpec,
             {
                 trace!(?metadata, ?status, "patch status");
-                let store = self.get_store::().await?;
+                let store = self.get_store::()?;
                 let mut item = store.retrieve_item::(&metadata).await?;
                 item.ctx_mut().set_item(metadata.clone());
                 item.set_status(status);
@@ -309,15 +308,17 @@ cfg_if::cfg_if! {
                 Self { path, stores }
             }
 
-            async fn get_store(&self) -> Result> {
+            fn get_store(&self) -> Result> {
                 let key = S::LABEL;
-                let read = self.stores.upgradable_read().await;
+                let read = self.stores.read();
                 Ok(match read.get(key) {
                     Some(store) => store.clone(),
                     None => {
-                        let mut write = RwLockUpgradableReadGuard::upgrade(read).await;
-                        let store = Arc::new(SpecStore::load::(self.path.join(key)).await?);
+                        drop(read);
+                        let mut write = self.stores.write();
+                        if let Some(existing) = write.get(key) { return Ok(existing.clone()); } // re-check: another caller may have inserted between drop(read) and write()
+                        let store = Arc::new(SpecStore::load::(self.path.join(key))?);
                         write.insert(key, store.clone());
                         store
                     }
                 })
@@ -342,7 +343,7 @@ cfg_if::cfg_if!
{
                 child: &LocalMetadataItem,
             ) -> Result<()> {
                 trace!(?parent, ?child, "link parent");
-                let parent_store = self.get_store::().await?;
+                let parent_store = self.get_store::()?;
                 parent_store
                     .mut_in_place::(parent.uid(), |parent_obj| {
                         parent_obj
@@ -360,7 +361,7 @@ cfg_if::cfg_if! {
                 child: &LocalMetadataItem,
             ) -> Result<()> {
                 trace!(?parent, ?child, "link parent");
-                let parent_store = self.get_store::().await?;
+                let parent_store = self.get_store::()?;
                 parent_store
                     .mut_in_place::(parent.uid(), |parent_obj| {
                         parent_obj
@@ -375,7 +376,6 @@ cfg_if::cfg_if! {
             async fn get_store_by_key(&self, key: &str) -> Result> {
                 self.stores
                     .read()
-                    .await
                     .get(key)
                     .cloned()
                     .ok_or_else(|| anyhow!("store not found for key {key}"))
@@ -383,7 +383,7 @@ cfg_if::cfg_if! {
         }
 
         impl SpecStore {
-            async fn load>(path: P) -> Result {
+            fn load>(path: P) -> Result {
                 std::fs::create_dir_all(&path)?;
                 let version = Default::default();
                 let mut data: HashMap = Default::default();
@@ -423,7 +423,7 @@ cfg_if::cfg_if! {
                     .version
                     .load(std::sync::atomic::Ordering::SeqCst)
                     .to_string();
-                let read = self.data.read().await;
+                let read = self.data.read();
                 let items: Vec> = read
                     .values()
                     .map(SpecPointer::downcast)
@@ -439,7 +439,7 @@ cfg_if::cfg_if! {
             where
                 S: Spec,
             {
-                let read = self.data.read().await;
+                let read = self.data.read();
                 read.get(metadata.uid())
                     .map(SpecPointer::downcast)
                     .transpose()
@@ -455,10 +455,18 @@ cfg_if::cfg_if! {
             }
 
             async fn delete_item(&self, metadata: &LocalMetadataItem) {
-                let mut write = self.data.write().await;
-                if let Some(removed) = write.remove(metadata.uid()) {
-                    removed.delete();
-                    drop(write);
+                let removed = {
+                    let mut write = self.data.write();
+                    if let Some(removed) = write.remove(metadata.uid()) {
+                        removed.delete();
+                        drop(write);
+                        Some(removed)
+                    } else {
+                        None
+                    }
+                };
+
+                if let Some(removed) = removed {
                     self.send_update(SpecUpdate::Delete(removed)).await;
                 }
             }
@@ -468,20 +476,24 @@ cfg_if::cfg_if!
{
                 S: Spec + Serialize,
             {
                 let id = value.ctx().item().uid().to_owned();
-                let mut write = self.data.write().await;
-                if let Some(prev) = write.get(&id) {
-                    let prev_meta = prev.downcast_ref::()?.ctx().item();
-                    let prev_rev = prev_meta.revision;
-                    if prev_meta.is_newer(value.ctx().item()) {
-                        let new_rev = value.ctx().item().revision;
-                        anyhow::bail!("attempt to update by stale value: current version: {prev_rev}, proposed: {new_rev}");
-                    }
-                    value.ctx_mut().item_mut().revision = prev_rev + 1;
+                let pointer =
+                {
+                    let mut write = self.data.write();
+                    if let Some(prev) = write.get(&id) {
+                        let prev_meta = prev.downcast_ref::()?.ctx().item();
+                        let prev_rev = prev_meta.revision;
+                        if prev_meta.is_newer(value.ctx().item()) {
+                            let new_rev = value.ctx().item().revision;
+                            anyhow::bail!("attempt to update by stale value: current version: {prev_rev}, proposed: {new_rev}");
+                        }
+                        value.ctx_mut().item_mut().revision = prev_rev + 1;
+                    };
+                    let pointer = SpecPointer::new(self.spec_file_name(&id), value);
+                    write.insert(id, pointer.clone());
+                    pointer.flush::()?;
+                    drop(write);
+                    pointer
                 };
-                let pointer = SpecPointer::new(self.spec_file_name(&id), value);
-                write.insert(id, pointer.clone());
-                pointer.flush::()?;
-                drop(write);
                 self.send_update(SpecUpdate::Mod(pointer)).await;
                 Ok(())
             }
@@ -522,7 +534,7 @@ cfg_if::cfg_if! {
             where
                 F: Fn(&mut LocalStoreObject),
             {
-                if let Some(spec) = self.data.write().await.get_mut(key) {
+                if let Some(spec) = self.data.write().get_mut(key) {
                     let mut obj = spec.downcast::()?;
                     func(&mut obj);
                     spec.set(obj);
diff --git a/makefiles/test.mk b/makefiles/test.mk
index 5f4734e8fb..9c32d4a946 100644
--- a/makefiles/test.mk
+++ b/makefiles/test.mk
@@ -181,6 +181,9 @@ ifeq (${CI},true)
 # In CI, we expect all artifacts to already be built and loaded for the script
 upgrade-test:
 	./tests/upgrade-test.sh
+else ifeq (${FLUVIO_MODE},local)
+upgrade-test: build-cli
+	./tests/upgrade-test.sh
 else
 # When not in CI (i.e.
development), load the dev k8 image before running test
 upgrade-test: build-cli build_k8_image
diff --git a/tests/upgrade-test.sh b/tests/upgrade-test.sh
index df0e9f0a4e..3a654f8b52 100755
--- a/tests/upgrade-test.sh
+++ b/tests/upgrade-test.sh
@@ -52,11 +52,7 @@ function cleanup() {
 # If we're in CI, we want to slow down execution
 # to give CPU some time to rest, so we don't time out
 function ci_check() {
-    if [[ "$FLUVIO_MODE" == "local" ]]; then
-        sleep $CI_SLEEP
-    else
         :
-    fi
 }
 
 # This function is intended to be run second after the Stable-1 validation