Skip to content

Commit f105ce4

Browse files
committedAug 5, 2015
get: fix bug + improvements
up until now there has been a very annoying bug with get, we would get halting behavior. I'm not 100% sure this commit fixes it, but it should. It certainly fixes others found in the process of digging into the get / tar extractor code. (wish we could repro the bug reliably enough to make a test case). This is a much cleaner tar writer. the ad-hoc, error-prone synch for the tar reader is gone (which i believe was incorrect). it is replaced with a simple pipe and bufio. The tar logic is now in tar.Writer, which writes unixfs dag nodes into a tar archive (no need for synch here). And get's reader is constructed with DagArchive which sets up the pipe + bufio. NOTE: this commit also changes this behavior of `get`: When retrieving a single file, if the file exists, get would fail. this emulated the behavior of wget by default, which (without opts) does not overwrite if the file is there. This change makes get overwrite the file if it already exists locally, instead of failing. This seems more intuitive to me as expected from a unix tool-- though perhaps it should be discussed more before adopting. Everything seems to work fine, and i have not been able to reproduce the get halt bug. License: MIT Signed-off-by: Juan Batiz-Benet <juan@benet.ai>
1 parent 741cf7e commit f105ce4

File tree

5 files changed

+272
-284
lines changed

5 files changed

+272
-284
lines changed
 

‎core/commands/get.go

+73-40
Original file line numberDiff line numberDiff line change
@@ -98,52 +98,85 @@ may also specify the level of compression by specifying '-l=<1-9>'.
9898
return
9999
}
100100

101-
if archive, _, _ := req.Option("archive").Bool(); archive || cmplvl != gzip.NoCompression {
102-
if archive && !strings.HasSuffix(outPath, ".tar") {
103-
outPath += ".tar"
104-
}
105-
if cmplvl != gzip.NoCompression {
106-
outPath += ".gz"
107-
}
108-
fmt.Printf("Saving archive to %s\n", outPath)
109-
110-
file, err := os.Create(outPath)
111-
if err != nil {
112-
res.SetError(err, cmds.ErrNormal)
113-
return
114-
}
115-
defer file.Close()
116-
117-
bar := pb.New(0).SetUnits(pb.U_BYTES)
118-
bar.Output = os.Stderr
119-
pbReader := bar.NewProxyReader(outReader)
120-
bar.Start()
121-
defer bar.Finish()
122-
123-
if _, err := io.Copy(file, pbReader); err != nil {
124-
res.SetError(err, cmds.ErrNormal)
125-
return
126-
}
101+
archive, _, _ := req.Option("archive").Bool()
127102

103+
gw := getWriter{
104+
Out: os.Stdout,
105+
Err: os.Stderr,
106+
Archive: archive,
107+
Compression: cmplvl,
108+
}
109+
110+
if err := gw.Write(outReader, outPath); err != nil {
111+
res.SetError(err, cmds.ErrNormal)
128112
return
129113
}
114+
},
115+
}
130116

131-
fmt.Printf("Saving file(s) to %s\n", outPath)
117+
func progressBarForReader(out io.Writer, r io.Reader) (*pb.ProgressBar, *pb.Reader) {
118+
// setup bar reader
119+
// TODO: get total length of files
120+
bar := pb.New(0).SetUnits(pb.U_BYTES)
121+
bar.Output = out
122+
barR := bar.NewProxyReader(r)
123+
return bar, barR
124+
}
132125

133-
// TODO: get total length of files
134-
bar := pb.New(0).SetUnits(pb.U_BYTES)
135-
bar.Output = os.Stderr
126+
type getWriter struct {
127+
Out io.Writer // for output to user
128+
Err io.Writer // for progress bar output
136129

137-
// wrap the reader with the progress bar proxy reader
138-
reader := bar.NewProxyReader(outReader)
130+
Archive bool
131+
Compression int
132+
}
139133

140-
bar.Start()
141-
defer bar.Finish()
142-
extractor := &tar.Extractor{outPath}
143-
if err := extractor.Extract(reader); err != nil {
144-
res.SetError(err, cmds.ErrNormal)
134+
func (gw *getWriter) Write(r io.Reader, fpath string) error {
135+
if gw.Archive || gw.Compression != gzip.NoCompression {
136+
return gw.writeArchive(r, fpath)
137+
}
138+
return gw.writeExtracted(r, fpath)
139+
}
140+
141+
func (gw *getWriter) writeArchive(r io.Reader, fpath string) error {
142+
// adjust file name if tar
143+
if gw.Archive {
144+
if !strings.HasSuffix(fpath, ".tar") && !strings.HasSuffix(fpath, ".tar.gz") {
145+
fpath += ".tar"
145146
}
146-
},
147+
}
148+
149+
// adjust file name if gz
150+
if gw.Compression != gzip.NoCompression {
151+
if !strings.HasSuffix(fpath, ".gz") {
152+
fpath += ".gz"
153+
}
154+
}
155+
156+
// create file
157+
file, err := os.Create(fpath)
158+
if err != nil {
159+
return err
160+
}
161+
defer file.Close()
162+
163+
fmt.Fprintf(gw.Out, "Saving archive to %s\n", fpath)
164+
bar, barR := progressBarForReader(gw.Err, r)
165+
bar.Start()
166+
defer bar.Finish()
167+
168+
_, err = io.Copy(file, barR)
169+
return err
170+
}
171+
172+
func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error {
173+
fmt.Fprintf(gw.Out, "Saving file(s) to %s\n", fpath)
174+
bar, barR := progressBarForReader(gw.Err, r)
175+
bar.Start()
176+
defer bar.Finish()
177+
178+
extractor := &tar.Extractor{fpath}
179+
return extractor.Extract(barR)
147180
}
148181

149182
func getCompressOptions(req cmds.Request) (int, error) {
@@ -161,12 +194,12 @@ func getCompressOptions(req cmds.Request) (int, error) {
161194
}
162195

163196
func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
164-
dagnode, err := core.Resolve(ctx, node, p)
197+
dn, err := core.Resolve(ctx, node, p)
165198
if err != nil {
166199
return nil, err
167200
}
168201

169-
return utar.NewReader(ctx, p, node.DAG, dagnode, compression)
202+
return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
170203
}
171204

172205
// getZip is equivalent to `ipfs getdag $hash | gzip`

‎test/sharness/t0090-get.sh

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ test_get_cmd() {
3737
test_cmp "$HASH" data
3838
'
3939

40-
# this started failing after this change. fixed in later commit
41-
test_expect_failure "ipfs get errors when trying to overwrite a file" '
42-
test_must_fail ipfs get "$HASH" >actual &&
40+
test_expect_success "ipfs get DOES NOT error when trying to overwrite a file" '
41+
ipfs get "$HASH" >actual &&
4342
rm "$HASH"
4443
'
4544

‎thirdparty/tar/extractor.go

+27-20
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"archive/tar"
55
"io"
66
"os"
7+
gopath "path"
78
fp "path/filepath"
89
"strings"
910
)
@@ -39,7 +40,7 @@ func (te *Extractor) Extract(reader io.Reader) error {
3940
}
4041

4142
if header.Typeflag == tar.TypeDir {
42-
if err := te.extractDir(header, i, rootExists); err != nil {
43+
if err := te.extractDir(header, i); err != nil {
4344
return err
4445
}
4546
continue
@@ -52,13 +53,19 @@ func (te *Extractor) Extract(reader io.Reader) error {
5253
return nil
5354
}
5455

55-
func (te *Extractor) extractDir(h *tar.Header, depth int, rootExists bool) error {
56-
pathElements := strings.Split(h.Name, "/")
57-
if !rootExists {
58-
pathElements = pathElements[1:]
59-
}
60-
path := fp.Join(pathElements...)
61-
path = fp.Join(te.Path, path)
56+
// outputPath returns the path at which to place tarPath
57+
func (te *Extractor) outputPath(tarPath string) string {
58+
elems := strings.Split(tarPath, "/") // break into elems
59+
elems = elems[1:] // remove original root
60+
61+
path := fp.Join(elems...) // join elems
62+
path = fp.Join(te.Path, path) // rebase on extractor root
63+
return path
64+
}
65+
66+
func (te *Extractor) extractDir(h *tar.Header, depth int) error {
67+
path := te.outputPath(h.Name)
68+
6269
if depth == 0 {
6370
// if this is the root root directory, use it as the output path for remaining files
6471
te.Path = path
@@ -73,18 +80,18 @@ func (te *Extractor) extractDir(h *tar.Header, depth int, rootExists bool) error
7380
}
7481

7582
func (te *Extractor) extractFile(h *tar.Header, r *tar.Reader, depth int, rootExists bool, rootIsDir bool) error {
76-
path := te.Path
77-
if depth == 0 && rootExists {
78-
// if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
79-
if rootIsDir { // putting file inside of a root dir.
80-
path = fp.Join(te.Path, h.Name)
81-
}
82-
// else if the file exists, just overwrite it.
83-
} else {
84-
// we are outputting a directory, this file is inside of it
85-
pathElements := strings.Split(h.Name, "/")[1:]
86-
path = fp.Join(pathElements...)
87-
path = fp.Join(te.Path, path)
83+
path := te.outputPath(h.Name)
84+
85+
if depth == 0 { // if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
86+
if rootExists && rootIsDir {
87+
// putting file inside of a root dir.
88+
fnameo := gopath.Base(h.Name)
89+
fnamen := fp.Base(path)
90+
// add back original name if lost.
91+
if fnameo != fnamen {
92+
path = fp.Join(path, fnameo)
93+
}
94+
} // else if old file exists, just overwrite it.
8895
}
8996

9097
file, err := os.Create(path)

‎unixfs/tar/reader.go

-221
This file was deleted.

‎unixfs/tar/writer.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package tar
2+
3+
import (
4+
"archive/tar"
5+
"bufio"
6+
"compress/gzip"
7+
"fmt"
8+
"io"
9+
"path"
10+
"time"
11+
12+
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
13+
cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
14+
15+
mdag "github.com/ipfs/go-ipfs/merkledag"
16+
uio "github.com/ipfs/go-ipfs/unixfs/io"
17+
upb "github.com/ipfs/go-ipfs/unixfs/pb"
18+
)
19+
20+
// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks.
21+
// TODO: does this need to be configurable?
22+
var DefaultBufSize = 1048576
23+
24+
func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
25+
26+
_, filename := path.Split(name)
27+
28+
// need to connect a writer to a reader
29+
piper, pipew := io.Pipe()
30+
31+
// use a buffered writer to parallelize task
32+
bufw := bufio.NewWriterSize(pipew, DefaultBufSize)
33+
34+
// construct the tar writer
35+
w, err := NewWriter(bufw, dag, compression)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
// write all the nodes recursively
41+
go func() {
42+
if err := w.WriteNode(ctx, nd, filename); err != nil {
43+
pipew.CloseWithError(err)
44+
return
45+
}
46+
47+
if err := bufw.Flush(); err != nil {
48+
pipew.CloseWithError(err)
49+
return
50+
}
51+
52+
pipew.Close() // everything seems to be ok.
53+
}()
54+
55+
return piper, nil
56+
}
57+
58+
// Writer is a utility structure that helps to write
59+
// unixfs merkledag nodes as a tar archive format.
60+
// It wraps any io.Writer.
61+
type Writer struct {
62+
Dag mdag.DAGService
63+
TarW *tar.Writer
64+
}
65+
66+
// NewWriter wraps given io.Writer.
67+
// compression determines whether to use gzip compression.
68+
func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) {
69+
70+
if compression != gzip.NoCompression {
71+
var err error
72+
w, err = gzip.NewWriterLevel(w, compression)
73+
if err != nil {
74+
return nil, err
75+
}
76+
}
77+
78+
return &Writer{
79+
Dag: dag,
80+
TarW: tar.NewWriter(w),
81+
}, nil
82+
}
83+
84+
func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
85+
if err := writeDirHeader(w.TarW, fpath); err != nil {
86+
return err
87+
}
88+
89+
for i, ng := range w.Dag.GetDAG(ctx, nd) {
90+
child, err := ng.Get(ctx)
91+
if err != nil {
92+
return err
93+
}
94+
95+
npath := path.Join(fpath, nd.Links[i].Name)
96+
if err := w.WriteNode(ctx, child, npath); err != nil {
97+
return err
98+
}
99+
}
100+
101+
return nil
102+
}
103+
104+
func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error {
105+
pb := new(upb.Data)
106+
if err := proto.Unmarshal(nd.Data, pb); err != nil {
107+
return err
108+
}
109+
110+
return w.writeFile(ctx, nd, pb, fpath)
111+
}
112+
113+
func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error {
114+
if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
115+
return err
116+
}
117+
118+
dagr, err := uio.NewDagReader(ctx, nd, w.Dag)
119+
if err != nil {
120+
return err
121+
}
122+
123+
_, err = io.Copy(w.TarW, dagr)
124+
if err != nil && err != io.EOF {
125+
return err
126+
}
127+
128+
return nil
129+
}
130+
131+
func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error {
132+
pb := new(upb.Data)
133+
if err := proto.Unmarshal(nd.Data, pb); err != nil {
134+
return err
135+
}
136+
137+
switch pb.GetType() {
138+
case upb.Data_Directory:
139+
return w.WriteDir(ctx, nd, fpath)
140+
case upb.Data_File:
141+
return w.writeFile(ctx, nd, pb, fpath)
142+
default:
143+
return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
144+
}
145+
}
146+
147+
func (w *Writer) Close() error {
148+
return w.TarW.Close()
149+
}
150+
151+
func writeDirHeader(w *tar.Writer, fpath string) error {
152+
return w.WriteHeader(&tar.Header{
153+
Name: fpath,
154+
Typeflag: tar.TypeDir,
155+
Mode: 0777,
156+
ModTime: time.Now(),
157+
// TODO: set mode, dates, etc. when added to unixFS
158+
})
159+
}
160+
161+
func writeFileHeader(w *tar.Writer, fpath string, size uint64) error {
162+
return w.WriteHeader(&tar.Header{
163+
Name: fpath,
164+
Size: int64(size),
165+
Typeflag: tar.TypeReg,
166+
Mode: 0644,
167+
ModTime: time.Now(),
168+
// TODO: set mode, dates, etc. when added to unixFS
169+
})
170+
}

2 commit comments

Comments
 (2)

rht commented on Aug 9, 2015

@rht
Contributor

This commit fixes #1279 sync problem.
I think WriteFile is superseded by WriteNode. To parallel the pattern in thirdparty/tar/extractor.go, WriteDir should be private.

jbenet commented on Aug 9, 2015

@jbenet
MemberAuthor

@rht yeah makes sense, WriteNode is all we need.

Please sign in to comment.