This repository was archived by the owner on Jun 27, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathdagbuilder.go
458 lines (394 loc) · 13 KB
/
dagbuilder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package helpers
import (
"context"
"io"
"os"
dag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
pb "github.com/ipfs/go-unixfs/pb"
cid "github.com/ipfs/go-cid"
chunker "github.com/ipfs/go-ipfs-chunker"
files "github.com/ipfs/go-ipfs-cmdkit/files"
pi "github.com/ipfs/go-ipfs-posinfo"
ipld "github.com/ipfs/go-ipld-format"
)
// DagBuilderHelper bundles the state shared while building a UnixFS DAG:
// the splitter that supplies the file chunks, the DAG service the nodes
// are written to, and the layout options chosen in DagBuilderParams.
type DagBuilderHelper struct {
	dserv      ipld.DAGService  // DAG service the created nodes are added to
	spl        chunker.Splitter // source of the file chunks
	recvdErr   error            // error reported by the splitter (io.EOF is never stored here)
	rawLeaves  bool             // when true, leaves are created as raw nodes
	nextData   []byte           // chunk already read from the splitter but not yet consumed
	maxlinks   int              // child limit checked by FillNodeLayer
	batch      *ipld.Batch      // batch created over dserv; committed by Close
	cidBuilder cid.Builder      // CID builder handed to the created nodes (may be nil)

	// Filestore support variables.
	// ----------------------------
	// TODO: Encapsulate in `FilestoreNode` (which is basically what they are).
	//
	// fullPath holds the path (or URL) of the file being added. Besides
	// carrying that location, a non-empty value acts as the flag that
	// signals that Filestore should be used.
	fullPath string
	// stat is the file information copied into every generated `PosInfo`.
	stat os.FileInfo

	// offset tracks how much file data has been added to the DAG so far
	// (used in the balanced builder). A `DagBuilderHelper` is assumed to
	// build a single DAG: for another DAG a new helper (with a zero
	// `offset`) must be created instead of reusing this one.
	offset uint64
}
// DagBuilderParams collects the configuration options from which a
// DagBuilderHelper is created (see New) for a given chunker.Splitter.
type DagBuilderParams struct {
	// Maxlinks is the maximum number of links per intermediate node.
	Maxlinks int

	// RawLeaves tells the importer to use raw ipld nodes as leaves
	// instead of the unixfs TRaw type.
	RawLeaves bool

	// CidBuilder is the CID builder to use, if set.
	CidBuilder cid.Builder

	// Dagserv is the DAGService the blocks are written to (required).
	Dagserv ipld.DAGService

	// NoCopy signals to the chunker that it should track fileinfo for
	// filestore adds.
	NoCopy bool

	// URL, if non-empty (and NoCopy is also true), indicates that the
	// file will not be stored in the datastore but instead retrieved
	// from this location via the urlstore.
	URL string
}
// New builds a DagBuilderHelper configured from these params that reads
// its data from the given chunker.Splitter.
//
// When NoCopy is set, the helper records where the data comes from: the
// absolute path and stat of the splitter's reader (if that reader
// implements files.FileInfo), or the URL param if it is non-empty. The
// URL takes precedence over the reader's path.
func (dbp *DagBuilderParams) New(spl chunker.Splitter) *DagBuilderHelper {
	helper := new(DagBuilderHelper)
	helper.dserv = dbp.Dagserv
	helper.spl = spl
	helper.rawLeaves = dbp.RawLeaves
	helper.cidBuilder = dbp.CidBuilder
	helper.maxlinks = dbp.Maxlinks
	helper.batch = ipld.NewBatch(context.TODO(), dbp.Dagserv)

	// The reader is inspected unconditionally; its result only matters
	// when NoCopy is requested.
	fileInfo, hasFileInfo := spl.Reader().(files.FileInfo)
	if dbp.NoCopy {
		if hasFileInfo {
			helper.fullPath = fileInfo.AbsPath()
			helper.stat = fileInfo.Stat()
		}
		if dbp.URL != "" {
			helper.fullPath = dbp.URL
		}
	}

	return helper
}
// prepareNext pulls the next chunk from the splitter into nextData.
// It does nothing when a chunk is already waiting to be consumed or
// when a splitter error has already been recorded. An io.EOF from the
// splitter is not treated as an error and is not recorded.
func (db *DagBuilderHelper) prepareNext() {
	hasPending := db.nextData != nil
	hasFailed := db.recvdErr != nil
	if hasPending || hasFailed {
		return
	}

	chunk, err := db.spl.NextBytes()
	if err == io.EOF {
		err = nil
	}
	db.nextData, db.recvdErr = chunk, err
}
// Done reports whether all the incoming data has been consumed.
//
// A recorded splitter error makes Done report false; that error is what
// the next call to Next returns.
func (db *DagBuilderHelper) Done() bool {
	// Done may be called before Next, so make sure the look-ahead
	// state is up to date first.
	db.prepareNext()

	return db.recvdErr == nil && db.nextData == nil
}
// Next returns the next chunk of data to be inserted into the DAG.
// A nil chunk (with a nil error) signals that the stream is at an end
// and that the current building operation should finish. If the
// splitter reported an error, that error is returned with a nil chunk.
func (db *DagBuilderHelper) Next() ([]byte, error) {
	db.prepareNext()

	// Take ownership of the pending chunk; clearing the field marks it
	// as consumed (this happens even when an error is returned).
	chunk := db.nextData
	db.nextData = nil

	if err := db.recvdErr; err != nil {
		return nil, err
	}
	return chunk, nil
}
// GetDagServ exposes the DAGService this helper was configured with.
func (db *DagBuilderHelper) GetDagServ() ipld.DAGService {
	service := db.dserv
	return service
}
// NewUnixfsNode returns an empty UnixfsNode of type `TFile`, backed by
// a fresh `dag.ProtoNode` and configured with the helper's CID builder.
func (db *DagBuilderHelper) NewUnixfsNode() *UnixfsNode {
	fileNode := new(UnixfsNode)
	fileNode.node = &dag.ProtoNode{}
	fileNode.ufmt = ft.NewFSNode(ft.TFile)
	fileNode.SetCidBuilder(db.cidBuilder)
	return fileNode
}
// GetCidBuilder exposes the `cid.Builder` this helper was configured
// with.
func (db *DagBuilderHelper) GetCidBuilder() cid.Builder {
	builder := db.cidBuilder
	return builder
}
// NewLeaf creates a leaf node holding data.
//
// If the helper was configured with raw leaves, a raw node is returned
// (built with the helper's CID builder when one is set). Otherwise the
// result is a UnixFS node: when data is nil an empty node of type TFile
// is returned (see NewUnixfsNode), and when data is non-nil (even if it
// is empty) a node of type TRaw carrying the data is returned.
//
// ErrSizeLimitExceeded is returned when data is larger than
// BlockSizeLimit.
func (db *DagBuilderHelper) NewLeaf(data []byte) (*UnixfsNode, error) {
	if len(data) > BlockSizeLimit {
		return nil, ErrSizeLimitExceeded
	}

	switch {
	case db.rawLeaves && db.cidBuilder == nil:
		return &UnixfsNode{
			rawnode: dag.NewRawNode(data),
			raw:     true,
		}, nil

	case db.rawLeaves:
		prefixed, err := dag.NewRawNodeWPrefix(data, db.cidBuilder)
		if err != nil {
			return nil, err
		}
		return &UnixfsNode{
			rawnode: prefixed,
			raw:     true,
		}, nil

	case data == nil:
		return db.NewUnixfsNode(), nil

	default:
		leaf := db.newUnixfsBlock()
		leaf.SetData(data)
		return leaf, nil
	}
}
// NewLeafNode is the `ipld.Node` flavour of `NewLeaf`: it wraps data in
// a raw node when raw leaves are enabled, and in a UnixFS node of type
// `TFile` otherwise.
//
// ErrSizeLimitExceeded is returned when data is larger than
// BlockSizeLimit.
func (db *DagBuilderHelper) NewLeafNode(data []byte) (ipld.Node, error) {
	if len(data) > BlockSizeLimit {
		return nil, ErrSizeLimitExceeded
	}

	if !db.rawLeaves {
		// Encapsulate the data in UnixFS node (instead of a raw node).
		//
		// TODO: Encapsulate this sequence of calls into a function that
		// just returns the final `ipld.Node` avoiding going through
		// `FSNodeOverDag`.
		// TODO: Using `TFile` for backwards-compatibility, a bug in the
		// balanced builder was causing the leaf nodes to be generated
		// with this type instead of `TRaw`, the one that should be used
		// (like the trickle builder does).
		// (See https://github.com/ipfs/go-ipfs/pull/5120.)
		leaf := db.NewFSNodeOverDag(ft.TFile)
		leaf.SetFileData(data)
		committed, err := leaf.Commit()
		if err != nil {
			return nil, err
		}
		return committed, nil
	}

	// Encapsulate the data in a raw node.
	if db.cidBuilder == nil {
		return dag.NewRawNode(data), nil
	}
	prefixed, err := dag.NewRawNodeWPrefix(data, db.cidBuilder)
	if err != nil {
		// Return an untyped nil so the interface result is really nil.
		return nil, err
	}
	return prefixed, nil
}
// newUnixfsBlock returns an empty UnixfsNode of type `TRaw` (a raw data
// block), backed by a fresh `dag.ProtoNode` and configured with the
// helper's CID builder.
func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode {
	block := new(UnixfsNode)
	block.node = &dag.ProtoNode{}
	block.ufmt = ft.NewFSNode(ft.TRaw)
	block.SetCidBuilder(db.cidBuilder)
	return block
}
// FillNodeLayer adds data nodes as children of the given node until the
// node has `maxlinks` children or the input data is exhausted,
// whichever comes first. The first error met while reading the data or
// adding a child is returned.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
	for {
		// Stop when the node is full or there is nothing left to add.
		if node.NumChildren() >= db.maxlinks || db.Done() {
			return nil
		}

		leaf, err := db.GetNextDataNode()
		if err != nil {
			return err
		}

		err = node.AddChild(leaf, db)
		if err != nil {
			return err
		}
	}
}
// GetNextDataNode reads the next chunk from the Splitter and builds a
// UnixfsNode leaf out of it, honoring the constraints (BlockSizeLimit,
// RawLeaves) the DagBuilderHelper was created with. A nil node with a
// nil error is returned once there is no more data.
func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
	chunk, err := db.Next()
	switch {
	case err != nil:
		return nil, err
	case chunk == nil:
		// End of the stream: nothing left to wrap.
		return nil, nil
	}

	return db.NewLeaf(chunk)
}
// NewLeafDataNode is the `ipld.Node` flavour of `GetNextDataNode`. It
// reads the next chunk from the Splitter, builds the leaf `node` for it
// and returns that node along with `dataSize`, the length of the chunk
// (used by the caller to keep track of the DAG file size). The size is
// measured here because, once the data is wrapped by `NewLeafNode` in a
// generic `ipld.Node`, it is no longer directly visible.
func (db *DagBuilderHelper) NewLeafDataNode() (node ipld.Node, dataSize uint64, err error) {
	chunk, err := db.Next()
	if err != nil {
		return nil, 0, err
	}

	// Wrap the file chunk in a new leaf node.
	leaf, err := db.NewLeafNode(chunk)
	if err != nil {
		return nil, 0, err
	}

	dataSize = uint64(len(chunk))

	// Turn the leaf into a `FilestoreNode` if Filestore is in use.
	return db.ProcessFileStore(leaf, dataSize), dataSize, nil
}
// ProcessFileStore wraps `node` in a `FilestoreNode` when Filestore is
// being used (a file path or URL was recorded in the helper) and `node`
// is a raw node. In any other case the same `node` is handed back
// unchanged so it can continue with its addition to the DAG.
//
// `db.offset` is advanced by `dataSize` here, and only when a
// `FilestoreNode` is produced. (`NewLeafDataNode` works in tandem with
// this function, but the offset is more related to this one.)
func (db *DagBuilderHelper) ProcessFileStore(node ipld.Node, dataSize uint64) ipld.Node {
	// Filestore is not being used: nothing to do.
	if db.fullPath == "" {
		return node
	}

	// Filestore support requires an actual raw node.
	if _, isRaw := node.(*dag.RawNode); !isRaw {
		return node
	}

	position := &pi.PosInfo{
		Offset:   db.offset,
		FullPath: db.fullPath,
		Stat:     db.stat,
	}

	// The next leaf starts right after the data generated by `db.Next`.
	db.offset += dataSize

	return &pi.FilestoreNode{
		Node:    node,
		PosInfo: position,
	}
}
// Add converts the UnixfsNode into its DAG node, adds that node to the
// DAGService and returns it.
func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
	dagNode, err := node.GetDagNode()
	if err != nil {
		return nil, err
	}

	if err := db.dserv.Add(context.TODO(), dagNode); err != nil {
		return nil, err
	}

	return dagNode, nil
}
// Maxlinks reports the maximum number of links configured for the
// nodes built with this helper.
func (db *DagBuilderHelper) Maxlinks() int {
	limit := db.maxlinks
	return limit
}
// Close commits the helper's batch. It should be called at the end of
// the building process to make sure all data is persisted.
func (db *DagBuilderHelper) Close() error {
	if err := db.batch.Commit(); err != nil {
		return err
	}
	return nil
}
// AddNodeAndClose adds the last `ipld.Node` of the DAG to the batch and
// then closes the builder. On success the same `node` that was passed
// in is returned.
func (db *DagBuilderHelper) AddNodeAndClose(node ipld.Node) (ipld.Node, error) {
	if err := db.batch.Add(node); err != nil {
		return nil, err
	}

	if err := db.Close(); err != nil {
		return nil, err
	}

	return node, nil
}
// FSNodeOverDag pairs an `unixfs.FSNode` with the `dag.ProtoNode` it
// will eventually be stored in. Keeping a single `ipld.Node` would mean
// constantly (un)packing it to reach and modify its internal `FSNode`
// while a UnixFS DAG is being created; instead, this structure keeps an
// `FSNode` cache that is manipulated directly (e.g., to add child
// nodes). Only when the node has reached its final (immutable) state,
// which is signaled by calling `Commit()`, is it committed to a single
// (indivisible) `ipld.Node`.
//
// It is used mainly for internal (non-leaf) nodes, and for some
// representations of data leaf nodes (those that don't use raw nodes
// or Filestore).
//
// It aims to replace the `UnixfsNode` structure, which encapsulated
// too many possible node state combinations.
//
// TODO: Revisit the name.
type FSNodeOverDag struct {
	dag  *dag.ProtoNode // DAG layer: holds the links and, after Commit, the encoded FSNode
	file *ft.FSNode     // UnixFS layer: holds the file data and the block sizes
}
// NewFSNodeOverDag creates a new `dag.ProtoNode` and a new `ft.FSNode`
// that are decoupled from one another (and stay that way until `Commit`
// is called). `fsNodeType` specifies the type of the UnixFS layer node
// (either `File` or `Raw`). The `dag.ProtoNode` is configured with the
// helper's CID builder.
func (db *DagBuilderHelper) NewFSNodeOverDag(fsNodeType pb.Data_DataType) *FSNodeOverDag {
	protoNode := new(dag.ProtoNode)
	protoNode.SetCidBuilder(db.GetCidBuilder())

	return &FSNodeOverDag{
		dag:  protoNode,
		file: ft.NewFSNode(fsNodeType),
	}
}
// AddChild registers `child` in both node layers: the `dag.ProtoNode`
// gets a link to the child node, and the `ft.FSNode` records
// `fileSize`, that is, the size of the file data the child stores at
// the UnixFS layer (not the size of the node itself). The child is also
// added to the batch of `db`.
func (n *FSNodeOverDag) AddChild(child ipld.Node, fileSize uint64, db *DagBuilderHelper) error {
	if err := n.dag.AddNodeLink("", child); err != nil {
		return err
	}

	n.file.AddBlockSize(fileSize)

	return db.batch.Add(child)
}
// Commit resolves the two cached layers into the single `ipld.Node`
// that represents them: the `ft.FSNode` is serialized and stored as the
// data of the `dag.ProtoNode`, which is then returned.
//
// TODO: Evaluate making it read-only after committing.
func (n *FSNodeOverDag) Commit() (ipld.Node, error) {
	encoded, err := n.file.GetBytes()
	if err != nil {
		return nil, err
	}

	protoNode := n.dag
	protoNode.SetData(encoded)
	return protoNode, nil
}
// NumChildren reports the number of children recorded in the
// `ft.FSNode`.
func (n *FSNodeOverDag) NumChildren() int {
	count := n.file.NumChildren()
	return count
}
// FileSize reports the `Filesize` attribute of the underlying
// `ft.FSNode` representation.
func (n *FSNodeOverDag) FileSize() uint64 {
	size := n.file.FileSize()
	return size
}
// SetFileData stores `fileData` in the `ft.FSNode`. It is meant to be
// used only when the `FSNodeOverDag` represents a leaf node (internal
// nodes don't carry data, just file sizes).
func (n *FSNodeOverDag) SetFileData(fileData []byte) {
	fsNode := n.file
	fsNode.SetData(fileData)
}