Skip to content

Commit b7a4853

Browse files
authored
Merge pull request #5611 from ipfs/features/streaming-ls-5600
Add --stream option to `ls` command
2 parents ebc7cdc + f5ab6a3 commit b7a4853

File tree

3 files changed

+272
-111
lines changed

3 files changed

+272
-111
lines changed

core/commands/ls.go

+189-106
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package commands
22

33
import (
4-
"bytes"
54
"fmt"
65
"io"
6+
"os"
77
"text/tabwriter"
88

9-
cmds "github.com/ipfs/go-ipfs/commands"
10-
e "github.com/ipfs/go-ipfs/core/commands/e"
9+
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
1110
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"
1211

1312
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
@@ -16,29 +15,36 @@ import (
1615
unixfspb "gx/ipfs/QmUnHNqhSB1JgzVCxL1Kz3yb4bdyB4q1Z9AD5AUBVmt3fZ/go-unixfs/pb"
1716
blockservice "gx/ipfs/QmVDTbzzTwnuBwNbJdhW3u7LoBQp46bezm9yp4z1RoEepM/go-blockservice"
1817
offline "gx/ipfs/QmYZwey1thDTynSrvd6qQkX24UpTka6TFhQ2v569UpoqxD/go-ipfs-exchange-offline"
18+
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
1919
merkledag "gx/ipfs/QmcGt25mrjuB2kKW2zhPbXVZNHc4yoTDQ65NA8m6auP2f1/go-merkledag"
2020
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
2121
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
2222
)
2323

24+
// LsLink contains printable data for a single ipld link in ls output
2425
type LsLink struct {
2526
Name, Hash string
2627
Size uint64
2728
Type unixfspb.Data_DataType
2829
}
2930

31+
// LsObject is an element of LsOutput
32+
// It can represent all or part of a directory
3033
type LsObject struct {
3134
Hash string
3235
Links []LsLink
3336
}
3437

38+
// LsOutput is a set of printable data for directories,
39+
// it can be complete or partial
3540
type LsOutput struct {
3641
Objects []LsObject
3742
}
3843

3944
const (
4045
lsHeadersOptionNameTime = "headers"
4146
lsResolveTypeOptionName = "resolve-type"
47+
lsStreamOptionName = "stream"
4248
)
4349

4450
var LsCmd = &cmds.Command{
@@ -60,158 +66,235 @@ The JSON output contains type information.
6066
Options: []cmdkit.Option{
6167
cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
6268
cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
69+
cmdkit.BoolOption(lsStreamOptionName, "s", "Enable experimental streaming of directory entries as they are traversed."),
6370
},
64-
Run: func(req cmds.Request, res cmds.Response) {
65-
nd, err := req.InvocContext().GetNode()
71+
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
72+
nd, err := cmdenv.GetNode(env)
6673
if err != nil {
67-
res.SetError(err, cmdkit.ErrNormal)
68-
return
74+
return err
6975
}
7076

71-
api, err := req.InvocContext().GetApi()
77+
api, err := cmdenv.GetApi(env)
7278
if err != nil {
73-
res.SetError(err, cmdkit.ErrNormal)
74-
return
75-
}
76-
77-
// get options early -> exit early in case of error
78-
if _, _, err := req.Option(lsHeadersOptionNameTime).Bool(); err != nil {
79-
res.SetError(err, cmdkit.ErrNormal)
80-
return
81-
}
82-
83-
resolve, _, err := req.Option(lsResolveTypeOptionName).Bool()
84-
if err != nil {
85-
res.SetError(err, cmdkit.ErrNormal)
86-
return
79+
return err
8780
}
8881

82+
resolve, _ := req.Options[lsResolveTypeOptionName].(bool)
8983
dserv := nd.DAG
9084
if !resolve {
9185
offlineexch := offline.Exchange(nd.Blockstore)
9286
bserv := blockservice.New(nd.Blockstore, offlineexch)
9387
dserv = merkledag.NewDAGService(bserv)
9488
}
9589

96-
paths := req.Arguments()
90+
err = req.ParseBodyArgs()
91+
if err != nil {
92+
return err
93+
}
94+
95+
paths := req.Arguments
9796

9897
var dagnodes []ipld.Node
9998
for _, fpath := range paths {
10099
p, err := iface.ParsePath(fpath)
101100
if err != nil {
102-
res.SetError(err, cmdkit.ErrNormal)
103-
return
101+
return err
104102
}
105-
106-
dagnode, err := api.ResolveNode(req.Context(), p)
103+
dagnode, err := api.ResolveNode(req.Context, p)
107104
if err != nil {
108-
res.SetError(err, cmdkit.ErrNormal)
109-
return
105+
return err
110106
}
111107
dagnodes = append(dagnodes, dagnode)
112108
}
113-
114-
output := make([]LsObject, len(req.Arguments()))
115-
ng := merkledag.NewSession(req.Context(), nd.DAG)
109+
ng := merkledag.NewSession(req.Context, nd.DAG)
116110
ro := merkledag.NewReadOnlyDagService(ng)
117111

112+
stream, _ := req.Options[lsStreamOptionName].(bool)
113+
114+
if !stream {
115+
output := make([]LsObject, len(req.Arguments))
116+
117+
for i, dagnode := range dagnodes {
118+
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
119+
if err != nil && err != uio.ErrNotADir {
120+
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
121+
}
122+
123+
var links []*ipld.Link
124+
if dir == nil {
125+
links = dagnode.Links()
126+
} else {
127+
links, err = dir.Links(req.Context)
128+
if err != nil {
129+
return err
130+
}
131+
}
132+
outputLinks := make([]LsLink, len(links))
133+
for j, link := range links {
134+
lsLink, err := makeLsLink(req, dserv, resolve, link)
135+
if err != nil {
136+
return err
137+
}
138+
outputLinks[j] = *lsLink
139+
}
140+
output[i] = LsObject{
141+
Hash: paths[i],
142+
Links: outputLinks,
143+
}
144+
}
145+
146+
return cmds.EmitOnce(res, &LsOutput{output})
147+
}
148+
118149
for i, dagnode := range dagnodes {
119150
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
120151
if err != nil && err != uio.ErrNotADir {
121-
res.SetError(fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err), cmdkit.ErrNormal)
122-
return
152+
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
123153
}
124154

125-
var links []*ipld.Link
155+
var linkResults <-chan unixfs.LinkResult
126156
if dir == nil {
127-
links = dagnode.Links()
157+
linkResults = makeDagNodeLinkResults(req, dagnode)
128158
} else {
129-
links, err = dir.Links(req.Context())
130-
if err != nil {
131-
res.SetError(err, cmdkit.ErrNormal)
132-
return
133-
}
134-
}
135-
136-
output[i] = LsObject{
137-
Hash: paths[i],
138-
Links: make([]LsLink, len(links)),
159+
linkResults = dir.EnumLinksAsync(req.Context)
139160
}
140161

141-
for j, link := range links {
142-
t := unixfspb.Data_DataType(-1)
143-
144-
switch link.Cid.Type() {
145-
case cid.Raw:
146-
// No need to check with raw leaves
147-
t = unixfs.TFile
148-
case cid.DagProtobuf:
149-
linkNode, err := link.GetNode(req.Context(), dserv)
150-
if err == ipld.ErrNotFound && !resolve {
151-
// not an error
152-
linkNode = nil
153-
} else if err != nil {
154-
res.SetError(err, cmdkit.ErrNormal)
155-
return
156-
}
162+
for linkResult := range linkResults {
157163

158-
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
159-
d, err := unixfs.FSNodeFromBytes(pn.Data())
160-
if err != nil {
161-
res.SetError(err, cmdkit.ErrNormal)
162-
return
163-
}
164-
t = d.Type()
165-
}
164+
if linkResult.Err != nil {
165+
return linkResult.Err
166+
}
167+
link := linkResult.Link
168+
lsLink, err := makeLsLink(req, dserv, resolve, link)
169+
if err != nil {
170+
return err
166171
}
167-
output[i].Links[j] = LsLink{
168-
Name: link.Name,
169-
Hash: link.Cid.String(),
170-
Size: link.Size,
171-
Type: t,
172+
output := []LsObject{{
173+
Hash: paths[i],
174+
Links: []LsLink{*lsLink},
175+
}}
176+
if err = res.Emit(&LsOutput{output}); err != nil {
177+
return err
172178
}
173179
}
174180
}
181+
return nil
182+
},
183+
PostRun: cmds.PostRunMap{
184+
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
185+
req := res.Request()
186+
lastObjectHash := ""
175187

176-
res.SetOutput(&LsOutput{output})
188+
for {
189+
v, err := res.Next()
190+
if err != nil {
191+
if err == io.EOF {
192+
return nil
193+
}
194+
return err
195+
}
196+
out := v.(*LsOutput)
197+
lastObjectHash = tabularOutput(req, os.Stdout, out, lastObjectHash, false)
198+
}
199+
},
200+
},
201+
Encoders: cmds.EncoderMap{
202+
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *LsOutput) error {
203+
// when streaming over HTTP using a text encoder, we cannot render breaks
204+
// between directories because we don't know the hash of the last
205+
// directory encoded
206+
ignoreBreaks, _ := req.Options[lsStreamOptionName].(bool)
207+
tabularOutput(req, w, out, "", ignoreBreaks)
208+
return nil
209+
}),
177210
},
178-
Marshalers: cmds.MarshalerMap{
179-
cmds.Text: func(res cmds.Response) (io.Reader, error) {
211+
Type: LsOutput{},
212+
}
180213

181-
v, err := unwrapOutput(res.Output())
214+
func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult {
215+
links := dagnode.Links()
216+
linkResults := make(chan unixfs.LinkResult, len(links))
217+
defer close(linkResults)
218+
for _, l := range links {
219+
linkResults <- unixfs.LinkResult{
220+
Link: l,
221+
Err: nil,
222+
}
223+
}
224+
return linkResults
225+
}
226+
227+
func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) {
228+
t := unixfspb.Data_DataType(-1)
229+
230+
switch link.Cid.Type() {
231+
case cid.Raw:
232+
// No need to check with raw leaves
233+
t = unixfs.TFile
234+
case cid.DagProtobuf:
235+
linkNode, err := link.GetNode(req.Context, dserv)
236+
if err == ipld.ErrNotFound && !resolve {
237+
// not an error
238+
linkNode = nil
239+
} else if err != nil {
240+
return nil, err
241+
}
242+
243+
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
244+
d, err := unixfs.FSNodeFromBytes(pn.Data())
182245
if err != nil {
183246
return nil, err
184247
}
248+
t = d.Type()
249+
}
250+
}
251+
return &LsLink{
252+
Name: link.Name,
253+
Hash: link.Cid.String(),
254+
Size: link.Size,
255+
Type: t,
256+
}, nil
257+
}
185258

186-
headers, _, _ := res.Request().Option(lsHeadersOptionNameTime).Bool()
187-
output, ok := v.(*LsOutput)
188-
if !ok {
189-
return nil, e.TypeErr(output, v)
190-
}
259+
func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash string, ignoreBreaks bool) string {
260+
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
261+
stream, _ := req.Options[lsStreamOptionName].(bool)
262+
// in streaming mode we can't automatically align the tabs
263+
// so we take a best guess
264+
var minTabWidth int
265+
if stream {
266+
minTabWidth = 10
267+
} else {
268+
minTabWidth = 1
269+
}
191270

192-
buf := new(bytes.Buffer)
193-
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
194-
for _, object := range output.Objects {
195-
if len(output.Objects) > 1 {
196-
fmt.Fprintf(w, "%s:\n", object.Hash)
197-
}
198-
if headers {
199-
fmt.Fprintln(w, "Hash\tSize\tName")
200-
}
201-
for _, link := range object.Links {
202-
if link.Type == unixfs.TDirectory {
203-
link.Name += "/"
204-
}
205-
fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
206-
}
207-
if len(output.Objects) > 1 {
208-
fmt.Fprintln(w)
271+
multipleFolders := len(req.Arguments) > 1
272+
273+
tw := tabwriter.NewWriter(w, minTabWidth, 2, 1, ' ', 0)
274+
275+
for _, object := range out.Objects {
276+
277+
if !ignoreBreaks && object.Hash != lastObjectHash {
278+
if multipleFolders {
279+
if lastObjectHash != "" {
280+
fmt.Fprintln(tw)
209281
}
282+
fmt.Fprintf(tw, "%s:\n", object.Hash)
210283
}
211-
w.Flush()
284+
if headers {
285+
fmt.Fprintln(tw, "Hash\tSize\tName")
286+
}
287+
lastObjectHash = object.Hash
288+
}
212289

213-
return buf, nil
214-
},
215-
},
216-
Type: LsOutput{},
290+
for _, link := range object.Links {
291+
if link.Type == unixfs.TDirectory {
292+
link.Name += "/"
293+
}
294+
295+
fmt.Fprintf(tw, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
296+
}
297+
}
298+
tw.Flush()
299+
return lastObjectHash
217300
}

0 commit comments

Comments
 (0)