
Commit 48358e1

Merge pull request #185 from Azure/dev
Release v0.10.0
2 parents (692746c + a9397d1), commit 48358e1


44 files changed (+14673, -2271)

.gitignore (+1)

@@ -291,3 +291,4 @@ __pycache__/
 *.xsd.cs
 
 vendor/
+*.env

.travis.yml (+2, -1)

@@ -1,9 +1,10 @@
 language: go
 go:
-  - "1.12.1"
+  - "1.13"
 script:
   - export GO111MODULE=on
   - GOOS=linux go build ./azblob
   - GOOS=darwin go build ./azblob
   - GOOS=windows go build ./azblob
+  - GOOS=solaris go build ./azblob
   - go test -race -short -cover -v ./azblob

ChangeLog.md (+10)

@@ -2,6 +2,16 @@
 
 > See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
 
+## Version 0.10.0:
+- Added support for CopyBlobFromURL (sync) and upgraded the service version to 2019-02-02.
+- Provided default values for UploadStreamToBlockBlobOptions and refactored UploadStreamToBlockBlob.
+- Added support for multiple start/expiry time formats.
+- Added Solaris support.
+- Enabled recovering from an unexpectedEOF error.
+
+## Version 0.9.0:
+- Updated go.mod to fix dependency issues.
+
 ## Version 0.8.0:
 - Fixed error handling in high-level function DoBatchTransfer, and made it public for easy customization
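With the new defaults in UploadStreamToBlockBlobOptions, a zero-value options struct is now enough for a streaming upload; per the TODO comment in chunkwriting.go below, peak buffer memory is roughly BufferSize multiplied by MaxBuffers. A minimal sketch of such a call, assuming the azblob high-level API as of this release (the account, container, environment variable, and file name are placeholders):

package main

import (
	"context"
	"log"
	"net/url"
	"os"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
	// Placeholder account/container/blob names; the key comes from the environment.
	credential, err := azblob.NewSharedKeyCredential("myaccount", os.Getenv("AZURE_STORAGE_KEY"))
	if err != nil {
		log.Fatal(err)
	}
	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
	u, err := url.Parse("https://myaccount.blob.core.windows.net/mycontainer/myblob")
	if err != nil {
		log.Fatal(err)
	}
	blobURL := azblob.NewBlockBlobURL(*u, p)

	f, err := os.Open("data.bin") // any io.Reader works; the stream is split into blocks
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Zero-value options: v0.10.0's defaults() fills in BufferSize and MaxBuffers.
	_, err = azblob.UploadStreamToBlockBlob(context.Background(), f, blobURL,
		azblob.UploadStreamToBlockBlobOptions{})
	if err != nil {
		log.Fatal(err)
	}
}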

azblob/atomicmorph.go (-69)

This file was deleted.

azblob/chunkwriting.go (+238, new file)

package azblob

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"sync"

	guuid "github.com/google/uuid"
)

// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
// This allows us to provide a local implementation that fakes the server for hermetic testing
// (a sketch of such a fake appears after this listing).
type blockWriter interface {
	StageBlock(context.Context, string, io.ReadSeeker, LeaseAccessConditions, []byte) (*BlockBlobStageBlockResponse, error)
	CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions) (*BlockBlobCommitBlockListResponse, error)
}

// copyFromReader copies a source io.Reader to blob storage using concurrent uploads.
// TODO(someone): The existing model provides a buffer size and buffer limit as limiting factors. The buffer size is probably
// useless other than needing to be above some number, as the network stack is going to hack up the buffer over some size. The
// max buffers is providing a cap on how much memory we use (by multiplying it times the buffer size) and how many go routines can upload
// at a time. I think having a single max memory dial would be more efficient. We can choose an internal buffer size that works
// well, 4 MiB or 8 MiB, and autoscale to as many goroutines within the memory limit. This gives a single dial to tweak and we can
// choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model).
// We can even provide a utility to dial this number in for customer networks to optimize their copies.
func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) {
	o.defaults()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	cp := &copier{
		ctx:    ctx,
		cancel: cancel,
		reader: from,
		to:     to,
		id:     newID(),
		o:      o,
		ch:     make(chan copierChunk, 1),
		errCh:  make(chan error, 1),
		buffers: sync.Pool{
			New: func() interface{} {
				return make([]byte, o.BufferSize)
			},
		},
	}

	// Start the pool of concurrent writers.
	cp.wg.Add(o.MaxBuffers)
	for i := 0; i < o.MaxBuffers; i++ {
		go cp.writer()
	}

	// Send all our chunks until we get an error.
	var err error
	for {
		if err = cp.sendChunk(); err != nil {
			break
		}
	}
	// If the error is not EOF, then we have a problem.
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	}

	// Close out our upload.
	if err := cp.close(); err != nil {
		return nil, err
	}

	return cp.result, nil
}

// copier streams a file via chunks in parallel from a reader representing a file.
// Do not use directly; instead, use copyFromReader().
type copier struct {
	// ctx holds the context of a copier. Storing a Context in a struct is normally a faux pas, but in this case
	// the copier has the lifetime of a function call, so it's fine.
	ctx    context.Context
	cancel context.CancelFunc

	// reader is the source to be written to storage.
	reader io.Reader
	// to is the location we are writing our chunks to.
	to blockWriter

	id *id
	o  UploadStreamToBlockBlobOptions

	// num is the current chunk we are on.
	num int32
	// ch is used to pass the next chunk of data from our reader to one of the writers.
	ch chan copierChunk
	// errCh is used to hold the first error from our concurrent writers.
	errCh chan error
	// wg provides a count of how many writers we are waiting to finish.
	wg sync.WaitGroup
	// buffers provides a pool of chunks that can be reused.
	buffers sync.Pool

	// result holds the final result from blob storage after we have submitted all chunks.
	result *BlockBlobCommitBlockListResponse
}

type copierChunk struct {
	buffer []byte
	id     string
}

// getErr returns an error by priority. First, if a function set an error, it returns that error. Next, if the Context has an error,
// it returns that error. Otherwise it is nil. getErr supports only returning an error once per copier.
func (c *copier) getErr() error {
	select {
	case err := <-c.errCh:
		return err
	default:
	}
	return c.ctx.Err()
}

// sendChunk reads data from our internal reader, creates a chunk, and sends it to be written via a channel.
// sendChunk returns io.EOF when the reader returns an io.EOF or io.ErrUnexpectedEOF.
func (c *copier) sendChunk() error {
	if err := c.getErr(); err != nil {
		return err
	}

	buffer := c.buffers.Get().([]byte)
	n, err := io.ReadFull(c.reader, buffer)
	switch {
	case err == nil && n == 0:
		return nil
	case err == nil:
		c.ch <- copierChunk{
			buffer: buffer[0:n],
			id:     c.id.next(),
		}
		return nil
	case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0:
		return io.EOF
	}

	if err == io.EOF || err == io.ErrUnexpectedEOF {
		c.ch <- copierChunk{
			buffer: buffer[0:n],
			id:     c.id.next(),
		}
		return io.EOF
	}
	if err := c.getErr(); err != nil {
		return err
	}
	return err
}

// writer writes chunks sent on a channel.
func (c *copier) writer() {
	defer c.wg.Done()

	for chunk := range c.ch {
		if err := c.write(chunk); err != nil {
			if !errors.Is(err, context.Canceled) {
				select {
				case c.errCh <- err:
					c.cancel()
				default:
				}
				return
			}
		}
	}
}

// write uploads a chunk to blob storage.
func (c *copier) write(chunk copierChunk) error {
	defer c.buffers.Put(chunk.buffer)

	if err := c.ctx.Err(); err != nil {
		return err
	}

	_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), LeaseAccessConditions{}, nil)
	if err != nil {
		return fmt.Errorf("write error: %w", err)
	}
	return nil
}

// close commits our blocks to blob storage and closes our writer.
func (c *copier) close() error {
	close(c.ch)
	c.wg.Wait()

	if err := c.getErr(); err != nil {
		return err
	}

	var err error
	c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions)
	return err
}

// id allows the creation of unique IDs based on UUID4 + an int32. This autoincrements.
type id struct {
	u   [64]byte
	num uint32
	all []string
}

// newID constructs a new id.
func newID() *id {
	uu := guuid.New()
	u := [64]byte{}
	copy(u[:], uu[:])
	return &id{u: u}
}

// next returns the next ID. This is not thread-safe.
func (id *id) next() string {
	defer func() { id.num++ }()

	binary.BigEndian.PutUint32(id.u[len(guuid.UUID{}):], id.num)
	str := base64.StdEncoding.EncodeToString(id.u[:])
	id.all = append(id.all, str)

	return str
}

// issued returns all ids that have been issued. The returned value shares the internal slice, so it is not safe to modify;
// the value is only valid until the next time next() is called.
func (id *id) issued() []string {
	return id.all
}
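The blockWriter interface at the top of this file is what lets the copier be tested against a local fake instead of the live service. A minimal sketch of such a fake, assuming it lives in package azblob (as a test file would) so it can construct the package's response types; the fakeBlockWriter name and its fields are illustrative and not part of this commit:

package azblob

import (
	"context"
	"io"
	"io/ioutil"
	"sync"
)

// fakeBlockWriter is a hypothetical in-memory blockWriter for hermetic tests.
// It records each staged block and the ID order passed to CommitBlockList.
type fakeBlockWriter struct {
	mu        sync.Mutex // copyFromReader stages blocks from several goroutines
	staged    map[string][]byte
	committed []string
}

func newFakeBlockWriter() *fakeBlockWriter {
	return &fakeBlockWriter{staged: map[string][]byte{}}
}

func (f *fakeBlockWriter) StageBlock(ctx context.Context, blockID string, body io.ReadSeeker, ac LeaseAccessConditions, transactionalMD5 []byte) (*BlockBlobStageBlockResponse, error) {
	b, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	f.staged[blockID] = b
	return &BlockBlobStageBlockResponse{}, nil
}

func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, ids []string, h BlobHTTPHeaders, md Metadata, ac BlobAccessConditions) (*BlockBlobCommitBlockListResponse, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.committed = append(f.committed, ids...)
	return &BlockBlobCommitBlockListResponse{}, nil
}

A test can then reassemble f.staged in f.committed order and compare it against the source bytes. As a design note on the id type above: encoding a fixed 64-byte buffer means every generated block ID is valid base64 of the same length, which lines up with the service requirement that all block IDs within a blob be equally sized, base64-encoded strings.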
