Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: EmbedMessage WrapV1 option & Reader#ReadEmbeddedMessage #322

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions v2/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ func TestMaxHeaderLength(t *testing.T) {
require.EqualError(t, err, "invalid header data, length of read beyond allowable maximum")
}

func TestCanReadMessagingCar(t *testing.T) {
car, err := carv2.NewBlockReader(requireReaderFromPath(t, "testdata/messaging.car"))
require.NoError(t, err)
readBlock, err := car.Next()
require.NoError(t, err)
require.Equal(t, "random meaningless bytes", string(readBlock.RawData()))
}

func requireReaderFromPath(t *testing.T, path string) io.Reader {
f, err := os.Open(path)
require.NoError(t, err)
Expand Down
22 changes: 21 additions & 1 deletion v2/car.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ type (
}
)

// fullyIndexedCharPos is the position of Characteristics.Hi bit that specifies whether the index is a catalog af all CIDs or not.
// fullyIndexedCharPos is the position of Characteristics.Hi bit that specifies whether the index is
// a catalog af all CIDs or not.
const fullyIndexedCharPos = 7 // left-most bit
// messageAfterHeaderCharPos is the position of Characteristics.Hi bit that specifies whether there
// is a length-prefixed dag-cbor message object directly after the header.
const messageAfterHeaderCharPos = 6 // second-to-left-most bit

// WriteTo writes this characteristics to the given w.
func (c Characteristics) WriteTo(w io.Writer) (n int64, err error) {
Expand Down Expand Up @@ -83,6 +87,22 @@ func (c *Characteristics) SetFullyIndexed(b bool) {
}
}

// IsMessageAfterHeader specifies whether there is a length-prefixed dag-cbor message embedded
// directly after the CARv2 header that can optionally be decoded.
func (c *Characteristics) IsMessageAfterHeader() bool {
return isBitSet(c.Hi, messageAfterHeaderCharPos)
}

// SetMessageAfterHeader sets whether there is a length-prefixed dag-cbor message embedded directly
// after the CARv2 header.
func (c *Characteristics) SetMessageAfterHeader(b bool) {
if b {
c.Hi = setBit(c.Hi, messageAfterHeaderCharPos)
} else {
c.Hi = unsetBit(c.Hi, messageAfterHeaderCharPos)
}
}

func setBit(n uint64, pos uint) uint64 {
n |= 1 << pos
return n
Expand Down
104 changes: 65 additions & 39 deletions v2/car_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,43 +205,69 @@ func TestNewHeaderHasExpectedValues(t *testing.T) {
assert.Equal(t, want, got, "NewHeader got = %v, want = %v", got, want)
}

func TestCharacteristics_StoreIdentityCIDs(t *testing.T) {
subject := carv2.Characteristics{}
require.False(t, subject.IsFullyIndexed())

subject.SetFullyIndexed(true)
require.True(t, subject.IsFullyIndexed())

var buf bytes.Buffer
written, err := subject.WriteTo(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), written)
require.Equal(t, []byte{
0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}, buf.Bytes())

var decodedSubject carv2.Characteristics
read, err := decodedSubject.ReadFrom(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), read)
require.True(t, decodedSubject.IsFullyIndexed())

buf.Reset()
subject.SetFullyIndexed(false)
require.False(t, subject.IsFullyIndexed())

written, err = subject.WriteTo(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), written)
require.Equal(t, []byte{
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}, buf.Bytes())

var decodedSubjectAgain carv2.Characteristics
read, err = decodedSubjectAgain.ReadFrom(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), read)
require.False(t, decodedSubjectAgain.IsFullyIndexed())
func TestCharacteristics(t *testing.T) {
tests := []struct {
name string
isset func(carv2.Characteristics) bool
set func(*carv2.Characteristics, bool)
expectBytes []byte
}{
{
"FullyIndexed",
func(c carv2.Characteristics) bool { return c.IsFullyIndexed() },
func(c *carv2.Characteristics, s bool) { c.SetFullyIndexed(s) },
[]byte{
0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
},
},
{
"MessageAfterHeader",
func(c carv2.Characteristics) bool { return c.IsMessageAfterHeader() },
func(c *carv2.Characteristics, s bool) { c.SetMessageAfterHeader(s) },
[]byte{
0x40, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
subject := carv2.Characteristics{}
require.False(t, tt.isset(subject))
tt.set(&subject, true)
require.True(t, tt.isset(subject))

var buf bytes.Buffer
written, err := subject.WriteTo(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), written)
require.Equal(t, tt.expectBytes, buf.Bytes())

var decodedSubject carv2.Characteristics
read, err := decodedSubject.ReadFrom(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), read)
require.True(t, tt.isset(decodedSubject))

buf.Reset()
tt.set(&subject, false)
require.False(t, tt.isset(subject))

written, err = subject.WriteTo(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), written)
require.Equal(t, []byte{
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
}, buf.Bytes())

var decodedSubjectAgain carv2.Characteristics
read, err = decodedSubjectAgain.ReadFrom(&buf)
require.NoError(t, err)
require.Equal(t, int64(16), read)
require.False(t, tt.isset(decodedSubjectAgain))
})
}
}
12 changes: 12 additions & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math"

"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/multiformats/go-multicodec"

Expand Down Expand Up @@ -63,6 +64,8 @@ type Options struct {

MaxAllowedHeaderSize uint64
MaxAllowedSectionSize uint64

EmbeddedMessage datamodel.Node
}

// ApplyOptions applies given opts and returns the resulting Options.
Expand Down Expand Up @@ -172,3 +175,12 @@ func MaxAllowedSectionSize(max uint64) Option {
o.MaxAllowedSectionSize = max
}
}

// EmbedMessage writes a length-prefixed dag-cbor message after the CARv2 header
// and sets the 'MessageAfterHeader' characteristic bit is set for the resulting
// CAR
func EmbedMessage(msg datamodel.Node) Option {
return func(o *Options) {
o.EmbeddedMessage = msg
}
}
23 changes: 23 additions & 0 deletions v2/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
internalio "github.com/ipld/go-car/v2/internal/io"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
Expand Down Expand Up @@ -349,6 +352,26 @@ func (r *Reader) Inspect(validateBlockHash bool) (Stats, error) {
return stats, nil
}

// ReadEmbeddedMessage reads a length-prefixed dag-cbor message embedded after the CARv2 header
// if the 'MessageAfterHeader' characteristic bit is set for this CAR and the message exists.
func (r *Reader) ReadEmbeddedMessage() (datamodel.Node, error) {
if !r.Header.Characteristics.IsMessageAfterHeader() {
return nil, errors.New("'MessageAfterHeader' bit is not set in the characteristics for this CAR")
}

msgStart := int64(PragmaSize + HeaderSize)
gapLen := int64(r.Header.DataOffset) - msgStart
if gapLen <= 0 {
return nil, errors.New("invalid MessageAfterHeader, no space after header")
}
msgReader := io.NewSectionReader(r.r, msgStart, gapLen)
byts, err := util.LdRead(msgReader, false, r.opts.MaxAllowedSectionSize)
if err != nil {
return nil, err
}
return ipld.Decode(byts, dagcbor.Decode)
}

// Close closes the underlying reader if it was opened by OpenReader.
func (r *Reader) Close() error {
if r.closer != nil {
Expand Down
56 changes: 56 additions & 0 deletions v2/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -424,6 +426,31 @@ func TestInspect(t *testing.T) {
MinCidLength: 25,
},
},
// CARv2 with embedded message
{
name: "CarWithMessage",
path: "testdata/messaging.car",
zerLenAsEOF: true,
expectedStats: carv2.Stats{
Version: 2,
Header: carv2.Header{
Characteristics: carv2.Characteristics{Hi: 64},
DataOffset: 158,
DataSize: 120,
},
Roots: []cid.Cid{mustCidDecode("bafkreihwkf6mtnjobdqrkiksr7qhp6tiiqywux64aylunbvmfhzeql2coa")},
RootsPresent: true,
AvgBlockLength: 24,
MinBlockLength: 24,
MaxBlockLength: 24,
AvgCidLength: 36,
MinCidLength: 36,
MaxCidLength: 36,
BlockCount: 1,
CodecCounts: map[multicodec.Code]uint64{multicodec.Raw: 1},
MhTypeCounts: map[multicodec.Code]uint64{multicodec.Sha2_256: 1},
},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -560,3 +587,32 @@ func mustCidDecode(s string) cid.Cid {
}
return c
}

func TestEmbeddedMessage(t *testing.T) {
t.Run("has", func(t *testing.T) {
r, err := carv2.OpenReader("testdata/messaging.car")
require.NoError(t, err)
msg, err := r.ReadEmbeddedMessage()
require.NoError(t, err)
enc, err := ipld.Encode(msg, dagjson.Encode)
require.NoError(t, err)
require.Equal(t,
`{"expectedRoot":{"/":"bafkreihwkf6mtnjobdqrkiksr7qhp6tiiqywux64aylunbvmfhzeql2coa"},"sneaky":"sending a message outside of CARv1 payload"}`,
string(enc),
)
})
t.Run("hasnot-v1", func(t *testing.T) {
r, err := carv2.OpenReader("testdata/sample-v1.car")
require.NoError(t, err)
_, err = r.ReadEmbeddedMessage()
require.NotNil(t, err)
require.Equal(t, err.Error(), "'MessageAfterHeader' bit is not set in the characteristics for this CAR")
})
t.Run("hasnot-v2", func(t *testing.T) {
r, err := carv2.OpenReader("testdata/sample-wrapped-v2.car")
require.NoError(t, err)
_, err = r.ReadEmbeddedMessage()
require.NotNil(t, err)
require.Equal(t, err.Error(), "'MessageAfterHeader' bit is not set in the characteristics for this CAR")
})
}
Binary file added v2/testdata/messaging.car
Binary file not shown.
63 changes: 49 additions & 14 deletions v2/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
internalio "github.com/ipld/go-car/v2/internal/io"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/multiformats/go-varint"
)

// ErrAlreadyV1 signals that the given payload is already in CARv1 format.
Expand Down Expand Up @@ -46,21 +49,27 @@ func WrapV1File(srcPath, dstPath string) error {
return nil
}

// WrapV1 takes a CARv1 file and wraps it as a CARv2 file with an index.
// WrapV1 takes a CARv1 file and wraps it as a CARv2 file with an index (unless
// the WithoutIndex option is supplied).
// The resulting CARv2 file's inner CARv1 payload is left unmodified,
// and does not use any padding before the innner CARv1 or index.
// and does not use any padding before the inner CARv1 or index.
// The EmbedMessage option may be used to insert an additional message after the
// CARv2 header.
func WrapV1(src io.ReadSeeker, dst io.Writer, opts ...Option) error {
// TODO: verify src is indeed a CARv1 to prevent misuse.
// GenerateIndex should probably be in charge of that.

o := ApplyOptions(opts...)
idx, err := index.New(o.IndexCodec)
if err != nil {
return err
}

if err := LoadIndex(idx, src, opts...); err != nil {
return err
var idx index.Index
var err error
if o.IndexCodec != index.CarIndexNone {
idx, err = index.New(o.IndexCodec)
if err != nil {
return err
}
if err := LoadIndex(idx, src, opts...); err != nil {
return err
}
}

// Use Seek to learn the size of the CARv1 before reading it.
Expand All @@ -74,18 +83,44 @@ func WrapV1(src io.ReadSeeker, dst io.Writer, opts ...Option) error {

// Similar to the writer API, write all components of a CARv2 to the
// destination file: Pragma, Header, CARv1, Index.
v2Header := NewHeader(uint64(v1Size))
if _, err := dst.Write(Pragma); err != nil {
return err
}
if _, err := v2Header.WriteTo(dst); err != nil {
return err
v2Header := NewHeader(uint64(v1Size))
if o.IndexCodec == index.CarIndexNone {
v2Header.IndexOffset = 0
}

if o.EmbeddedMessage != nil {
v2Header.Characteristics.SetMessageAfterHeader(true)
msgBytes, err := ipld.Encode(o.EmbeddedMessage, dagcbor.Encode)
if err != nil {
return err
}
lenBuf := make([]byte, 8)
lenLen := varint.PutUvarint(lenBuf, uint64(len(msgBytes)))
v2Header.DataOffset += uint64(lenLen + len(msgBytes))
if _, err := v2Header.WriteTo(dst); err != nil {
return err
}
if _, err = dst.Write(lenBuf[:lenLen]); err != nil {
return err
}
if _, err = dst.Write(msgBytes); err != nil {
return err
}
} else {
if _, err := v2Header.WriteTo(dst); err != nil {
return err
}
}
if _, err := io.Copy(dst, src); err != nil {
return err
}
if _, err := index.WriteTo(idx, dst); err != nil {
return err
if idx != nil {
if _, err := index.WriteTo(idx, dst); err != nil {
return err
}
}

return nil
Expand Down
Loading