diff --git a/api-get-object.go b/api-get-object.go index 8706caaac..07f14581c 100644 --- a/api-get-object.go +++ b/api-get-object.go @@ -45,9 +45,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o // Detect if snowball is server location we are talking to. var snowball bool if location, ok := c.bucketLocCache.Get(bucketName); ok { - if location == "snowball" { - snowball = true - } + snowball = location == "snowball" } var ( @@ -66,163 +64,45 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o go func() { defer close(reqCh) defer close(resCh) + defer func() { + // Close the http response body before returning. + // This ends the connection with the server. + if httpReader != nil { + httpReader.Close() + } + }() defer cancel() // Used to verify if etag of object has changed since last read. var etag string - // Loop through the incoming control messages and read data. - for { - select { - // When context is closed exit our routine. - case <-gctx.Done(): - // Close the http response body before returning. - // This ends the connection with the server. - if httpReader != nil { - httpReader.Close() - } - return - - // Gather incoming request. - case req, ok := <-reqCh: - if !ok { - return - } - // If this is the first request we may not need to do a getObject request yet. - if req.isFirstReq { - // First request is a Read/ReadAt. - if req.isReadOp { - // Differentiate between wanting the whole object and just a range. - if req.isReadAt { - // If this is a ReadAt request only get the specified range. - // Range is set with respect to the offset and length of the buffer requested. - // Do not set objectInfo from the first readAt request because it will not get - // the whole object. - opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) - } else if req.Offset > 0 { - opts.SetRange(req.Offset, 0) - } - httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) - if err != nil { - resCh <- getResponse{Error: err} - return - } - etag = objectInfo.ETag - // Read at least firstReq.Buffer bytes, if not we have - // reached our EOF. - size, err := readFull(httpReader, req.Buffer) - totalRead += size - if size > 0 && err == io.ErrUnexpectedEOF { - if int64(size) < objectInfo.Size { - // In situations when returned size - // is less than the expected content - // length set by the server, make sure - // we return io.ErrUnexpectedEOF - err = io.ErrUnexpectedEOF - } else { - // If an EOF happens after reading some but not - // all the bytes ReadFull returns ErrUnexpectedEOF - err = io.EOF - } - } else if size == 0 && err == io.EOF && objectInfo.Size > 0 { - // Special cases when server writes more data - // than the content-length, net/http response - // body returns an error, instead of converting - // it to io.EOF - return unexpected EOF. - err = io.ErrUnexpectedEOF - } - // Send back the first response. - resCh <- getResponse{ - objectInfo: objectInfo, - Size: size, - Error: err, - didRead: true, - } - } else { - // First request is a Stat or Seek call. - // Only need to run a StatObject until an actual Read or ReadAt request comes through. - - // Remove range header if already set, for stat Operations to get original file size. - delete(opts.headers, "Range") - objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) - if err != nil { - resCh <- getResponse{ - Error: err, - } - // Exit the go-routine. - return - } - etag = objectInfo.ETag - // Send back the first response. - resCh <- getResponse{ - objectInfo: objectInfo, - } + for req := range reqCh { + // If this is the first request we may not need to do a getObject request yet. + if req.isFirstReq { + // First request is a Read/ReadAt. + if req.isReadOp { + // Differentiate between wanting the whole object and just a range. + if req.isReadAt { + // If this is a ReadAt request only get the specified range. + // Range is set with respect to the offset and length of the buffer requested. + // Do not set objectInfo from the first readAt request because it will not get + // the whole object. + opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) + } else if req.Offset > 0 { + opts.SetRange(req.Offset, 0) } - } else if req.settingObjectInfo { // Request is just to get objectInfo. - // Remove range header if already set, for stat Operations to get original file size. - delete(opts.headers, "Range") - // Check whether this is snowball - // if yes do not use If-Match feature - // it doesn't work. - if etag != "" && !snowball { - opts.SetMatchETag(etag) - } - objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) + httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) if err != nil { - resCh <- getResponse{ - Error: err, - } - // Exit the goroutine. + resCh <- getResponse{Error: err} return } - // Send back the objectInfo. - resCh <- getResponse{ - objectInfo: objectInfo, - } - } else { - // Offset changes fetch the new object at an Offset. - // Because the httpReader may not be set by the first - // request if it was a stat or seek it must be checked - // if the object has been read or not to only initialize - // new ones when they haven't been already. - // All readAt requests are new requests. - if req.DidOffsetChange || !req.beenRead { - // Check whether this is snowball - // if yes do not use If-Match feature - // it doesn't work. - if etag != "" && !snowball { - opts.SetMatchETag(etag) - } - if httpReader != nil { - // Close previously opened http reader. - httpReader.Close() - } - // If this request is a readAt only get the specified range. - if req.isReadAt { - // Range is set with respect to the offset and length of the buffer requested. - opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) - } else if req.Offset > 0 { // Range is set with respect to the offset. - opts.SetRange(req.Offset, 0) - } else { - // Remove range header if already set - delete(opts.headers, "Range") - } - httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) - if err != nil { - resCh <- getResponse{ - Error: err, - } - return - } - totalRead = 0 - } - - // Read at least req.Buffer bytes, if not we have + etag = objectInfo.ETag + // Read at least firstReq.Buffer bytes, if not we have // reached our EOF. size, err := readFull(httpReader, req.Buffer) totalRead += size if size > 0 && err == io.ErrUnexpectedEOF { - if int64(totalRead) < objectInfo.Size { + if int64(size) < objectInfo.Size { // In situations when returned size // is less than the expected content // length set by the server, make sure @@ -240,15 +120,123 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o // it to io.EOF - return unexpected EOF. err = io.ErrUnexpectedEOF } - - // Reply back how much was read. + // Send back the first response. resCh <- getResponse{ + objectInfo: objectInfo, Size: size, Error: err, didRead: true, + } + } else { + // First request is a Stat or Seek call. + // Only need to run a StatObject until an actual Read or ReadAt request comes through. + + // Remove range header if already set, for stat Operations to get original file size. + delete(opts.headers, "Range") + objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) + if err != nil { + resCh <- getResponse{ + Error: err, + } + // Exit the go-routine. + return + } + etag = objectInfo.ETag + // Send back the first response. + resCh <- getResponse{ objectInfo: objectInfo, } } + } else if req.settingObjectInfo { // Request is just to get objectInfo. + // Remove range header if already set, for stat Operations to get original file size. + delete(opts.headers, "Range") + // Check whether this is snowball + // if yes do not use If-Match feature + // it doesn't work. + if etag != "" && !snowball { + opts.SetMatchETag(etag) + } + objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) + if err != nil { + resCh <- getResponse{ + Error: err, + } + // Exit the goroutine. + return + } + // Send back the objectInfo. + resCh <- getResponse{ + objectInfo: objectInfo, + } + } else { + // Offset changes fetch the new object at an Offset. + // Because the httpReader may not be set by the first + // request if it was a stat or seek it must be checked + // if the object has been read or not to only initialize + // new ones when they haven't been already. + // All readAt requests are new requests. + if req.DidOffsetChange || !req.beenRead { + // Check whether this is snowball + // if yes do not use If-Match feature + // it doesn't work. + if etag != "" && !snowball { + opts.SetMatchETag(etag) + } + if httpReader != nil { + // Close previously opened http reader. + httpReader.Close() + } + // If this request is a readAt only get the specified range. + if req.isReadAt { + // Range is set with respect to the offset and length of the buffer requested. + opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) + } else if req.Offset > 0 { // Range is set with respect to the offset. + opts.SetRange(req.Offset, 0) + } else { + // Remove range header if already set + delete(opts.headers, "Range") + } + httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) + if err != nil { + resCh <- getResponse{ + Error: err, + } + return + } + totalRead = 0 + } + + // Read at least req.Buffer bytes, if not we have + // reached our EOF. + size, err := readFull(httpReader, req.Buffer) + totalRead += size + if size > 0 && err == io.ErrUnexpectedEOF { + if int64(totalRead) < objectInfo.Size { + // In situations when returned size + // is less than the expected content + // length set by the server, make sure + // we return io.ErrUnexpectedEOF + err = io.ErrUnexpectedEOF + } else { + // If an EOF happens after reading some but not + // all the bytes ReadFull returns ErrUnexpectedEOF + err = io.EOF + } + } else if size == 0 && err == io.EOF && objectInfo.Size > 0 { + // Special cases when server writes more data + // than the content-length, net/http response + // body returns an error, instead of converting + // it to io.EOF - return unexpected EOF. + err = io.ErrUnexpectedEOF + } + + // Reply back how much was read. + resCh <- getResponse{ + Size: size, + Error: err, + didRead: true, + objectInfo: objectInfo, + } } } }() @@ -615,6 +603,7 @@ func (o *Object) Close() (err error) { if o == nil { return errInvalidArgument("Object is nil") } + // Locking. o.mutex.Lock() defer o.mutex.Unlock()