Skip to content

Commit 390ae87

Browse files
authored
feat(services/compfs): compio runtime and compfs structure (#4534)
* feat(services/compfs): compio thread * feat(services/compfs): basic structure for compfs * feat(services/compfs): scheme impl * fix naming
1 parent 95e6a0e commit 390ae87

File tree

8 files changed

+233
-0
lines changed

8 files changed

+233
-0
lines changed

core/src/services/compfs/backend.rs

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use super::core::CompioThread;
19+
use crate::raw::*;
20+
use crate::*;
21+
use async_trait::async_trait;
22+
use std::collections::HashMap;
23+
use std::path::PathBuf;
24+
25+
/// [`compio`]-based file system support.
26+
#[derive(Debug, Clone, Default)]
27+
pub struct CompfsBuilder {
28+
root: Option<PathBuf>,
29+
}
30+
31+
impl CompfsBuilder {
32+
/// Set root for Compfs
33+
pub fn root(&mut self, root: &str) -> &mut Self {
34+
self.root = if root.is_empty() {
35+
None
36+
} else {
37+
Some(PathBuf::from(root))
38+
};
39+
40+
self
41+
}
42+
}
43+
44+
impl Builder for CompfsBuilder {
45+
const SCHEME: Scheme = Scheme::Compfs;
46+
type Accessor = ();
47+
48+
fn from_map(map: HashMap<String, String>) -> Self {
49+
let mut builder = CompfsBuilder::default();
50+
51+
map.get("root").map(|v| builder.root(v));
52+
53+
builder
54+
}
55+
56+
fn build(&mut self) -> Result<Self::Accessor> {
57+
todo!()
58+
}
59+
}
60+
61+
#[derive(Debug)]
62+
pub struct CompfsBackend {
63+
rt: CompioThread,
64+
}
65+
66+
#[async_trait]
67+
impl Accessor for CompfsBackend {
68+
type Reader = ();
69+
type Writer = ();
70+
type Lister = ();
71+
type BlockingReader = ();
72+
type BlockingWriter = ();
73+
type BlockingLister = ();
74+
75+
fn info(&self) -> AccessorInfo {
76+
todo!()
77+
}
78+
}

core/src/services/compfs/core.rs

+91
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,100 @@
1616
// under the License.
1717

1818
use compio::buf::IoBuf;
19+
use compio::runtime::RuntimeBuilder;
20+
use futures::channel::mpsc::SendError;
21+
use futures::channel::{mpsc, oneshot};
22+
use futures::future::LocalBoxFuture;
23+
use futures::{SinkExt, StreamExt};
24+
use std::future::Future;
25+
use std::thread::JoinHandle;
1926

2027
use crate::Buffer;
2128

29+
/// This is arbitrary, but since all tasks are spawned instantly, we shouldn't need a too big buffer.
30+
const CHANNEL_SIZE: usize = 4;
31+
32+
fn task<F, Fut, T>(func: F) -> (Task<T>, SpawnTask)
33+
where
34+
F: (FnOnce() -> Fut) + Send + 'static,
35+
Fut: Future<Output = T>,
36+
T: Send + 'static,
37+
{
38+
let (tx, recv) = oneshot::channel();
39+
40+
let boxed = Box::new(|| {
41+
Box::pin(async move {
42+
let res = func().await;
43+
tx.send(res).ok();
44+
}) as _
45+
});
46+
47+
(Task(recv), SpawnTask(boxed))
48+
}
49+
50+
/// A task handle that can be used to retrieve result spawned into [`CompioThread`].
51+
pub struct Task<T>(oneshot::Receiver<T>);
52+
53+
/// Type erased task that can be spawned into a [`CompioThread`].
54+
struct SpawnTask(Box<dyn (FnOnce() -> LocalBoxFuture<'static, ()>) + Send>);
55+
56+
impl SpawnTask {
57+
fn call(self) -> LocalBoxFuture<'static, ()> {
58+
(self.0)()
59+
}
60+
}
61+
62+
#[derive(Debug)]
63+
pub struct CompioThread {
64+
thread: JoinHandle<()>,
65+
handle: SpawnHandle,
66+
}
67+
68+
impl CompioThread {
69+
pub fn new(builder: RuntimeBuilder) -> Self {
70+
let (send, mut recv) = mpsc::channel(CHANNEL_SIZE);
71+
let handle = SpawnHandle(send);
72+
let thread = std::thread::spawn(move || {
73+
let rt = builder.build().expect("failed to create runtime");
74+
rt.block_on(async {
75+
while let Some(task) = recv.next().await {
76+
rt.spawn(task.call()).detach();
77+
}
78+
});
79+
});
80+
Self { thread, handle }
81+
}
82+
83+
pub async fn spawn<F, Fut, T>(&self, func: F) -> Result<Task<T>, SendError>
84+
where
85+
F: (FnOnce() -> Fut) + Send + 'static,
86+
Fut: Future<Output = T>,
87+
T: Send + 'static,
88+
{
89+
self.handle.clone().spawn(func).await
90+
}
91+
92+
pub fn handle(&self) -> SpawnHandle {
93+
self.handle.clone()
94+
}
95+
}
96+
97+
#[derive(Debug, Clone)]
98+
pub struct SpawnHandle(mpsc::Sender<SpawnTask>);
99+
100+
impl SpawnHandle {
101+
pub async fn spawn<F, Fut, T>(&mut self, func: F) -> Result<Task<T>, SendError>
102+
where
103+
F: (FnOnce() -> Fut) + Send + 'static,
104+
Fut: Future<Output = T>,
105+
T: Send + 'static,
106+
{
107+
let (task, spawn) = task(func);
108+
self.0.send(spawn).await?;
109+
Ok(task)
110+
}
111+
}
112+
22113
unsafe impl IoBuf for Buffer {
23114
fn as_buf_ptr(&self) -> *const u8 {
24115
self.current().as_ptr()

core/src/services/compfs/lister.rs

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.

core/src/services/compfs/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#![allow(dead_code)] // TODO: Remove this after backend is implemented
19+
20+
mod backend;
1821
mod core;
22+
mod lister;
23+
mod reader;
24+
mod writer;
25+
26+
pub use backend::CompfsBuilder as Compfs;

core/src/services/compfs/reader.rs

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.

core/src/services/compfs/writer.rs

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.

core/src/services/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -409,3 +409,5 @@ pub use surrealdb::SurrealdbConfig;
409409

410410
#[cfg(feature = "services-compfs")]
411411
mod compfs;
412+
#[cfg(feature = "services-compfs")]
413+
pub use compfs::Compfs;

core/src/types/scheme.rs

+6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub enum Scheme {
4040
Azdls,
4141
/// [B2][crate::services::B2]: Backblaze B2 Services.
4242
B2,
43+
/// [Compfs][crate::services::Compfs]: Compio fs Services.
44+
Compfs,
4345
/// [Seafile][crate::services::Seafile]: Seafile Services.
4446
Seafile,
4547
/// [Upyun][crate::services::Upyun]: Upyun Services.
@@ -205,6 +207,8 @@ impl Scheme {
205207
Scheme::Cacache,
206208
#[cfg(feature = "services-cos")]
207209
Scheme::Cos,
210+
#[cfg(feature = "services-compfs")]
211+
Scheme::Compfs,
208212
#[cfg(feature = "services-dashmap")]
209213
Scheme::Dashmap,
210214
#[cfg(feature = "services-dropbox")]
@@ -332,6 +336,7 @@ impl FromStr for Scheme {
332336
"b2" => Ok(Scheme::B2),
333337
"chainsafe" => Ok(Scheme::Chainsafe),
334338
"cacache" => Ok(Scheme::Cacache),
339+
"compfs" => Ok(Scheme::Compfs),
335340
"cloudflare_kv" => Ok(Scheme::CloudflareKv),
336341
"cos" => Ok(Scheme::Cos),
337342
"d1" => Ok(Scheme::D1),
@@ -402,6 +407,7 @@ impl From<Scheme> for &'static str {
402407
Scheme::Cacache => "cacache",
403408
Scheme::CloudflareKv => "cloudflare_kv",
404409
Scheme::Cos => "cos",
410+
Scheme::Compfs => "compfs",
405411
Scheme::D1 => "d1",
406412
Scheme::Dashmap => "dashmap",
407413
Scheme::Etcd => "etcd",

0 commit comments

Comments
 (0)