
Commit c06eab8

Write blob parts directly to blobstore

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
1 parent 41aa50b

File tree: 9 files changed, +374 −107 lines

pkg/storage/fs/ocis/blobstore/blobstore.go

+53-10
@@ -20,6 +20,8 @@ package blobstore
 
 import (
 	"bufio"
+	"context"
+	"fmt"
 	"io"
 	"os"
 	"path/filepath"
@@ -49,8 +51,7 @@ func New(root string) (*Blobstore, error) {
 
 // Upload stores some data in the blobstore under the given key
 func (bs *Blobstore) Upload(node *node.Node, source string) error {
-
-	dest := bs.path(node)
+	dest := bs.pathFromNode(node)
 	// ensure parent path exists
 	if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
 		return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
@@ -69,40 +70,82 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
 
 	f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
 	if err != nil {
-		return errors.Wrapf(err, "could not open blob '%s' for writing", bs.path(node))
+		return errors.Wrapf(err, "could not open blob '%s' for writing", dest)
 	}
 
 	w := bufio.NewWriter(f)
 	_, err = w.ReadFrom(file)
 	if err != nil {
-		return errors.Wrapf(err, "could not write blob '%s'", bs.path(node))
+		return errors.Wrapf(err, "could not write blob '%s'", dest)
+	}
+
+	return w.Flush()
+}
+
+func (bs *Blobstore) StreamingUpload(ctx context.Context, spaceid, blobid string, offset, objectSize int64, reader io.Reader, userMetadata map[string]string) error {
+	dest := bs.path(spaceid, blobid)
+	if offset > 0 {
+		// write a part
+		dest = fmt.Sprintf("%s:%d-%d", dest, offset, offset+objectSize)
+	}
+
+	// ensure parent path exists
+	if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
+		return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
+	}
+
+	f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
+	if err != nil {
+		return errors.Wrapf(err, "could not open blob '%s' for stream writing", dest)
+	}
+
+	w := bufio.NewWriter(f)
+	_, err = w.ReadFrom(reader)
+	if err != nil {
+		return errors.Wrapf(err, "could not stream write blob '%s'", dest)
 	}
 
 	return w.Flush()
 }
+func (bs *Blobstore) StreamingDownload(ctx context.Context, spaceid, blobid string, offset, objectSize int64) (io.ReadCloser, error) {
+	dest := bs.path(spaceid, blobid)
+	if offset > 0 {
+		// read a part
+		dest = fmt.Sprintf("%s:%d-%d", dest, offset, offset+objectSize)
+	}
+	file, err := os.Open(dest)
+	if err != nil {
+		return nil, errors.Wrapf(err, "could not read blob '%s'", dest)
+	}
+	return file, nil
+}
 
 // Download retrieves a blob from the blobstore for reading
 func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
-	file, err := os.Open(bs.path(node))
+	file, err := os.Open(bs.pathFromNode(node))
 	if err != nil {
-		return nil, errors.Wrapf(err, "could not read blob '%s'", bs.path(node))
+		return nil, errors.Wrapf(err, "could not read blob '%s'", bs.pathFromNode(node))
 	}
 	return file, nil
 }
 
 // Delete deletes a blob from the blobstore
 func (bs *Blobstore) Delete(node *node.Node) error {
-	if err := utils.RemoveItem(bs.path(node)); err != nil {
-		return errors.Wrapf(err, "could not delete blob '%s'", bs.path(node))
+	if err := utils.RemoveItem(bs.pathFromNode(node)); err != nil {
+		return errors.Wrapf(err, "could not delete blob '%s'", bs.pathFromNode(node))
 	}
 	return nil
 }
 
-func (bs *Blobstore) path(node *node.Node) string {
+func (bs *Blobstore) pathFromNode(node *node.Node) string {
+	return bs.path(node.SpaceID, node.BlobID)
+}
+
+func (bs *Blobstore) path(spaceid, blobid string) string {
	return filepath.Join(
 		bs.root,
 		filepath.Clean(filepath.Join(
-			"/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)),
+			"/", "spaces", lookup.Pathify(spaceid, 1, 2), "blobs", lookup.Pathify(blobid, 4, 2)),
 		),
 	)
 }
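
Both blobstores address parts the same way: a part written at a non-zero offset lands under the blob's path plus a ":<offset>-<end>" suffix, while offset 0 uses the plain blob path. A minimal sketch of that scheme (partKey and the example path are illustrative, not part of the commit):

package main

import "fmt"

// partKey is a hypothetical helper mirroring the naming above: parts are
// stored next to the blob under "<blobpath>:<offset>-<end>", while the
// first write (offset 0) uses the plain blob path.
func partKey(base string, offset, size int64) string {
	if offset == 0 {
		return base
	}
	return fmt.Sprintf("%s:%d-%d", base, offset, offset+size)
}

func main() {
	base := "spaces/36/0a/blobs/ab/cd/ef/gh/rest"
	fmt.Println(partKey(base, 0, 16777216))        // spaces/36/0a/blobs/ab/cd/ef/gh/rest
	fmt.Println(partKey(base, 16777216, 16777216)) // spaces/36/0a/blobs/ab/cd/ef/gh/rest:16777216-33554432
}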

pkg/storage/fs/s3ng/blobstore/blobstore.go

+36-9
@@ -71,24 +71,47 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
 	}
 	defer reader.Close()
 
-	_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true})
+	_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.pathFromNode(node), reader, node.Blobsize, minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true})
 
 	if err != nil {
-		return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket)
+		return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.pathFromNode(node), bs.bucket)
 	}
 	return nil
 }
 
+func (bs *Blobstore) StreamingUpload(ctx context.Context, spaceid, blobid string, offset, objectSize int64, reader io.Reader, userMetadata map[string]string) error {
+	key := bs.path(spaceid, blobid)
+	if offset > 0 {
+		// write a part
+		key = fmt.Sprintf("%s:%d-%d", key, offset, offset+objectSize)
+	}
+	_, err := bs.client.PutObject(ctx, bs.bucket, key, reader, objectSize, minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true, UserMetadata: userMetadata})
+
+	if err != nil {
+		return errors.Wrapf(err, "could not stream object '%s' into bucket '%s'", key, bs.bucket)
+	}
+	return nil
+}
+
+func (bs *Blobstore) StreamingDownload(ctx context.Context, spaceid, blobid string, offset, objectSize int64) (io.ReadCloser, error) {
+	key := bs.path(spaceid, blobid)
+	if offset > 0 {
+		// read a part
+		key = fmt.Sprintf("%s:%d-%d", key, offset, offset+objectSize)
+	}
+	return bs.client.GetObject(ctx, bs.bucket, key, minio.GetObjectOptions{})
+}
+
 // Download retrieves a blob from the blobstore for reading
 func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
-	reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.path(node), minio.GetObjectOptions{})
+	reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.pathFromNode(node), minio.GetObjectOptions{})
 	if err != nil {
-		return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.path(node), bs.bucket)
+		return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.pathFromNode(node), bs.bucket)
 	}
 
 	stat, err := reader.Stat()
 	if err != nil {
-		return nil, errors.Wrapf(err, "blob path: %s", bs.path(node))
+		return nil, errors.Wrapf(err, "blob path: %s", bs.pathFromNode(node))
 	}
 
 	if node.Blobsize != stat.Size {
@@ -100,14 +123,18 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
 
 // Delete deletes a blob from the blobstore
 func (bs *Blobstore) Delete(node *node.Node) error {
-	err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.path(node), minio.RemoveObjectOptions{})
+	err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.pathFromNode(node), minio.RemoveObjectOptions{})
 	if err != nil {
-		return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.path(node), bs.bucket)
+		return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.pathFromNode(node), bs.bucket)
 	}
 	return nil
 }
 
-func (bs *Blobstore) path(node *node.Node) string {
+func (bs *Blobstore) pathFromNode(node *node.Node) string {
+	return bs.path(node.SpaceID, node.BlobID)
+}
+
+func (bs *Blobstore) path(spaceid, blobid string) string {
 	// https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/
 	// Prefixes are used to partition a bucket. A prefix is everything except the filename.
 	// For a file `BucketName/foo/bar/lorem.ipsum`, `BucketName/foo/bar/` is the prefix.
@@ -116,5 +143,5 @@ func (bs *Blobstore) path(node *node.Node) string {
 	//
 	// Since the spaceID is always the same for a space, we don't need to pathify that, because it would
 	// not yield any performance gains
-	return filepath.Clean(filepath.Join(node.SpaceID, lookup.Pathify(node.BlobID, 4, 2)))
+	return filepath.Clean(filepath.Join(spaceid, lookup.Pathify(blobid, 4, 2)))
 }
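
On S3 the same suffix scheme is applied to the object key, and StreamingUpload additionally forwards user metadata. A usage sketch against minio-go v7 showing how one part of a blob would be put under a suffixed key; the endpoint, credentials, bucket and key below are placeholders, not values from the commit:

package main

import (
	"bytes"
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	client, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("ACCESS", "SECRET", ""),
	})
	if err != nil {
		log.Fatal(err)
	}
	part := []byte("...part payload...")
	// second part of a blob: the key carries the ":<offset>-<end>" suffix,
	// mirroring StreamingUpload above
	key := "spaceid/bl/ob/id/rest:16777216-33554432"
	_, err = client.PutObject(context.Background(), "blobs", key,
		bytes.NewReader(part), int64(len(part)),
		minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true})
	if err != nil {
		log.Fatal(err)
	}
}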

pkg/storage/utils/decomposedfs/decomposedfs.go

+36-3
@@ -48,6 +48,7 @@ import (
 	"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
+	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
@@ -100,6 +101,9 @@ type Tree interface {
 	ReadBlob(node *node.Node) (io.ReadCloser, error)
 	DeleteBlob(node *node.Node) error
 
+	StreamBlob(ctx context.Context, spaceid, blobid string, offset, objectSize int64, reader io.Reader, userMetadata map[string]string) error
+	BlobReader(ctx context.Context, spaceid, blobid string, offset, objectSize int64) (io.ReadCloser, error)
+
 	Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
 }
 
@@ -1027,11 +1031,40 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (
 	if currentEtag != expectedEtag {
 		return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag))
 	}
-	reader, err := fs.tp.ReadBlob(n)
+
+	val, err := n.XattrString(ctx, prefixes.BlobOffsetsAttr)
+	var blobOffsets []int64
 	if err != nil {
-		return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'")
+		// not set, single blob
+		blobOffsets = []int64{0}
+	} else {
+		offsetStrings := strings.Split(val, ",")
+		blobOffsets = make([]int64, 0, len(offsetStrings))
+		for _, s := range offsetStrings {
+			offset, err := strconv.ParseInt(s, 10, 64)
+			if err != nil {
+				return nil, errors.Wrap(err, "Decomposedfs: malformed part offset '"+s+"' in node '"+n.ID+"'")
+			}
+			blobOffsets = append(blobOffsets, offset)
+		}
+	}
+
+	blobparts := len(blobOffsets)
+	readers := make([]io.Reader, 0, blobparts)
+	for i, offset := range blobOffsets {
+		var partSize int64
+		if i < blobparts && blobparts > 1 {
+			partSize = upload.OptimalPartSize
+		} else {
+			partSize = n.Blobsize - offset
+		}
+		blobReader, err := fs.tp.BlobReader(ctx, n.SpaceID, n.BlobID, offset, partSize)
+		if err != nil {
+			return nil, err
+		}
+		readers = append(readers, upload.CloseOnEOFReader(blobReader))
 	}
-	return reader, nil
+	return io.NopCloser(io.MultiReader(readers...)), nil
 }
 
 // GetLock returns an existing lock on the given reference
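
Download now reads the new blob-offsets attribute, opens one BlobReader per part and stitches the parts together with io.MultiReader. Note that the hunk's `i < blobparts` is always true inside the range loop, so with more than one part even the last part requests upload.OptimalPartSize; the standalone sketch below (assemble, open and the tiny demo part size are assumptions, not commit code) uses `i < blobparts-1`, which appears to be the intent: every part but the last is full-sized, the last one is the remainder.

package main

import (
	"bytes"
	"io"
	"os"
	"strconv"
	"strings"
)

// tiny value for the demo; the real constant lives in the upload package
const OptimalPartSize = int64(16)

// assemble mirrors the Download hunk above; `open` stands in for fs.tp.BlobReader.
func assemble(offsetsAttr string, blobsize int64, open func(offset, size int64) (io.ReadCloser, error)) (io.Reader, error) {
	offsets := []int64{0} // attribute missing: single blob
	if offsetsAttr != "" {
		offsets = offsets[:0]
		for _, s := range strings.Split(offsetsAttr, ",") {
			o, err := strconv.ParseInt(s, 10, 64)
			if err != nil {
				return nil, err
			}
			offsets = append(offsets, o)
		}
	}
	blobparts := len(offsets)
	readers := make([]io.Reader, 0, blobparts)
	for i, offset := range offsets {
		partSize := blobsize - offset // last (or only) part: the remainder
		if i < blobparts-1 {
			partSize = OptimalPartSize
		}
		r, err := open(offset, partSize)
		if err != nil {
			return nil, err
		}
		readers = append(readers, r)
	}
	return io.MultiReader(readers...), nil
}

func main() {
	blob := []byte(strings.Repeat("0123456789", 4)) // 40 bytes stored as parts of 16, 16 and 8
	open := func(offset, size int64) (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(blob[offset : offset+size])), nil
	}
	r, err := assemble("0,16,32", int64(len(blob)), open)
	if err != nil {
		panic(err)
	}
	io.Copy(os.Stdout, r) // prints the reassembled 40 bytes
}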

pkg/storage/utils/decomposedfs/metadata/prefixes/prefixes.go

+3-2
@@ -36,8 +36,9 @@ const (
 	// updated when the file is renamed or moved
 	NameAttr string = OcisPrefix + "name"
 
-	BlobIDAttr   string = OcisPrefix + "blobid"
-	BlobsizeAttr string = OcisPrefix + "blobsize"
+	BlobIDAttr      string = OcisPrefix + "blobid"
+	BlobsizeAttr    string = OcisPrefix + "blobsize"
+	BlobOffsetsAttr string = OcisPrefix + "bloboffsets"
 
 	// statusPrefix is the prefix for the node status
 	StatusPrefix string = OcisPrefix + "nodestatus"
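
The writer side of BlobOffsetsAttr is not visible in these hunks, but the Download parser splits the value on ",", so a blob stored in three parts would carry e.g. user.ocis.bloboffsets = "0,16777216,33554432" (assuming OcisPrefix is "user.ocis."). A tiny sketch of that serialization:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// offsets of three parts; the serialized form matches what
	// Download parses with strings.Split(val, ",")
	offsets := []int64{0, 16777216, 33554432}
	parts := make([]string, len(offsets))
	for i, o := range offsets {
		parts[i] = strconv.FormatInt(o, 10)
	}
	fmt.Println("user.ocis.bloboffsets =", strings.Join(parts, ","))
}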

pkg/storage/utils/decomposedfs/revisions.go

+3-1
@@ -247,6 +247,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
 			attributeName == prefixes.TypeAttr ||
 			attributeName == prefixes.BlobIDAttr ||
 			attributeName == prefixes.BlobsizeAttr ||
+			attributeName == prefixes.BlobOffsetsAttr ||
 			attributeName == prefixes.MTimeAttr // FIXME somewhere I mix up the revision time and the mtime, causing the restore to overwrite the other existing revision
 	}, f, true)
 	if err != nil {
@@ -270,7 +271,8 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
 		return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
 			attributeName == prefixes.TypeAttr ||
 			attributeName == prefixes.BlobIDAttr ||
-			attributeName == prefixes.BlobsizeAttr
+			attributeName == prefixes.BlobsizeAttr ||
+			attributeName == prefixes.BlobOffsetsAttr
 	}, false)
 	if err != nil {
 		return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error())

pkg/storage/utils/decomposedfs/tree/tree.go

+11
@@ -62,6 +62,9 @@ type Blobstore interface {
 	Upload(node *node.Node, source string) error
 	Download(node *node.Node) (io.ReadCloser, error)
 	Delete(node *node.Node) error
+
+	StreamingUpload(ctx context.Context, spaceid, blobid string, offset, objectSize int64, reader io.Reader, userMetadata map[string]string) error
+	StreamingDownload(ctx context.Context, spaceid, blobid string, offset, objectSize int64) (io.ReadCloser, error)
 }
 
 // Tree manages a hierarchical tree
@@ -712,6 +715,14 @@ func (t *Tree) WriteBlob(node *node.Node, source string) error {
 	return t.blobstore.Upload(node, source)
 }
 
+// StreamBlob streams a blob to the blobstore
+func (t *Tree) StreamBlob(ctx context.Context, spaceid, blobid string, offset, objectSize int64, reader io.Reader, userMetadata map[string]string) error {
+	return t.blobstore.StreamingUpload(ctx, spaceid, blobid, offset, objectSize, reader, userMetadata)
+}
+func (t *Tree) BlobReader(ctx context.Context, spaceid, blobid string, offset, objectSize int64) (io.ReadCloser, error) {
+	return t.blobstore.StreamingDownload(ctx, spaceid, blobid, offset, objectSize)
+}
+
 // ReadBlob reads a blob from the blobstore
 func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) {
 	if node.BlobID == "" {
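
Tree only delegates, so any backend implementing the two new interface methods plugs in. A minimal in-memory sketch (not from the commit) that satisfies the streaming half of the Blobstore interface, reusing the same part-key scheme; useful as a test double:

package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
)

type memBlobstore struct {
	mu    sync.Mutex
	blobs map[string][]byte
}

// key mirrors the part-addressing of the real blobstores
func (m *memBlobstore) key(spaceid, blobid string, offset, size int64) string {
	k := spaceid + "/" + blobid
	if offset > 0 {
		k = fmt.Sprintf("%s:%d-%d", k, offset, offset+size)
	}
	return k
}

func (m *memBlobstore) StreamingUpload(ctx context.Context, spaceid, blobid string, offset, objectSize int64, reader io.Reader, userMetadata map[string]string) error {
	data, err := io.ReadAll(reader)
	if err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.blobs[m.key(spaceid, blobid, offset, objectSize)] = data
	return nil
}

func (m *memBlobstore) StreamingDownload(ctx context.Context, spaceid, blobid string, offset, objectSize int64) (io.ReadCloser, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	data, ok := m.blobs[m.key(spaceid, blobid, offset, objectSize)]
	if !ok {
		return nil, errors.New("blob part not found")
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}

func main() {
	bs := &memBlobstore{blobs: map[string][]byte{}}
	_ = bs.StreamingUpload(context.Background(), "space", "blob", 0, 5, bytes.NewReader([]byte("hello")), nil)
	r, _ := bs.StreamingDownload(context.Background(), "space", "blob", 0, 5)
	io.Copy(os.Stdout, r)
}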
New file: partProducer (package upload)

+54
@@ -0,0 +1,54 @@
+package upload
+
+import (
+	"bytes"
+	"io"
+)
+
+// partProducer converts a stream of bytes from the reader into a stream of in-memory buffers
+type partProducer struct {
+	parts chan<- *bytes.Buffer
+	done  chan struct{}
+	err   error
+	r     io.Reader
+}
+
+func (spp *partProducer) produce(partSize int64) {
+	for {
+		file, err := spp.nextPart(partSize)
+		if err != nil {
+			spp.err = err
+			close(spp.parts)
+			return
+		}
+		if file == nil {
+			close(spp.parts)
+			return
+		}
+		select {
+		case spp.parts <- file:
+		case <-spp.done:
+			close(spp.parts)
+			return
+		}
+	}
+}
+
+func (spp *partProducer) nextPart(size int64) (*bytes.Buffer, error) {
+	buf := new(bytes.Buffer)
+
+	limitedReader := io.LimitReader(spp.r, size)
+	n, err := buf.ReadFrom(limitedReader)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the entire request body is read and no more data is available,
+	// buf.ReadFrom returns 0 since it is unable to read any bytes. In that
+	// case, we can close the partProducer.
+	if n == 0 {
+		return nil, nil
+	}
+
+	return buf, nil
+}
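
partProducer is unexported, so a driver has to live in the same package: it starts produce in a goroutine, ranges over the parts channel, and checks spp.err once the channel closes. A hypothetical driver sketch (drivePartProducer is not part of the commit); each buffer would typically be handed to a streaming upload together with its running offset:

package upload

import (
	"bytes"
	"io"
)

// drivePartProducer is a hypothetical helper, not part of the commit. It
// splits the reader into partSize chunks and hands every buffer to `handle`
// together with its running offset, the way a streaming uploader would.
func drivePartProducer(r io.Reader, partSize int64, handle func(offset int64, part *bytes.Buffer)) error {
	parts := make(chan *bytes.Buffer)
	spp := &partProducer{parts: parts, done: make(chan struct{}), r: r}
	go spp.produce(partSize)

	var offset int64
	for buf := range parts {
		handle(offset, buf)
		offset += int64(buf.Len())
	}
	// produce sets err before closing the channel, so this read is safe
	return spp.err
}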
