Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rcore-fs-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ extern crate log;
#[cfg(feature = "use_fuse")]
pub mod fuse;
pub mod zip;
mod thread_pool;
79 changes: 79 additions & 0 deletions rcore-fs-cli/src/thread_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::sync::mpsc;

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
NewJob(Job),
Shutdown,
}

pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}

impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0, "Thread pool size must be greater than zero");

let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));

let mut workers = Vec::with_capacity(size);

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}

ThreadPool { workers, sender }
}

pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static {
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}

pub fn shutdown(&mut self) {
for _ in &self.workers {
self.sender.send(Message::Shutdown).unwrap();
}

for worker in &mut self.workers {
worker.thread.take().unwrap().join().unwrap();
}

self.workers.clear();
}
}

struct Worker {
id: usize,
thread: Option<JoinHandle<()>>,
}

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();

match message {
Message::NewJob(job) => {
job();
}
Message::Shutdown => {
break;
}
}
});

Worker { id, thread: Some(thread) }
}
}

impl Drop for ThreadPool {
fn drop(&mut self) {
self.shutdown();
}
}
173 changes: 156 additions & 17 deletions rcore-fs-cli/src/zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,86 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str;
use std::sync::Arc;
use std::env;

use rcore_fs::vfs::{FileType, INode, PATH_MAX};
use rcore_fs::vfs::{FileType, DirEntryData, INode, PATH_MAX};
use rcore_fs::vfs::FsError;

use crate::thread_pool::ThreadPool;

const BUF_SIZE: usize = 0x10000;
const S_IMASK: u32 = 0o777;

struct FileTask {
inode: Arc<dyn INode>,
entry: fs::DirEntry,
}

pub fn zip_dir(path: &Path, inode: Arc<dyn INode>) -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(30);
zip_dir_task(path, inode, &pool)?;
Ok(())
}

pub fn zip_dir_task(path: &Path, inode: Arc<dyn INode>, pool: &ThreadPool) -> Result<(), Box<dyn Error>> {
let mut entries: Vec<fs::DirEntry> = fs::read_dir(path)?.map(|dir| dir.unwrap()).collect();
entries.sort_by_key(|entry| entry.file_name());

let mut dir_entries: Vec<DirEntryData> = Vec::new();
let mut file_tasks: Vec<FileTask> = Vec::new();

for entry in entries {
let name_ = entry.file_name();
let name = name_.to_str().unwrap();
let metadata = fs::symlink_metadata(entry.path())?;
let type_ = metadata.file_type();
let mode = (metadata.permissions().mode() & S_IMASK) as u16;
//println!("zip: name: {:?}, mode: {:#o}", entry.path(), mode);

if type_.is_file() {
let inode = inode.create(name, FileType::File, mode)?;
let mut file = fs::File::open(entry.path())?;
inode.resize(file.metadata()?.len() as usize)?;
let mut buf = unsafe { Box::<[u8; BUF_SIZE]>::new_uninit().assume_init() };
let mut offset = 0usize;
let mut len = BUF_SIZE;
while len == BUF_SIZE {
len = file.read(buf.as_mut())?;
inode.write_at(offset, &buf[..len])?;
offset += len;
}
} else if type_.is_dir() {
let inode = inode.create(name, FileType::Dir, mode)?;
zip_dir(entry.path().as_path(), inode)?;
let inode = inode.create_for_zip(name, FileType::File, mode)?;
dir_entries.push(DirEntryData { inode: Arc::clone(&inode), name: String::from(name), file_type: FileType::File });
file_tasks.push(FileTask { inode, entry });
} else if type_.is_symlink() {
let target = fs::read_link(entry.path())?;
let inode = inode.create(name, FileType::SymLink, mode)?;
let inode = inode.create_for_zip(name, FileType::SymLink, mode)?;
dir_entries.push(DirEntryData { inode: Arc::clone(&inode), name: String::from(name), file_type: FileType::SymLink });
let data = target.as_os_str().as_bytes();
inode.resize(data.len())?;
inode.write_at(0, data)?;
} else if type_.is_dir() {
let inode = inode.create_for_zip(name, FileType::Dir, mode)?;
dir_entries.push(DirEntryData { inode: Arc::clone(&inode), name: String::from(name), file_type: FileType::Dir });
zip_dir_task(entry.path().as_path(), inode, &pool)?;
}
}

if dir_entries.len() > 0 {
process_sync(inode, dir_entries, file_tasks, &pool);
}
Ok(())
}

fn process_sync(dir_inode: Arc<dyn INode>, dir_entries: Vec<DirEntryData>, file_tasks: Vec<FileTask>, pool: &ThreadPool) {
pool.execute(move || {
if let Err(e) = dir_inode.write_all_direntry(dir_entries) {
eprintln!("Failed to write direntry: {}", e);
}
if let Err(e) = process_files_task(&file_tasks) {
eprintln!("Failed to process files: {}", e);
}
});
}

fn process_files_task(file_tasks: &[FileTask]) -> Result<(), FsError> {
for task in file_tasks {
let mut file = fs::File::open(task.entry.path())?;
let mut buf = unsafe { Box::<[u8; BUF_SIZE]>::new_uninit().assume_init() };
let mut offset = 0usize;
let mut len = BUF_SIZE;
while len == BUF_SIZE {
len = file.read(buf.as_mut())?;
task.inode.write_at(offset, &buf[..len])?;
offset += len;
}
}
Ok(())
Expand Down Expand Up @@ -86,3 +129,99 @@ pub fn unzip_dir(path: &Path, inode: Arc<dyn INode>) -> Result<(), Box<dyn Error
}
Ok(())
}

pub fn inc_zip_dir(root_path: &Path, inc_path: &Path, root_inode: Arc<dyn INode>) -> Result<(), Box<dyn Error>> {
// Convert to absolute path
let abs_root = if root_path.is_absolute() {
root_path.to_path_buf()
} else {
env::current_dir()?.join(root_path)
}.canonicalize()?;

let abs_inc = if inc_path.is_absolute() {
inc_path.to_path_buf()
} else {
env::current_dir()?.join(inc_path)
}.canonicalize()?;

if !abs_inc.starts_with(&abs_root) {
return Err(format!("{} is not under root {}",
abs_inc.display(), abs_root.display()).into());
}

let relative_path = abs_inc.strip_prefix(&abs_root)?;
let mut components = Vec::new();

for c in relative_path.components() {
let os_str = c.as_os_str();
let s = os_str.to_str()
.ok_or_else(|| format!("Invalid UTF-8 component: {:?}", os_str))?;
components.push(s.to_string());
}

let mut current_inode = root_inode;
for (i, name) in components.iter().enumerate() {
let is_last = i == components.len() - 1;

// Process the directory in the middle of the path
if !is_last {
current_inode = match current_inode.lookup_follow(name, 1) {
Ok(inode) => {
if inode.metadata()?.type_ != FileType::Dir {
return Err(format!("'{}' exists but is not directory", name).into());
}
inode
}
Err(_) => current_inode.create(name, FileType::Dir, 0o755)?,
};
continue;
}

// Process the final path
if let Ok(_) = current_inode.lookup_follow(name, 1) {
if let Err(e) = current_inode.unlink_recursive(name) {
return Err(Box::new(e));
}
}

if !abs_inc.exists() { return Ok(()); }

let meta = fs::symlink_metadata(&abs_inc)?;
let mode = (meta.permissions().mode() & S_IMASK) as u16;

match meta.file_type() {
ft if ft.is_file() => {
let inode = current_inode.create(name, FileType::File, mode)?;
let mut file = fs::File::open(&abs_inc)?;
let size = file.metadata()?.len() as usize;
inode.resize(size)?;

let mut buf = vec![0u8; 4096];
let mut offset = 0;
while let Ok(len) = file.read(&mut buf) {
if len == 0 { break; }
inode.write_at(offset, &buf[..len])?;
offset += len;
}
}
ft if ft.is_dir() => {
let inode = current_inode.create(name, FileType::Dir, mode)?;
for entry in fs::read_dir(&abs_inc)? {
let entry = entry?;
let path = entry.path();
inc_zip_dir(&abs_root, &path, inode.clone())?;
}
}
ft if ft.is_symlink() => {
let target = fs::read_link(&abs_inc)?;
let inode = current_inode.create(name, FileType::SymLink, mode)?;
let data = target.as_os_str().as_bytes();
inode.resize(data.len())?;
inode.write_at(0, data)?;
}
_ => return Err("Unsupported file type".into()),
}
}

Ok(())
}
Loading