Skip to content

Commit

Permalink
vfs: support messagedWrite
Browse files Browse the repository at this point in the history
  • Loading branch information
NyaMisty committed Jun 17, 2021
1 parent f188d0b commit 6826c23
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 22 deletions.
17 changes: 15 additions & 2 deletions vfs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type File struct {
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close
appendMode bool // file was opened with O_APPEND
sys atomic.Value // user defined info to be attached here
messagedWrite bool
sys atomic.Value // user defined info to be attached here

muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove
}
Expand Down Expand Up @@ -505,8 +506,17 @@ func (f *File) waitForValidObject() (o fs.Object, err error) {
}

// openRead open the file for read
func (f *File) openRead() (fh *ReadFileHandle, err error) {
func (f *File) openRead() (fh Handle, err error) {
// if o is nil it isn't valid yet
if f.messagedWrite {
fh, err = newStubReadFileHandle(f)
if err != nil {
fs.Debugf(f.Path(), "File.openRead failed: %v", err)
return nil, err
}
return fh, nil
}

_, err = f.waitForValidObject()
if err != nil {
return nil, err
Expand All @@ -532,6 +542,9 @@ func (f *File) openWrite(flags int) (fh *WriteFileHandle, err error) {
}
// fs.Debugf(f.Path(), "File.openWrite")

if d.vfs.Opt.MessagedWrite {
f.messagedWrite = true
}
fh, err = newWriteFileHandle(d, f, f.Path(), flags)
if err != nil {
fs.Debugf(f.Path(), "File.openWrite failed: %v", err)
Expand Down
58 changes: 58 additions & 0 deletions vfs/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,64 @@ import (
"github.com/rclone/rclone/fs/hash"
)

type StubReadFileHandle struct {
baseHandle
file *File
size int64 // size of the object (0 for unknown length)
}

func newStubReadFileHandle(f *File) (*StubReadFileHandle, error) {
o := f.getObject()
return &StubReadFileHandle{
file: f,
size: nonNegative(o.Size()),
}, nil
}

// String converts it to printable
func (fh *StubReadFileHandle) String() string {
return "<*StubReadFileHandle>"
}

// Node returns the Node associated with this - satisfies Noder interface
func (fh *StubReadFileHandle) Node() Node {
return fh.file
}

func (fh *StubReadFileHandle) Seek(offset int64, whence int) (n int64, err error) {
return 0, nil
}

func (fh *StubReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) {
return len(p), nil
}

func (fh *StubReadFileHandle) Read(p []byte) (n int, err error) {
return len(p), nil
}

func (fh *StubReadFileHandle) Close() error {
return nil
}

func (fh *StubReadFileHandle) Flush() error {
return nil
}

func (fh *StubReadFileHandle) Release() error {
return nil
}

// Size returns the size of the underlying file
func (fh *StubReadFileHandle) Size() int64 {
return fh.size
}

// Stat returns info about the file
func (fh *StubReadFileHandle) Stat() (os.FileInfo, error) {
return fh.file, nil
}

// ReadFileHandle is an open for read file handle on a File
type ReadFileHandle struct {
baseHandle
Expand Down
2 changes: 2 additions & 0 deletions vfs/vfscommon/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Options struct {
CacheMaxAge time.Duration
CacheMaxSize fs.SizeSuffix
CachePollInterval time.Duration
MessagedWrite bool
CaseInsensitive bool
WriteWait time.Duration // time to wait for in-sequence write
ReadWait time.Duration // time to wait for in-sequence read
Expand Down Expand Up @@ -55,6 +56,7 @@ var DefaultOpt = Options{
ChunkSizeLimit: -1,
CacheMaxSize: -1,
CaseInsensitive: runtime.GOOS == "windows" || runtime.GOOS == "darwin", // default to true on Windows and Mac, false otherwise
MessagedWrite: false,
WriteWait: 1000 * time.Millisecond,
ReadWait: 20 * time.Millisecond,
WriteBack: 5 * time.Second,
Expand Down
1 change: 1 addition & 0 deletions vfs/vfsflags/vfsflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.DurationVarP(flagSet, &Opt.CachePollInterval, "vfs-cache-poll-interval", "", Opt.CachePollInterval, "Interval to poll the cache for stale objects.")
flags.DurationVarP(flagSet, &Opt.CacheMaxAge, "vfs-cache-max-age", "", Opt.CacheMaxAge, "Max age of objects in the cache.")
flags.FVarP(flagSet, &Opt.CacheMaxSize, "vfs-cache-max-size", "", "Max total size of objects in the cache.")
flags.BoolVarP(flagSet, &Opt.MessagedWrite, "vfs-messaged-write", "", Opt.MessagedWrite, "On vfs-cache-mode <= minimal, serialize write-only files' write operation into messages to support seek.")
flags.FVarP(flagSet, &Opt.ChunkSize, "vfs-read-chunk-size", "", "Read the source objects in chunks.")
flags.FVarP(flagSet, &Opt.ChunkSizeLimit, "vfs-read-chunk-size-limit", "", "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached. 'off' is unlimited.")
flags.FVarP(flagSet, DirPerms, "dir-perms", "", "Directory permissions")
Expand Down
Loading

0 comments on commit 6826c23

Please sign in to comment.