Skip to content

Commit

Permalink
fix: do not exit go-routine upon ctx.Done() on receiver side (#1679)
Browse files Browse the repository at this point in the history
If there are concurrent callers sharing a context, it may
happen that while the "reqCh" gets a "send" attempt, context
might be closed and returning.
  • Loading branch information
harshavardhana authored Jul 24, 2022
1 parent a2b5454 commit 42c18ba
Showing 1 changed file with 138 additions and 149 deletions.
287 changes: 138 additions & 149 deletions api-get-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// Detect if snowball is server location we are talking to.
var snowball bool
if location, ok := c.bucketLocCache.Get(bucketName); ok {
if location == "snowball" {
snowball = true
}
snowball = location == "snowball"
}

var (
Expand All @@ -66,163 +64,45 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
go func() {
defer close(reqCh)
defer close(resCh)
defer func() {
// Close the http response body before returning.
// This ends the connection with the server.
if httpReader != nil {
httpReader.Close()
}
}()
defer cancel()

// Used to verify if etag of object has changed since last read.
var etag string

// Loop through the incoming control messages and read data.
for {
select {
// When context is closed exit our routine.
case <-gctx.Done():
// Close the http response body before returning.
// This ends the connection with the server.
if httpReader != nil {
httpReader.Close()
}
return

// Gather incoming request.
case req, ok := <-reqCh:
if !ok {
return
}
// If this is the first request we may not need to do a getObject request yet.
if req.isFirstReq {
// First request is a Read/ReadAt.
if req.isReadOp {
// Differentiate between wanting the whole object and just a range.
if req.isReadAt {
// If this is a ReadAt request only get the specified range.
// Range is set with respect to the offset and length of the buffer requested.
// Do not set objectInfo from the first readAt request because it will not get
// the whole object.
opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
} else if req.Offset > 0 {
opts.SetRange(req.Offset, 0)
}
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{Error: err}
return
}
etag = objectInfo.ETag
// Read at least firstReq.Buffer bytes, if not we have
// reached our EOF.
size, err := readFull(httpReader, req.Buffer)
totalRead += size
if size > 0 && err == io.ErrUnexpectedEOF {
if int64(size) < objectInfo.Size {
// In situations when returned size
// is less than the expected content
// length set by the server, make sure
// we return io.ErrUnexpectedEOF
err = io.ErrUnexpectedEOF
} else {
// If an EOF happens after reading some but not
// all the bytes ReadFull returns ErrUnexpectedEOF
err = io.EOF
}
} else if size == 0 && err == io.EOF && objectInfo.Size > 0 {
// Special cases when server writes more data
// than the content-length, net/http response
// body returns an error, instead of converting
// it to io.EOF - return unexpected EOF.
err = io.ErrUnexpectedEOF
}
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
Size: size,
Error: err,
didRead: true,
}
} else {
// First request is a Stat or Seek call.
// Only need to run a StatObject until an actual Read or ReadAt request comes through.

// Remove range header if already set, for stat Operations to get original file size.
delete(opts.headers, "Range")
objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the go-routine.
return
}
etag = objectInfo.ETag
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
}
for req := range reqCh {
// If this is the first request we may not need to do a getObject request yet.
if req.isFirstReq {
// First request is a Read/ReadAt.
if req.isReadOp {
// Differentiate between wanting the whole object and just a range.
if req.isReadAt {
// If this is a ReadAt request only get the specified range.
// Range is set with respect to the offset and length of the buffer requested.
// Do not set objectInfo from the first readAt request because it will not get
// the whole object.
opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
} else if req.Offset > 0 {
opts.SetRange(req.Offset, 0)
}
} else if req.settingObjectInfo { // Request is just to get objectInfo.
// Remove range header if already set, for stat Operations to get original file size.
delete(opts.headers, "Range")
// Check whether this is snowball
// if yes do not use If-Match feature
// it doesn't work.
if etag != "" && !snowball {
opts.SetMatchETag(etag)
}
objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the goroutine.
resCh <- getResponse{Error: err}
return
}
// Send back the objectInfo.
resCh <- getResponse{
objectInfo: objectInfo,
}
} else {
// Offset changes fetch the new object at an Offset.
// Because the httpReader may not be set by the first
// request if it was a stat or seek it must be checked
// if the object has been read or not to only initialize
// new ones when they haven't been already.
// All readAt requests are new requests.
if req.DidOffsetChange || !req.beenRead {
// Check whether this is snowball
// if yes do not use If-Match feature
// it doesn't work.
if etag != "" && !snowball {
opts.SetMatchETag(etag)
}
if httpReader != nil {
// Close previously opened http reader.
httpReader.Close()
}
// If this request is a readAt only get the specified range.
if req.isReadAt {
// Range is set with respect to the offset and length of the buffer requested.
opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
} else if req.Offset > 0 { // Range is set with respect to the offset.
opts.SetRange(req.Offset, 0)
} else {
// Remove range header if already set
delete(opts.headers, "Range")
}
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{
Error: err,
}
return
}
totalRead = 0
}

// Read at least req.Buffer bytes, if not we have
etag = objectInfo.ETag
// Read at least firstReq.Buffer bytes, if not we have
// reached our EOF.
size, err := readFull(httpReader, req.Buffer)
totalRead += size
if size > 0 && err == io.ErrUnexpectedEOF {
if int64(totalRead) < objectInfo.Size {
if int64(size) < objectInfo.Size {
// In situations when returned size
// is less than the expected content
// length set by the server, make sure
Expand All @@ -240,15 +120,123 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// it to io.EOF - return unexpected EOF.
err = io.ErrUnexpectedEOF
}

// Reply back how much was read.
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
Size: size,
Error: err,
didRead: true,
}
} else {
// First request is a Stat or Seek call.
// Only need to run a StatObject until an actual Read or ReadAt request comes through.

// Remove range header if already set, for stat Operations to get original file size.
delete(opts.headers, "Range")
objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the go-routine.
return
}
etag = objectInfo.ETag
// Send back the first response.
resCh <- getResponse{
objectInfo: objectInfo,
}
}
} else if req.settingObjectInfo { // Request is just to get objectInfo.
// Remove range header if already set, for stat Operations to get original file size.
delete(opts.headers, "Range")
// Check whether this is snowball
// if yes do not use If-Match feature
// it doesn't work.
if etag != "" && !snowball {
opts.SetMatchETag(etag)
}
objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
}
// Exit the goroutine.
return
}
// Send back the objectInfo.
resCh <- getResponse{
objectInfo: objectInfo,
}
} else {
// Offset changes fetch the new object at an Offset.
// Because the httpReader may not be set by the first
// request if it was a stat or seek it must be checked
// if the object has been read or not to only initialize
// new ones when they haven't been already.
// All readAt requests are new requests.
if req.DidOffsetChange || !req.beenRead {
// Check whether this is snowball
// if yes do not use If-Match feature
// it doesn't work.
if etag != "" && !snowball {
opts.SetMatchETag(etag)
}
if httpReader != nil {
// Close previously opened http reader.
httpReader.Close()
}
// If this request is a readAt only get the specified range.
if req.isReadAt {
// Range is set with respect to the offset and length of the buffer requested.
opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
} else if req.Offset > 0 { // Range is set with respect to the offset.
opts.SetRange(req.Offset, 0)
} else {
// Remove range header if already set
delete(opts.headers, "Range")
}
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{
Error: err,
}
return
}
totalRead = 0
}

// Read at least req.Buffer bytes, if not we have
// reached our EOF.
size, err := readFull(httpReader, req.Buffer)
totalRead += size
if size > 0 && err == io.ErrUnexpectedEOF {
if int64(totalRead) < objectInfo.Size {
// In situations when returned size
// is less than the expected content
// length set by the server, make sure
// we return io.ErrUnexpectedEOF
err = io.ErrUnexpectedEOF
} else {
// If an EOF happens after reading some but not
// all the bytes ReadFull returns ErrUnexpectedEOF
err = io.EOF
}
} else if size == 0 && err == io.EOF && objectInfo.Size > 0 {
// Special cases when server writes more data
// than the content-length, net/http response
// body returns an error, instead of converting
// it to io.EOF - return unexpected EOF.
err = io.ErrUnexpectedEOF
}

// Reply back how much was read.
resCh <- getResponse{
Size: size,
Error: err,
didRead: true,
objectInfo: objectInfo,
}
}
}
}()
Expand Down Expand Up @@ -615,6 +603,7 @@ func (o *Object) Close() (err error) {
if o == nil {
return errInvalidArgument("Object is nil")
}

// Locking.
o.mutex.Lock()
defer o.mutex.Unlock()
Expand Down

0 comments on commit 42c18ba

Please sign in to comment.