1 change: 1 addition & 0 deletions rcore-fs-cli/src/lib.rs
@@ -6,3 +6,4 @@ extern crate log;
#[cfg(feature = "use_fuse")]
pub mod fuse;
pub mod zip;
pub mod thread_pool;
10 changes: 8 additions & 2 deletions rcore-fs-cli/src/main.rs
@@ -4,6 +4,7 @@ use std::fs::OpenOptions;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use rcore_fs_cli::thread_pool::Pool;
use structopt::StructOpt;

use rcore_fs::dev::std_impl::StdTimeProvider;
@@ -38,6 +39,10 @@ enum Opt {
/// File system: [sfs | sefs | hostfs]
#[structopt(short = "f", long = "fs", default_value = "sfs")]
fs: String,

/// Number of threads
#[structopt(short="j", long, default_value="4")]
thread_num: usize,
},

/// Extract files from a fs image.
@@ -86,9 +91,10 @@ fn main() {
let opt = Opt::from_args();

match opt {
Opt::Zip { dir, image, fs } => {
Opt::Zip { dir, image, fs, thread_num } => {
let fs = open_fs(&fs, &image, true);
zip_dir(&dir, fs.root_inode()).expect("failed to zip fs");
let thread_pool = Pool::new(thread_num);
zip_dir(&dir, fs.root_inode(), &thread_pool).expect("failed to zip fs");
}
Opt::Unzip { dir, image, fs } => {
let fs = open_fs(&fs, &image, false);
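Note: the `Opt::Zip` arm above always performs a full zip, passing `None` as the cutoff, while `zip_dir` (below) also supports an incremental mode. A minimal sketch of how such a cutoff could be derived — `image_cutoff` is a hypothetical helper, not part of this diff:

    use std::path::Path;
    use std::time::UNIX_EPOCH;

    /// Hypothetical helper: derive the incremental cutoff from the existing
    /// image's mtime; returns None (full zip) when the image does not exist.
    fn image_cutoff(image: &Path) -> Option<i64> {
        let modified = std::fs::metadata(image).ok()?.modified().ok()?;
        Some(modified.duration_since(UNIX_EPOCH).ok()?.as_secs() as i64)
    }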
81 changes: 81 additions & 0 deletions rcore-fs-cli/src/thread_pool.rs
@@ -0,0 +1,81 @@
use std::{sync::{mpsc, Arc, Mutex}, thread::{self, JoinHandle}};

pub struct Pool {
workers: Vec<Worker>,
max_workers: usize,
sender: mpsc::Sender<Message>,
}

impl Pool {
pub fn new(max_workers: usize) -> Pool {
if max_workers == 0 {
panic!("max_workers must not be zero!")
}
let (tx, rx) = mpsc::channel();

let mut workers = Vec::with_capacity(max_workers);
let receiver = Arc::new(Mutex::new(rx));
for i in 0..max_workers {
workers.push(Worker::new(i, Arc::clone(&receiver)));
}

Pool { workers, max_workers, sender: tx }
}

pub fn execute<F>(&self, f: F) where F: FnOnce() + 'static + Send {
let job = Message::NewJob(Box::new(f));
self.sender.send(job).unwrap();
}
}

impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::Close).unwrap();
}
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
t.join().unwrap();
}
}
}
}

struct Worker {
_id: usize,
t: Option<JoinHandle<()>>,
}

type Job = Box<dyn FnOnce() + 'static + Send>;
enum Message {
Close,
NewJob(Job),
}

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn(move || {
loop {
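// The MutexGuard is a temporary, so the lock is released as soon as
// `recv` returns; it is not held while the job runs.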
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
job();
},
Message::Close => {
// println!("Close from worker[{}]", id);
break
},
}
}
});

Worker {
_id: id,
t: Some(t),
}
}
}
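A minimal, self-contained usage sketch of this pool (assuming the crate-level path `rcore_fs_cli::thread_pool::Pool`). Because `Drop` sends `Close` to every worker and joins them, all queued jobs are guaranteed to finish before the pool goes out of scope:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use rcore_fs_cli::thread_pool::Pool;

    fn main() {
        let counter = Arc::new(AtomicUsize::new(0));
        {
            let pool = Pool::new(4);
            for _ in 0..100 {
                let counter = Arc::clone(&counter);
                pool.execute(move || {
                    counter.fetch_add(1, Ordering::SeqCst);
                });
            }
        } // `pool` is dropped here: workers receive Close and are joined.
        assert_eq!(counter.load(Ordering::SeqCst), 100);
    }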
82 changes: 67 additions & 15 deletions rcore-fs-cli/src/zip.rs
@@ -11,42 +11,94 @@ use std::sync::Arc;

use rcore_fs::vfs::{FileType, INode, PATH_MAX};

use crate::thread_pool;

const BUF_SIZE: usize = 0x10000;
const S_IMASK: u32 = 0o777;

pub fn zip_dir(path: &Path, inode: Arc<dyn INode>) -> Result<(), Box<dyn Error>> {
pub fn zip_dir(path: &Path, inode: Arc<dyn INode>, thread_pool: &thread_pool::Pool, image_time: Option<i64>) -> Result<(), Box<dyn Error>> {
let mut entries: Vec<fs::DirEntry> = fs::read_dir(path)?.map(|dir| dir.unwrap()).collect();
entries.sort_by_key(|entry| entry.file_name());
let is_incremental = image_time.is_some();
let mut deleted_files: Vec<String> = Vec::new();
if is_incremental {
// First, record every entry currently in the image; entries that
// still exist on disk are removed from this list below.
deleted_files = inode.list()?;
// Drop the leading "." and ".." entries.
let _ = deleted_files.drain(0..2);
}
for entry in entries {
let name_ = entry.file_name();
let name = name_.to_str().unwrap();
let metadata = fs::symlink_metadata(entry.path())?;
let type_ = metadata.file_type();
let mode = (metadata.permissions().mode() & S_IMASK) as u16;
//println!("zip: name: {:?}, mode: {:#o}", entry.path(), mode);
if type_.is_file() {
let inode = inode.create(name, FileType::File, mode)?;
let mut file = fs::File::open(entry.path())?;
inode.resize(file.metadata()?.len() as usize)?;
let mut buf = unsafe { Box::<[u8; BUF_SIZE]>::new_uninit().assume_init() };
let mut offset = 0usize;
let mut len = BUF_SIZE;
while len == BUF_SIZE {
len = file.read(buf.as_mut())?;
inode.write_at(offset, &buf[..len])?;
offset += len;
if is_incremental {
// If a file still exists on disk, remove it from `deleted_files`.
// A linear search is fine here: `inode.list()` should return names in
// the same order as `entries`, and we stop at the first match, so the
// overhead stays small.
if let Some(index) = deleted_files.iter().position(|n| n == name) {
    deleted_files.remove(index);
}
// Skip files that have not changed (per st_ctime) since the image was created.
if !type_.is_dir() {
if let Some(last_modify) = image_time {
use std::os::linux::fs::MetadataExt;
if metadata.st_ctime() < last_modify {
continue;
}
println!("{} needs to be updated", name);
}
}
}
if type_.is_file() {
let inode = if !is_incremental {
inode.create(name, FileType::File, mode)?
} else {
// `or_else` avoids the unconditional `create` call that `or` would make.
inode.find(name).or_else(|_| inode.create(name, FileType::File, mode))?
};
// Copy the file content on a pool thread.
thread_pool.execute(move || {
let mut file = fs::File::open(entry.path()).unwrap();
inode.resize(file.metadata().unwrap().len() as usize)
    .expect(&format!("resize {} failed", entry.path().display()));
let mut buf = unsafe { Box::<[u8; BUF_SIZE]>::new_uninit().assume_init() };
let mut offset = 0usize;
let mut len = BUF_SIZE;
while len == BUF_SIZE {
len = file.read(buf.as_mut()).unwrap();
inode.write_at(offset, &buf[..len])
    .expect(&format!("write {} failed", entry.path().display()));
offset += len;
}
});
} else if type_.is_dir() {
let inode = inode.create(name, FileType::Dir, mode)?;
zip_dir(entry.path().as_path(), inode)?;
let inode = if !is_incremental {
inode.create(name, FileType::Dir, mode)?
} else {
inode.find(name).or_else(|_| inode.create(name, FileType::Dir, mode))?
};
zip_dir(entry.path().as_path(), inode, thread_pool, image_time)?;
} else if type_.is_symlink() {
let target = fs::read_link(entry.path())?;
let inode = inode.create(name, FileType::SymLink, mode)?;
let inode = if !is_incremental {
inode.create(name, FileType::SymLink, mode)?
} else {
inode.find(name).or_else(|_| inode.create(name, FileType::SymLink, mode))?
};
let data = target.as_os_str().as_bytes();
inode.resize(data.len())?;
inode.write_at(0, data)?;
}
}
// Unlink image entries that no longer exist in the source directory.
for file_name in deleted_files {
inode.unlink(&file_name)?;
println!("{} deleted", file_name);
}
Ok(())
}
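A self-contained sketch of driving an incremental update through this signature (imports repeated for clarity); `cutoff` is an assumed Unix timestamp recorded when the image was last built:

    use std::error::Error;
    use std::path::Path;
    use std::sync::Arc;

    use rcore_fs::vfs::INode;

    fn incremental_zip(dir: &Path, root: Arc<dyn INode>, cutoff: i64) -> Result<(), Box<dyn Error>> {
        let pool = crate::thread_pool::Pool::new(4);
        // Some(cutoff) enables the incremental path: files whose st_ctime
        // predates the cutoff are skipped, and image entries missing from
        // `dir` are unlinked.
        zip_dir(dir, root, &pool, Some(cutoff))?;
        // `pool` is dropped on return; Drop joins all workers, so every
        // queued file copy has completed before the caller proceeds.
        Ok(())
    }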

28 changes: 14 additions & 14 deletions rcore-fs-sefs/src/lib.rs
@@ -205,11 +205,11 @@ impl INodeImpl {
let mut disk_inode = self.disk_inode.write();
if disk_inode.dirty() {
self.fs
.meta_file
.meta_file.write()
.write_block(self.id, disk_inode.as_buf())?;
disk_inode.sync();
}
self.fs.meta_file.flush()?;
self.fs.meta_file.write().flush()?;
Ok(())
}

@@ -843,7 +843,7 @@ pub struct SEFS {
/// device
device: Box<dyn Storage>,
/// metadata file
meta_file: Box<dyn File>,
meta_file: RwLock<Box<dyn File>>,
/// Time provider
time_provider: &'static dyn TimeProvider,
/// uuid provider
@@ -868,10 +868,10 @@ impl SEFS {
time_provider: &'static dyn TimeProvider,
uuid_provider: &'static dyn UuidProvider,
) -> vfs::Result<Arc<Self>> {
let meta_file = device.open(METAFILE_NAME)?;
let meta_file = RwLock::new(device.open(METAFILE_NAME)?);

// Load super block
let super_block = meta_file.load_struct::<SuperBlock>(BLKN_SUPER)?;
let super_block = meta_file.read().load_struct::<SuperBlock>(BLKN_SUPER)?;
if !super_block.check() {
return Err(FsError::WrongFs);
}
@@ -883,7 +883,7 @@ }
}
for i in 0..super_block.groups as usize {
let block_id = Self::get_freemap_block_id_of_group(i);
meta_file.read_block(
meta_file.read().read_block(
block_id,
&mut free_map.as_mut_slice()[BLKSIZE * i..BLKSIZE * (i + 1)],
)?;
@@ -926,8 +926,8 @@ impl SEFS {
};
// Clear the existing files in storage
device.clear()?;
let meta_file = device.create(METAFILE_NAME)?;
meta_file.set_len(blocks * BLKSIZE)?;
let meta_file = RwLock::new(device.create(METAFILE_NAME)?);
meta_file.write().set_len(blocks * BLKSIZE)?;

let sefs = SEFS {
super_block: RwLock::new(Dirty::new_dirty(super_block)),
@@ -972,21 +972,21 @@ impl SEFS {
let (mut free_map, mut super_block) = self.write_lock_free_map_and_super_block();
// Sync super block
if super_block.dirty() {
self.meta_file
self.meta_file.write()
.write_all_at(super_block.as_buf(), BLKSIZE * BLKN_SUPER)?;
super_block.sync();
}
// Sync free map
if free_map.dirty() {
for i in 0..super_block.groups as usize {
let slice = &free_map.as_slice()[BLKSIZE * i..BLKSIZE * (i + 1)];
self.meta_file
self.meta_file.write()
.write_all_at(slice, BLKSIZE * Self::get_freemap_block_id_of_group(i))?;
}
free_map.sync();
}
// Flush
self.meta_file.flush()?;
self.meta_file.write().flush()?;
Ok(())
}

@@ -999,7 +999,7 @@ impl SEFS {
super_block.groups += 1;
super_block.blocks += BLKBITS as u32;
super_block.unused_blocks += BLKBITS as u32 - 1;
self.meta_file
self.meta_file.write()
.set_len(super_block.groups as usize * BLKBITS * BLKSIZE)
.expect("failed to extend meta file");
free_map.extend(core::iter::repeat(true).take(BLKBITS));
@@ -1073,7 +1073,7 @@ impl SEFS {
}
}
// Load if not in set, or is weak ref.
let disk_inode = Dirty::new(self.meta_file.load_struct::<DiskINode>(id)?);
let disk_inode = Dirty::new(self.meta_file.read().load_struct::<DiskINode>(id)?);
self._new_inode(id, disk_inode, false)
}

@@ -1143,7 +1143,7 @@ impl vfs::FileSystem for SEFS {
}

fn root_mac(&self) -> vfs::FsMac {
self.meta_file.get_file_mac().unwrap().0
self.meta_file.read().get_file_mac().unwrap().0
}

fn info(&self) -> vfs::FsInfo {
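Context for the `RwLock` wrapper: once file copies run on pool threads, multiple threads can reach the meta file through `&self` at the same time, so the `Box<dyn File>` needs thread-safe interior mutability. The `read()`/`write()` calls above return guards directly with no `Result`, matching the `spin::RwLock` already used for `super_block` in this crate. A minimal sketch of the pattern, assuming `spin::RwLock`:

    use spin::RwLock;

    struct MetaFile {
        // Stand-in for RwLock<Box<dyn File>>.
        blocks: RwLock<Vec<[u8; 512]>>,
    }

    impl MetaFile {
        // Shared access: many readers may hold the lock at once.
        fn read_block(&self, id: usize, buf: &mut [u8; 512]) {
            buf.copy_from_slice(&self.blocks.read()[id]);
        }
        // Exclusive access: serializes write_block/set_len/flush.
        fn write_block(&self, id: usize, buf: &[u8; 512]) {
            self.blocks.write()[id].copy_from_slice(buf);
        }
    }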