diff --git a/.github/actions/test_behavior_integration_object_store/action.yml b/.github/actions/test_behavior_integration_object_store/action.yml new file mode 100644 index 000000000000..707010b4db49 --- /dev/null +++ b/.github/actions/test_behavior_integration_object_store/action.yml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Test Integration Object Store +description: 'Test Integration Object Store with given setup and service' +inputs: + setup: + description: "The setup action for test" + service: + description: "The service to test" + feature: + description: "The feature to test" + +runs: + using: "composite" + steps: + - name: Setup + shell: bash + run: | + mkdir -p ./dynamic_test_integration_object_store && + cat <./dynamic_test_integration_object_store/action.yml + runs: + using: composite + steps: + - name: Setup Test + uses: ./.github/services/${{ inputs.service }}/${{ inputs.setup }} + - name: Run Test Integration Object Store + shell: bash + working-directory: integrations/object_store + run: cargo test behavior + env: + OPENDAL_TEST: ${{ inputs.service }} + EOF + - name: Run + uses: ./dynamic_test_integration_object_store diff --git a/.github/scripts/test_behavior/plan.py b/.github/scripts/test_behavior/plan.py index 47f32aadf347..0f32ec6e4f53 100755 --- a/.github/scripts/test_behavior/plan.py +++ b/.github/scripts/test_behavior/plan.py @@ -35,6 +35,8 @@ BIN = ["ofs"] +INTEGRATIONS = ["object_store"] + def provided_cases() -> list[dict[str, str]]: root_dir = f"{GITHUB_DIR}/services" @@ -86,6 +88,8 @@ class Hint: binding_nodejs: bool = field(default=False, init=False) # Is bin ofs affected? bin_ofs: bool = field(default=False, init=False) + # Is integration object_store affected ? + integration_object_store: bool = field(default=False, init=False) # Should we run all services tests? all_service: bool = field(default=False, init=False) @@ -105,6 +109,8 @@ def calculate_hint(changed_files: list[str]) -> Hint: hint.core = True for language in LANGUAGE_BINDING: setattr(hint, f"binding_{language}", True) + for integration in INTEGRATIONS: + setattr(hint, f"integration_{integration}", True) hint.all_service = True if p == ".github/workflows/test_behavior_core.yml": @@ -115,11 +121,17 @@ def calculate_hint(changed_files: list[str]) -> Hint: if p == f".github/workflows/test_behavior_binding_{language}.yml": setattr(hint, f"binding_{language}", True) hint.all_service = True + for bin in BIN: if p == f".github/workflows/test_behavior_bin_{bin}.yml": setattr(hint, f"bin_{bin}", True) hint.all_service = True + for integration in INTEGRATIONS: + if p == f".github/workflows/test_behavior_integration_{integration}.yml": + setattr(hint, f"integration_{integration}", True) + hint.all_service = True + # core affected if ( p.startswith("core/") @@ -133,6 +145,8 @@ def calculate_hint(changed_files: list[str]) -> Hint: hint.binding_python = True hint.binding_nodejs = True hint.bin_ofs = True + for integration in INTEGRATIONS: + setattr(hint, f"integration_{integration}", True) hint.all_service = True # language binding affected @@ -147,6 +161,12 @@ def calculate_hint(changed_files: list[str]) -> Hint: setattr(hint, f"bin_{bin}", True) hint.all_service = True + # integration affected + for integration in INTEGRATIONS: + if p.startswith(f"integrations/{integration}"): + setattr(hint, f"integration_{integration}", True) + hint.all_service = True + # core service affected match = re.search(r"core/src/services/([^/]+)/", p) if match: @@ -155,6 +175,8 @@ def calculate_hint(changed_files: list[str]) -> Hint: setattr(hint, f"binding_{language}", True) for bin in BIN: setattr(hint, f"bin_{bin}", True) + for integration in INTEGRATIONS: + setattr(hint, f"integration_{integration}", True) hint.services.add(match.group(1)) # core test affected @@ -165,6 +187,8 @@ def calculate_hint(changed_files: list[str]) -> Hint: setattr(hint, f"binding_{language}", True) for bin in BIN: setattr(hint, f"bin_{bin}", True) + for integration in INTEGRATIONS: + setattr(hint, f"integration_{integration}", True) hint.services.add(match.group(1)) # fixture affected @@ -175,6 +199,8 @@ def calculate_hint(changed_files: list[str]) -> Hint: setattr(hint, f"binding_{language}", True) for bin in BIN: setattr(hint, f"bin_{bin}", True) + for integration in INTEGRATIONS: + setattr(hint, f"integration_{integration}", True) hint.services.add(match.group(1)) return hint @@ -243,7 +269,7 @@ def generate_language_binding_cases( if hint.all_service: return cases - # Filter all cases that not shown un in changed files + # Filter all cases that not shown up in changed files cases = [v for v in cases if v["service"] in hint.services] return cases @@ -265,7 +291,30 @@ def generate_bin_cases( if hint.all_service: return cases - # Filter all cases that not shown un in changed files + # Filter all cases that not shown up in changed files + cases = [v for v in cases if v["service"] in hint.services] + + return cases + + +def generate_integration_cases( + cases: list[dict[str, str]], hint: Hint, integration: str + ) -> list[dict[str, str]]: + # Return empty if this integration is False + if not getattr(hint, f"integration_{integration}"): + return [] + + cases = unique_cases(cases) + + if integration == "object_store": + supported_services = ["fs", "s3"] + cases = [v for v in cases if v["service"] in supported_services] + + # Return all services if all_service is True + if hint.all_service: + return cases + + # Filter all cases that not shown up in changed files cases = [v for v in cases if v["service"] in hint.services] return cases @@ -315,6 +364,17 @@ def plan(changed_files: list[str]) -> dict[str, Any]: if len(bin_cases) > 0: jobs["components"][f"bin_{bin}"] = True jobs[f"bin_{bin}"].append({"os": "ubuntu-latest", "cases": bin_cases}) + + for integration in INTEGRATIONS: + jobs[f"integration_{integration}"] = [] + jobs["components"][f"integration_{integration}"] = False + integration_cases = generate_integration_cases(cases, hint, integration) + if len(integration_cases) > 0: + jobs["components"][f"integration_{integration}"] = True + jobs[f"integration_{integration}"].append( + {"os": "ubuntu-latest", "cases": integration_cases} + ) + return jobs diff --git a/.github/scripts/test_behavior/test_plan.py b/.github/scripts/test_behavior/test_plan.py index 0e3f33069456..aaa72e21769f 100644 --- a/.github/scripts/test_behavior/test_plan.py +++ b/.github/scripts/test_behavior/test_plan.py @@ -60,6 +60,16 @@ def test_bin_ofs(self): # Should contain ofs self.assertTrue("fs" in cases) + def test_integration_object_store(self): + result = plan(["integrations/object_store/Cargo.toml"]) + self.assertTrue(result["components"]["integration_object_store"]) + self.assertTrue(len(result["integration_object_store"]) > 0) + + result = plan(["core/src/services/fs/mod.rs"]) + cases = [v["service"] for v in result["integration_object_store"][0]["cases"]] + # Should contain fs + self.assertTrue("fs" in cases) + if __name__ == "__main__": unittest.main() diff --git a/.github/workflows/test_behavior.yml b/.github/workflows/test_behavior.yml index 4f53ea9abf8e..0c087d24e1cc 100644 --- a/.github/workflows/test_behavior.yml +++ b/.github/workflows/test_behavior.yml @@ -145,3 +145,17 @@ jobs: with: os: ${{ matrix.os }} cases: ${{ toJson(matrix.cases) }} + + test_integration_object_store: + name: integration_object_store / ${{ matrix.os }} + needs: [ plan ] + if: fromJson(needs.plan.outputs.plan).components.integration_object_store + secrets: inherit + strategy: + fail-fast: false + matrix: + include: ${{ fromJson(needs.plan.outputs.plan).integration_object_store }} + uses: ./.github/workflows/test_behavior_integration_object_store.yml + with: + os: ${{ matrix.os }} + cases: ${{ toJson(matrix.cases) }} diff --git a/.github/workflows/test_behavior_integration_object_store.yml b/.github/workflows/test_behavior_integration_object_store.yml new file mode 100644 index 000000000000..7f05262b4b30 --- /dev/null +++ b/.github/workflows/test_behavior_integration_object_store.yml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Behavior Test Integration Object Store + +on: + workflow_call: + inputs: + os: + required: true + type: string + cases: + required: true + type: string + +jobs: + test: + name: ${{ matrix.cases.service }} / ${{ matrix.cases.setup }} + runs-on: ${{ inputs.os }} + strategy: + fail-fast: false + matrix: + cases: ${{ fromJson(inputs.cases) }} + steps: + - uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + need-nextest: true + need-protoc: true + need-rocksdb: true + github-token: ${{ secrets.GITHUB_TOKEN }} + + # TODO: 1password is only supported on linux + # + # Waiting for https://github.com/1Password/load-secrets-action/issues/46 + - name: Setup 1Password Connect + if: runner.os == 'Linux' + uses: 1password/load-secrets-action/configure@v1 + with: + connect-host: ${{ secrets.OP_CONNECT_HOST }} + connect-token: ${{ secrets.OP_CONNECT_TOKEN }} + + - name: Test Core + uses: ./.github/actions/test_behavior_integration_object_store + with: + setup: ${{ matrix.cases.setup }} + service: ${{ matrix.cases.service }} + feature: ${{ matrix.cases.feature }} diff --git a/core/src/types/read/buffer_stream.rs b/core/src/types/read/buffer_stream.rs index eff0287e5e6d..304a5c6b411b 100644 --- a/core/src/types/read/buffer_stream.rs +++ b/core/src/types/read/buffer_stream.rs @@ -85,7 +85,7 @@ impl ChunkedReader { /// /// # Notes /// - /// We don't need to handle `Executor::timeout` since we are outside of the layer. + /// We don't need to handle `Executor::timeout` since we are outside the layer. fn new(ctx: Arc, range: BytesRange) -> Self { let tasks = ConcurrentTasks::new( ctx.args().executor().cloned().unwrap_or_default(), diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml index 2afc0abd0a07..d7c78cd1598e 100644 --- a/integrations/object_store/Cargo.toml +++ b/integrations/object_store/Cargo.toml @@ -30,6 +30,11 @@ version = "0.49.0" [features] send_wrapper = ["dep:send_wrapper"] +[[test]] +harness = false +name = "behavior" +path = "tests/behavior/main.rs" + [dependencies] async-trait = "0.1" bytes = "1" @@ -46,6 +51,10 @@ tokio = { version = "1", default-features = false } opendal = { version = "0.51.0", path = "../../core", features = [ "services-memory", "services-s3", + "tests", ] } rand = "0.8.5" tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } +anyhow = "1.0.86" +libtest-mimic = "0.7.3" +uuid = "1.11.0" diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs index 9ee32889f1ed..aaef7caa77bd 100644 --- a/integrations/object_store/src/store.rs +++ b/integrations/object_store/src/store.rs @@ -90,6 +90,7 @@ use tokio::sync::{Mutex, Notify}; /// assert_eq!(content, bytes); /// } /// ``` +#[derive(Clone)] pub struct OpendalStore { info: Arc, inner: Operator, diff --git a/integrations/object_store/tests/behavior/delete.rs b/integrations/object_store/tests/behavior/delete.rs new file mode 100644 index 000000000000..c971ec890d54 --- /dev/null +++ b/integrations/object_store/tests/behavior/delete.rs @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::utils::build_trail; +use bytes::Bytes; +use libtest_mimic::Trial; +use object_store::path::Path; +use object_store::ObjectStore; +use object_store_opendal::OpendalStore; + +pub fn tests(store: &OpendalStore, tests: &mut Vec) { + tests.push(build_trail("test_delete", store, test_delete)); +} + +pub async fn test_delete(store: OpendalStore) -> anyhow::Result<()> { + let location = Path::from("data/test.txt"); + let value = Bytes::from("Hello, world!"); + store.put(&location, value.clone().into()).await?; + + store.delete(&location).await?; + + let err = store.get(&location).await.err().unwrap(); + assert!(matches!(err, object_store::Error::NotFound { .. })); + + Ok(()) +} diff --git a/integrations/object_store/tests/behavior/get.rs b/integrations/object_store/tests/behavior/get.rs new file mode 100644 index 000000000000..3b241864f61c --- /dev/null +++ b/integrations/object_store/tests/behavior/get.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::utils::{build_trail, new_file_path}; +use anyhow::Result; +use bytes::Bytes; +use libtest_mimic::Trial; +use object_store::ObjectStore; +use object_store_opendal::OpendalStore; + +pub fn tests(store: &OpendalStore, tests: &mut Vec) { + tests.push(build_trail("test_basic_get", store, test_basic_get)); + tests.push(build_trail("test_head", store, test_head)); + tests.push(build_trail("test_get_range", store, test_get_range)); +} + +pub async fn test_basic_get(store: OpendalStore) -> Result<()> { + let location = new_file_path("data").into(); + let value = Bytes::from_static(b"Hello, world!"); + + store.put(&location, value.clone().into()).await?; + + let ret = store.get(&location).await?; + + assert_eq!(0..value.len(), ret.range); + let data = ret.bytes().await?; + assert_eq!(value, data); + + let ret = store.get(&"not_exist".into()).await; + assert!(ret.is_err()); + assert!(matches!( + ret.err().unwrap(), + object_store::Error::NotFound { .. } + )); + + store.delete(&location).await?; + Ok(()) +} + +pub async fn test_head(store: OpendalStore) -> Result<()> { + let location = new_file_path("data").into(); + let value = Bytes::from_static(b"Hello, world!"); + + store.put(&location, value.clone().into()).await?; + + let meta = store.head(&location).await?; + + assert_eq!(meta.size, value.len()); + assert_eq!(meta.location, location); + + store.delete(&location).await?; + Ok(()) +} + +pub async fn test_get_range(store: OpendalStore) -> Result<()> { + let location = new_file_path("data").into(); + let value = Bytes::from_static(b"Hello, world!"); + + store.put(&location, value.clone().into()).await?; + + let ret = store.get_range(&location, 0..5).await?; + assert_eq!(Bytes::from_static(b"Hello"), ret); + + store.delete(&location).await?; + Ok(()) +} diff --git a/integrations/object_store/tests/behavior/main.rs b/integrations/object_store/tests/behavior/main.rs new file mode 100644 index 000000000000..1429eb5f505b --- /dev/null +++ b/integrations/object_store/tests/behavior/main.rs @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod delete; +mod get; +mod put; +mod utils; + +use libtest_mimic::Arguments; + +fn main() -> anyhow::Result<()> { + let Ok(Some(op)) = opendal::raw::tests::init_test_service() else { + return Ok(()); + }; + let store = object_store_opendal::OpendalStore::new(op); + + let mut tests = Vec::new(); + delete::tests(&store, &mut tests); + put::tests(&store, &mut tests); + get::tests(&store, &mut tests); + + let args = Arguments::from_args(); + let conclusion = libtest_mimic::run(&args, tests); + conclusion.exit(); +} diff --git a/integrations/object_store/tests/behavior/put.rs b/integrations/object_store/tests/behavior/put.rs new file mode 100644 index 000000000000..bbafd58b16c1 --- /dev/null +++ b/integrations/object_store/tests/behavior/put.rs @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::utils::{build_trail, new_file_path}; +use anyhow::Result; +use bytes::Bytes; +use libtest_mimic::Trial; +use object_store::ObjectStore; +use object_store_opendal::OpendalStore; + +pub fn tests(store: &OpendalStore, tests: &mut Vec) { + tests.push(build_trail("test_basic_put", store, test_basic_put)); +} + +pub async fn test_basic_put(store: OpendalStore) -> Result<()> { + let location = new_file_path("data").into(); + let value = Bytes::from("Hello, world!"); + store.put(&location, value.clone().into()).await?; + + let data = store.get(&location).await?.bytes().await?; + assert_eq!(value, data); + + store.delete(&location).await?; + Ok(()) +} diff --git a/integrations/object_store/tests/behavior/utils.rs b/integrations/object_store/tests/behavior/utils.rs new file mode 100644 index 000000000000..8b75d96aeb21 --- /dev/null +++ b/integrations/object_store/tests/behavior/utils.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use libtest_mimic::{Failed, Trial}; +use object_store_opendal::OpendalStore; +use opendal::raw::tests::TEST_RUNTIME; +use opendal::raw::MaybeSend; +use std::future::Future; + +pub fn build_trail(name: &str, store: &OpendalStore, f: F) -> Trial +where + F: FnOnce(OpendalStore) -> Fut + MaybeSend + 'static, + Fut: Future>, +{ + let handle = TEST_RUNTIME.handle().clone(); + + let store = store.clone(); + Trial::test(format!("behavior::{name}"), move || { + handle + .block_on(f(store)) + .map_err(|err| Failed::from(err.to_string())) + }) +} + +pub fn new_file_path(dir: &str) -> String { + format!("{}/{}", dir, uuid::Uuid::new_v4()) +}