Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/containers/image/image"
"github.com/containers/image/manifest"
"github.com/containers/image/pkg/compression"
"github.com/containers/image/signature"
"github.com/containers/image/transports"
"github.com/containers/image/types"
Expand Down Expand Up @@ -370,7 +371,7 @@ func (ic *imageCopier) copyLayer(srcInfo types.BlobInfo) (types.BlobInfo, digest
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(decompressorFunc) io.Writer // = nil
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

err := errors.New("Internal error: unexpected panic in copyLayer") // For pipeWriter.CloseWithError below
Expand All @@ -381,7 +382,7 @@ func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.Bl
pipeWriter.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
}()

getDiffIDRecorder = func(decompressor decompressorFunc) io.Writer {
getDiffIDRecorder = func(decompressor compression.DecompressorFunc) io.Writer {
// If this fails, e.g. because we have exited and due to pipeWriter.CloseWithError() above further
// reading from the pipe has failed, we don’t really care.
// We only read from diffIDChan if the rest of the flow has succeeded, and when we do read from it,
Expand All @@ -399,7 +400,7 @@ func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.Bl
}

// diffIDComputationGoroutine reads all input from layerStream, uncompresses using decompressor if necessary, and sends its digest, and status, if any, to dest.
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor decompressorFunc) {
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor compression.DecompressorFunc) {
result := diffIDResult{
digest: "",
err: errors.New("Internal error: unexpected panic in diffIDComputationGoroutine"),
Expand All @@ -411,7 +412,7 @@ func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadClo
}

// computeDiffID reads all input from layerStream, uncompresses it using decompressor if necessary, and returns its digest.
func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Digest, error) {
func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) (digest.Digest, error) {
if decompressor != nil {
s, err := decompressor(stream)
if err != nil {
Expand All @@ -428,7 +429,7 @@ func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Dige
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor decompressorFunc) io.Writer,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canCompress bool) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
// === Input: srcStream
Expand All @@ -446,8 +447,8 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo
var destStream io.Reader = digestingReader

// === Detect compression of the input stream.
// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by detectCompression.
decompressor, destStream, err := detectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform
// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression.
decompressor, destStream, err := compression.DetectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform
if err != nil {
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
Expand Down
9 changes: 5 additions & 4 deletions copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"

"github.com/containers/image/pkg/compression"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestDigestingReaderRead(t *testing.T) {
}
}

func goDiffIDComputationGoroutineWithTimeout(layerStream io.ReadCloser, decompressor decompressorFunc) *diffIDResult {
func goDiffIDComputationGoroutineWithTimeout(layerStream io.ReadCloser, decompressor compression.DecompressorFunc) *diffIDResult {
ch := make(chan diffIDResult)
go diffIDComputationGoroutine(ch, layerStream, nil)
timeout := time.After(time.Second)
Expand Down Expand Up @@ -94,12 +95,12 @@ func TestDiffIDComputationGoroutine(t *testing.T) {
func TestComputeDiffID(t *testing.T) {
for _, c := range []struct {
filename string
decompressor decompressorFunc
decompressor compression.DecompressorFunc
result digest.Digest
}{
{"fixtures/Hello.uncompressed", nil, "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969"},
{"fixtures/Hello.gz", nil, "sha256:0bd4409dcd76476a263b8f3221b4ce04eb4686dec40bfdcc2e86a7403de13609"},
{"fixtures/Hello.gz", gzipDecompressor, "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969"},
{"fixtures/Hello.gz", compression.GzipDecompressor, "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969"},
} {
stream, err := os.Open(c.filename)
require.NoError(t, err, c.filename)
Expand All @@ -111,7 +112,7 @@ func TestComputeDiffID(t *testing.T) {
}

// Error initializing decompression
_, err := computeDiffID(bytes.NewReader([]byte{}), gzipDecompressor)
_, err := computeDiffID(bytes.NewReader([]byte{}), compression.GzipDecompressor)
assert.Error(t, err)

// Error reading input
Expand Down
Binary file removed copy/fixtures/Hello.bz2
Binary file not shown.
1 change: 1 addition & 0 deletions copy/fixtures/Hello.bz2
Binary file removed copy/fixtures/Hello.gz
Binary file not shown.
1 change: 1 addition & 0 deletions copy/fixtures/Hello.gz
1 change: 0 additions & 1 deletion copy/fixtures/Hello.uncompressed

This file was deleted.

1 change: 1 addition & 0 deletions copy/fixtures/Hello.uncompressed
Binary file removed copy/fixtures/Hello.xz
Binary file not shown.
1 change: 1 addition & 0 deletions copy/fixtures/Hello.xz
45 changes: 44 additions & 1 deletion docker/daemon/daemon_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"

"github.com/containers/image/manifest"
"github.com/containers/image/pkg/compression"
"github.com/containers/image/types"
"github.com/docker/docker/client"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -334,6 +335,18 @@ func (s *daemonImageSource) GetTargetManifest(digest digest.Digest) ([]byte, str
return nil, "", errors.Errorf(`Manifest lists are not supported by "docker-daemon:"`)
}

type readCloseWrapper struct {
io.Reader
closeFunc func() error
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There already is a tarReadCloser in this file. Modifying that to use this readCloseWrapper would be nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two are wrapping different internal objects. I can't really see a nice way of combining the two.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS the *tar.Reader in tarReadCloser can just as well be an io.Reader; nothing depends on that being a *.tar.Reader.


func (r readCloseWrapper) Close() error {
if r.closeFunc != nil {
return r.closeFunc()
}
return nil
}

// GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown).
func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) {
if err := s.ensureCachedDataIsPresent(); err != nil {
Expand All @@ -349,7 +362,37 @@ func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64,
if err != nil {
return nil, 0, err
}
return stream, li.size, nil

// In order to handle the fact that digests != diffIDs (and thus that a
// caller which is trying to verify the blob will run into problems),
// we need to decompress blobs. This is a bit ugly, but it's a
// consequence of making everything addressable by their DiffID rather
// than by their digest...
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“digests != diffIDs” is true in general; it would still be worth explaining explicitly that we (choose to) generate a v2s2 manifest with the available DiffID values, so GetBlob needs to return a content matching those DiffID values even if the blob has been compressed.

//
// In particular, because the v2s2 manifest being generated uses
// DiffIDs, any caller of GetBlob is going to be asking for DiffIDs of
// layers not their _actual_ digest. The result is that copy/... will
// be verifing a "digest" which is not the actual layer's digest (but
// is instead the DiffID).

decompressFunc, reader, err := compression.DetectCompression(stream)
if err != nil {
return nil, 0, errors.Wrapf(err, "Detecting compression in blob %s", info.Digest)
}

if decompressFunc != nil {
reader, err = decompressFunc(reader)
if err != nil {
return nil, 0, errors.Wrapf(err, "Decompressing blob %s stream", info.Digest)
}
}

newStream := readCloseWrapper{
Reader: reader,
closeFunc: stream.Close,
}

return newStream, li.size, nil
}

return nil, 0, errors.Errorf("Unknown blob %s", info.Digest)
Expand Down
33 changes: 19 additions & 14 deletions copy/compression.go → pkg/compression/compression.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package copy
package compression

import (
"bytes"
Expand All @@ -11,32 +11,37 @@ import (
"github.com/Sirupsen/logrus"
)

// decompressorFunc, given a compressed stream, returns the decompressed stream.
type decompressorFunc func(io.Reader) (io.Reader, error)
// DecompressorFunc returns the decompressed stream, given a compressed stream.
type DecompressorFunc func(io.Reader) (io.Reader, error)

func gzipDecompressor(r io.Reader) (io.Reader, error) {
// GzipDecompressor is a DecompressorFunc for the gzip compression algorithm.
func GzipDecompressor(r io.Reader) (io.Reader, error) {
return gzip.NewReader(r)
}
func bzip2Decompressor(r io.Reader) (io.Reader, error) {

// Bzip2Decompressor is a DecompressorFunc for the bzip2 compression algorithm.
func Bzip2Decompressor(r io.Reader) (io.Reader, error) {
return bzip2.NewReader(r), nil
}
func xzDecompressor(r io.Reader) (io.Reader, error) {

// XzDecompressor is a DecompressorFunc for the xz compression algorithm.
func XzDecompressor(r io.Reader) (io.Reader, error) {
return nil, errors.New("Decompressing xz streams is not supported")
}

// compressionAlgos is an internal implementation detail of detectCompression
// compressionAlgos is an internal implementation detail of DetectCompression
var compressionAlgos = map[string]struct {
prefix []byte
decompressor decompressorFunc
decompressor DecompressorFunc
}{
"gzip": {[]byte{0x1F, 0x8B, 0x08}, gzipDecompressor}, // gzip (RFC 1952)
"bzip2": {[]byte{0x42, 0x5A, 0x68}, bzip2Decompressor}, // bzip2 (decompress.c:BZ2_decompress)
"xz": {[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, xzDecompressor}, // xz (/usr/share/doc/xz/xz-file-format.txt)
"gzip": {[]byte{0x1F, 0x8B, 0x08}, GzipDecompressor}, // gzip (RFC 1952)
"bzip2": {[]byte{0x42, 0x5A, 0x68}, Bzip2Decompressor}, // bzip2 (decompress.c:BZ2_decompress)
"xz": {[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor}, // xz (/usr/share/doc/xz/xz-file-format.txt)
}

// detectCompression returns a decompressorFunc if the input is recognized as a compressed format, nil otherwise.
// DetectCompression returns a DecompressorFunc if the input is recognized as a compressed format, nil otherwise.
// Because it consumes the start of input, other consumers must use the returned io.Reader instead to also read from the beginning.
func detectCompression(input io.Reader) (decompressorFunc, io.Reader, error) {
func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {
buffer := [8]byte{}

n, err := io.ReadAtLeast(input, buffer[:], len(buffer))
Expand All @@ -46,7 +51,7 @@ func detectCompression(input io.Reader) (decompressorFunc, io.Reader, error) {
return nil, nil, err
}

var decompressor decompressorFunc
var decompressor DecompressorFunc
for name, algo := range compressionAlgos {
if bytes.HasPrefix(buffer[:n], algo.prefix) {
logrus.Debugf("Detected compression format %s", name)
Expand Down
12 changes: 6 additions & 6 deletions copy/compression_test.go → pkg/compression/compression_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package copy
package compression

import (
"bytes"
Expand Down Expand Up @@ -33,7 +33,7 @@ func TestDetectCompression(t *testing.T) {
require.NoError(t, err, c.filename)
defer stream.Close()

_, updatedStream, err := detectCompression(stream)
_, updatedStream, err := DetectCompression(stream)
require.NoError(t, err, c.filename)

updatedContents, err := ioutil.ReadAll(updatedStream)
Expand All @@ -47,7 +47,7 @@ func TestDetectCompression(t *testing.T) {
require.NoError(t, err, c.filename)
defer stream.Close()

decompressor, updatedStream, err := detectCompression(stream)
decompressor, updatedStream, err := DetectCompression(stream)
require.NoError(t, err, c.filename)

var uncompressedStream io.Reader
Expand All @@ -70,7 +70,7 @@ func TestDetectCompression(t *testing.T) {
}

// Empty input is handled reasonably.
decompressor, updatedStream, err := detectCompression(bytes.NewReader([]byte{}))
decompressor, updatedStream, err := DetectCompression(bytes.NewReader([]byte{}))
require.NoError(t, err)
assert.Nil(t, decompressor)
updatedContents, err := ioutil.ReadAll(updatedStream)
Expand All @@ -80,7 +80,7 @@ func TestDetectCompression(t *testing.T) {
// Error reading input
reader, writer := io.Pipe()
defer reader.Close()
writer.CloseWithError(errors.New("Expected error reading input in detectCompression"))
_, _, err = detectCompression(reader)
writer.CloseWithError(errors.New("Expected error reading input in DetectCompression"))
_, _, err = DetectCompression(reader)
assert.Error(t, err)
}
Binary file added pkg/compression/fixtures/Hello.bz2
Binary file not shown.
Binary file added pkg/compression/fixtures/Hello.gz
Binary file not shown.
1 change: 1 addition & 0 deletions pkg/compression/fixtures/Hello.uncompressed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello
Binary file added pkg/compression/fixtures/Hello.xz
Binary file not shown.