Skip to content

Commit aaaf884

Browse files
committedJul 28, 2015
Merge pull request #1519 from ipfs/get-fix
Implement http trailers for error handling
2 parents 633b66d + 8176766 commit aaaf884

File tree

5 files changed

+214
-164
lines changed

5 files changed

+214
-164
lines changed
 

‎commands/http/client.go

+110-69
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package http
33
import (
44
"bytes"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"io"
89
"net/http"
@@ -29,10 +30,21 @@ type Client interface {
2930

3031
type client struct {
3132
serverAddress string
33+
httpClient http.Client
3234
}
3335

3436
func NewClient(address string) Client {
35-
return &client{address}
37+
// We cannot use the default transport because of a bug in go's connection reuse
38+
// code. It causes random failures in the connection including io.EOF and connection
39+
// refused on 'client.Do'
40+
return &client{
41+
serverAddress: address,
42+
httpClient: http.Client{
43+
Transport: &http.Transport{
44+
DisableKeepAlives: true,
45+
},
46+
},
47+
}
3648
}
3749

3850
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
@@ -84,20 +96,20 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
8496

8597
// TODO extract string consts?
8698
if fileReader != nil {
87-
httpReq.Header.Set("Content-Type", "multipart/form-data; boundary="+fileReader.Boundary())
88-
httpReq.Header.Set("Content-Disposition", "form-data: name=\"files\"")
99+
httpReq.Header.Set(contentTypeHeader, "multipart/form-data; boundary="+fileReader.Boundary())
100+
httpReq.Header.Set(contentDispHeader, "form-data: name=\"files\"")
89101
} else {
90-
httpReq.Header.Set("Content-Type", "application/octet-stream")
102+
httpReq.Header.Set(contentTypeHeader, applicationOctetStream)
91103
}
92104
version := config.CurrentVersionNumber
93-
httpReq.Header.Set("User-Agent", fmt.Sprintf("/go-ipfs/%s/", version))
105+
httpReq.Header.Set(uaHeader, fmt.Sprintf("/go-ipfs/%s/", version))
94106

95107
ec := make(chan error, 1)
96108
rc := make(chan cmds.Response, 1)
97109
dc := req.Context().Done()
98110

99111
go func() {
100-
httpRes, err := http.DefaultClient.Do(httpReq)
112+
httpRes, err := c.httpClient.Do(httpReq)
101113
if err != nil {
102114
ec <- err
103115
return
@@ -181,80 +193,44 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
181193
res.SetLength(length)
182194
}
183195

184-
res.SetCloser(httpRes.Body)
196+
rr := &httpResponseReader{httpRes}
197+
res.SetCloser(rr)
185198

186-
if len(httpRes.Header.Get(streamHeader)) > 0 {
187-
// if output is a stream, we can just use the body reader
188-
res.SetOutput(httpRes.Body)
199+
if contentType != applicationJson {
200+
// for all non json output types, just stream back the output
201+
res.SetOutput(rr)
189202
return res, nil
190203

191204
} else if len(httpRes.Header.Get(channelHeader)) > 0 {
192205
// if output is coming from a channel, decode each chunk
193206
outChan := make(chan interface{})
194-
go func() {
195-
dec := json.NewDecoder(httpRes.Body)
196-
outputType := reflect.TypeOf(req.Command().Type)
197-
198-
ctx := req.Context()
199-
200-
for {
201-
var v interface{}
202-
var err error
203-
if outputType != nil {
204-
v = reflect.New(outputType).Interface()
205-
err = dec.Decode(v)
206-
} else {
207-
err = dec.Decode(&v)
208-
}
209-
210-
// since we are just looping reading on the response, the only way to
211-
// know we are 'done' is for the consumer to close the response body.
212-
// doing so doesnt throw an io.EOF, but we want to treat it like one.
213-
if err != nil && strings.Contains(err.Error(), "read on closed response body") {
214-
err = io.EOF
215-
}
216-
if err != nil && err != io.EOF {
217-
log.Error(err)
218-
return
219-
}
220-
221-
select {
222-
case <-ctx.Done():
223-
close(outChan)
224-
return
225-
default:
226-
}
227-
228-
if err == io.EOF {
229-
close(outChan)
230-
return
231-
}
232-
outChan <- v
233-
}
234-
}()
207+
208+
go readStreamedJson(req, rr, outChan)
235209

236210
res.SetOutput((<-chan interface{})(outChan))
237211
return res, nil
238212
}
239213

240-
dec := json.NewDecoder(httpRes.Body)
214+
dec := json.NewDecoder(rr)
241215

216+
// If we ran into an error
242217
if httpRes.StatusCode >= http.StatusBadRequest {
243218
e := cmds.Error{}
244219

245-
if httpRes.StatusCode == http.StatusNotFound {
220+
switch {
221+
case httpRes.StatusCode == http.StatusNotFound:
246222
// handle 404s
247223
e.Message = "Command not found."
248224
e.Code = cmds.ErrClient
249225

250-
} else if contentType == "text/plain" {
226+
case contentType == plainText:
251227
// handle non-marshalled errors
252228
buf := bytes.NewBuffer(nil)
253229
io.Copy(buf, httpRes.Body)
254230
e.Message = string(buf.Bytes())
255231
e.Code = cmds.ErrNormal
256232

257-
} else {
233+
default:
258234
// handle marshalled errors
259235
err = dec.Decode(&e)
260236
if err != nil {
@@ -264,23 +240,88 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
264240

265241
res.SetError(e, e.Code)
266242

267-
} else {
268-
outputType := reflect.TypeOf(req.Command().Type)
269-
var v interface{}
270-
271-
if outputType != nil {
272-
v = reflect.New(outputType).Interface()
273-
err = dec.Decode(v)
274-
} else {
275-
err = dec.Decode(&v)
243+
return res, nil
244+
}
245+
246+
outputType := reflect.TypeOf(req.Command().Type)
247+
v, err := decodeTypedVal(outputType, dec)
248+
if err != nil && err != io.EOF {
249+
return nil, err
250+
}
251+
252+
res.SetOutput(v)
253+
254+
return res, nil
255+
}
256+
257+
// read json objects off of the given stream, and write the objects out to
258+
// the 'out' channel
259+
func readStreamedJson(req cmds.Request, rr io.Reader, out chan<- interface{}) {
260+
defer close(out)
261+
dec := json.NewDecoder(rr)
262+
outputType := reflect.TypeOf(req.Command().Type)
263+
264+
ctx := req.Context()
265+
266+
for {
267+
v, err := decodeTypedVal(outputType, dec)
268+
if err != nil {
269+
// reading on a closed response body is as good as an io.EOF here
270+
if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
271+
log.Error(err)
272+
}
273+
return
276274
}
277-
if err != nil && err != io.EOF {
278-
return nil, err
275+
276+
select {
277+
case <-ctx.Done():
278+
return
279+
case out <- v:
279280
}
280-
if v != nil {
281-
res.SetOutput(v)
281+
}
282+
}
283+
284+
// decode a value of the given type, if the type is nil, attempt to decode into
285+
// an interface{} anyways
286+
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
287+
var v interface{}
288+
var err error
289+
if t != nil {
290+
v = reflect.New(t).Interface()
291+
err = dec.Decode(v)
292+
} else {
293+
err = dec.Decode(&v)
294+
}
295+
296+
return v, err
297+
}
298+
299+
// httpResponseReader reads from the response body, and checks for an error
300+
// in the http trailer upon EOF, this error if present is returned instead
301+
// of the EOF.
302+
type httpResponseReader struct {
303+
resp *http.Response
304+
}
305+
306+
func (r *httpResponseReader) Read(b []byte) (int, error) {
307+
n, err := r.resp.Body.Read(b)
308+
if err == io.EOF {
309+
_ = r.resp.Body.Close()
310+
trailerErr := r.checkError()
311+
if trailerErr != nil {
312+
return n, trailerErr
282313
}
283314
}
315+
return n, err
316+
}
284317

285-
return res, nil
318+
func (r *httpResponseReader) checkError() error {
319+
if e := r.resp.Trailer.Get(StreamErrHeader); e != "" {
320+
return errors.New(e)
321+
}
322+
return nil
323+
}
324+
325+
func (r *httpResponseReader) Close() error {
326+
return r.resp.Body.Close()
286327
}

‎commands/http/handler.go

+93-75
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package http
22

33
import (
4+
"bufio"
45
"errors"
56
"fmt"
67
"io"
@@ -32,12 +33,17 @@ type Handler struct {
3233
var ErrNotFound = errors.New("404 page not found")
3334

3435
const (
36+
StreamErrHeader = "X-Stream-Error"
3537
streamHeader = "X-Stream-Output"
3638
channelHeader = "X-Chunked-Output"
39+
uaHeader = "User-Agent"
3740
contentTypeHeader = "Content-Type"
3841
contentLengthHeader = "Content-Length"
42+
contentDispHeader = "Content-Disposition"
3943
transferEncodingHeader = "Transfer-Encoding"
4044
applicationJson = "application/json"
45+
applicationOctetStream = "application/octet-stream"
46+
plainText = "text/plain"
4147
)
4248

4349
var mimeTypes = map[string]string{
@@ -70,6 +76,11 @@ func NewHandler(ctx cmds.Context, root *cmds.Command, allowedOrigin string) *Han
7076
return &Handler{internal, c.Handler(internal)}
7177
}
7278

79+
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
80+
// Call the CORS handler which wraps the internal handler.
81+
i.corsHandler.ServeHTTP(w, r)
82+
}
83+
7384
func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
7485
log.Debug("Incoming API request: ", r.URL)
7586

@@ -101,8 +112,8 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
101112
// get the node's context to pass into the commands.
102113
node, err := i.ctx.GetNode()
103114
if err != nil {
104-
err = fmt.Errorf("cmds/http: couldn't GetNode(): %s", err)
105-
http.Error(w, err.Error(), http.StatusInternalServerError)
115+
s := fmt.Sprintf("cmds/http: couldn't GetNode(): %s", err)
116+
http.Error(w, s, http.StatusInternalServerError)
106117
return
107118
}
108119

@@ -117,46 +128,60 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
117128
// call the command
118129
res := i.root.Call(req)
119130

120-
// set the Content-Type based on res output
131+
// now handle responding to the client properly
132+
sendResponse(w, req, res)
133+
}
134+
135+
func guessMimeType(res cmds.Response) (string, error) {
121136
if _, ok := res.Output().(io.Reader); ok {
122137
// we don't set the Content-Type for streams, so that browsers can MIME-sniff the type themselves
123138
// we set this header so clients have a way to know this is an output stream
124139
// (not marshalled command output)
125140
// TODO: set a specific Content-Type if the command response needs it to be a certain type
126-
w.Header().Set(streamHeader, "1")
141+
return "", nil
142+
}
127143

128-
} else {
129-
enc, found, err := req.Option(cmds.EncShort).String()
130-
if err != nil || !found {
131-
w.WriteHeader(http.StatusInternalServerError)
132-
return
133-
}
134-
mime := mimeTypes[enc]
135-
w.Header().Set(contentTypeHeader, mime)
144+
// Try to guess mimeType from the encoding option
145+
enc, found, err := res.Request().Option(cmds.EncShort).String()
146+
if err != nil {
147+
return "", err
148+
}
149+
if !found {
150+
return "", errors.New("no encoding option set")
136151
}
137152

138-
// set the Content-Length from the response length
139-
if res.Length() > 0 {
140-
w.Header().Set(contentLengthHeader, strconv.FormatUint(res.Length(), 10))
153+
return mimeTypes[enc], nil
154+
}
155+
156+
func sendResponse(w http.ResponseWriter, req cmds.Request, res cmds.Response) {
157+
mime, err := guessMimeType(res)
158+
if err != nil {
159+
http.Error(w, err.Error(), http.StatusInternalServerError)
160+
return
141161
}
142162

163+
status := http.StatusOK
143164
// if response contains an error, write an HTTP error status code
144165
if e := res.Error(); e != nil {
145166
if e.Code == cmds.ErrClient {
146-
w.WriteHeader(http.StatusBadRequest)
167+
status = http.StatusBadRequest
147168
} else {
148-
w.WriteHeader(http.StatusInternalServerError)
169+
status = http.StatusInternalServerError
149170
}
171+
// NOTE: The error will actually be written out by the reader below
150172
}
151173

152174
out, err := res.Reader()
153175
if err != nil {
154-
w.Header().Set(contentTypeHeader, "text/plain")
155-
w.WriteHeader(http.StatusInternalServerError)
156-
w.Write([]byte(err.Error()))
176+
http.Error(w, err.Error(), http.StatusInternalServerError)
157177
return
158178
}
159179

180+
h := w.Header()
181+
if res.Length() > 0 {
182+
h.Set(contentLengthHeader, strconv.FormatUint(res.Length(), 10))
183+
}
184+
160185
// if output is a channel and user requested streaming channels,
161186
// use chunk copier for the output
162187
_, isChan := res.Output().(chan interface{})
@@ -165,44 +190,32 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
165190
}
166191

167192
streamChans, _, _ := req.Option("stream-channels").Bool()
168-
if isChan && streamChans {
169-
// w.WriteString(transferEncodingHeader + ": chunked\r\n")
170-
// w.Header().Set(channelHeader, "1")
171-
// w.WriteHeader(200)
172-
err = copyChunks(applicationJson, w, out)
173-
if err != nil {
174-
log.Debug("copy chunks error: ", err)
193+
if isChan {
194+
h.Set(channelHeader, "1")
195+
if streamChans {
196+
// streaming output from a channel will always be json objects
197+
mime = applicationJson
175198
}
176-
return
177199
}
178200

179-
err = flushCopy(w, out)
180-
if err != nil {
181-
log.Debug("Flush copy returned an error: ", err)
201+
if mime != "" {
202+
h.Set(contentTypeHeader, mime)
182203
}
183-
}
204+
h.Set(streamHeader, "1")
205+
h.Set(transferEncodingHeader, "chunked")
184206

185-
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
186-
// Call the CORS handler which wraps the internal handler.
187-
i.corsHandler.ServeHTTP(w, r)
188-
}
189-
190-
// flushCopy Copies from an io.Reader to a http.ResponseWriter.
191-
// Flushes chunks over HTTP stream as they are read (if supported by transport).
192-
func flushCopy(w http.ResponseWriter, out io.Reader) error {
193-
if _, ok := w.(http.Flusher); !ok {
194-
return copyChunks("", w, out)
207+
if err := writeResponse(status, w, out); err != nil {
208+
log.Error("error while writing stream", err)
195209
}
196-
197-
_, err := io.Copy(&flushResponse{w}, out)
198-
return err
199210
}
200211

201212
// Copies from an io.Reader to a http.ResponseWriter.
202213
// Flushes chunks over HTTP stream as they are read (if supported by transport).
203-
func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error {
214+
func writeResponse(status int, w http.ResponseWriter, out io.Reader) error {
215+
// hijack the connection so we can write our own chunked output and trailers
204216
hijacker, ok := w.(http.Hijacker)
205217
if !ok {
218+
log.Error("Failed to create hijacker! cannot continue!")
206219
return errors.New("Could not create hijacker")
207220
}
208221
conn, writer, err := hijacker.Hijack()
@@ -211,29 +224,47 @@ func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error
211224
}
212225
defer conn.Close()
213226

214-
writer.WriteString("HTTP/1.1 200 OK\r\n")
215-
if contentType != "" {
216-
writer.WriteString(contentTypeHeader + ": " + contentType + "\r\n")
227+
// write status
228+
writer.WriteString(fmt.Sprintf("HTTP/1.1 %d %s\r\n", status, http.StatusText(status)))
229+
230+
// Write out headers
231+
w.Header().Write(writer)
232+
233+
// end of headers
234+
writer.WriteString("\r\n")
235+
236+
// write body
237+
streamErr := writeChunks(out, writer)
238+
239+
// close body
240+
writer.WriteString("0\r\n")
241+
242+
// if there was a stream error, write out an error trailer. hopefully
243+
// the client will pick it up!
244+
if streamErr != nil {
245+
writer.WriteString(StreamErrHeader + ": " + sanitizedErrStr(streamErr) + "\r\n")
217246
}
218-
writer.WriteString(transferEncodingHeader + ": chunked\r\n")
219-
writer.WriteString(channelHeader + ": 1\r\n\r\n")
247+
writer.WriteString("\r\n") // close response
248+
writer.Flush()
249+
return streamErr
250+
}
220251

252+
func writeChunks(r io.Reader, w *bufio.ReadWriter) error {
221253
buf := make([]byte, 32*1024)
222-
223254
for {
224-
n, err := out.Read(buf)
255+
n, err := r.Read(buf)
225256

226257
if n > 0 {
227258
length := fmt.Sprintf("%x\r\n", n)
228-
writer.WriteString(length)
259+
w.WriteString(length)
229260

230-
_, err := writer.Write(buf[0:n])
261+
_, err := w.Write(buf[0:n])
231262
if err != nil {
232263
return err
233264
}
234265

235-
writer.WriteString("\r\n")
236-
writer.Flush()
266+
w.WriteString("\r\n")
267+
w.Flush()
237268
}
238269

239270
if err != nil && err != io.EOF {
@@ -243,25 +274,12 @@ func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error
243274
break
244275
}
245276
}
246-
247-
writer.WriteString("0\r\n\r\n")
248-
writer.Flush()
249-
250277
return nil
251278
}
252279

253-
type flushResponse struct {
254-
W http.ResponseWriter
255-
}
256-
257-
func (fr *flushResponse) Write(buf []byte) (int, error) {
258-
n, err := fr.W.Write(buf)
259-
if err != nil {
260-
return n, err
261-
}
262-
263-
if flusher, ok := fr.W.(http.Flusher); ok {
264-
flusher.Flush()
265-
}
266-
return n, err
280+
func sanitizedErrStr(err error) string {
281+
s := err.Error()
282+
s = strings.Split(s, "\n")[0]
283+
s = strings.Split(s, "\r")[0]
284+
return s
267285
}

‎core/commands/get.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
120120
bar.Start()
121121
defer bar.Finish()
122122

123-
_, err = io.Copy(file, pbReader)
124-
if err != nil {
123+
if _, err := io.Copy(file, pbReader); err != nil {
125124
res.SetError(err, cmds.ErrNormal)
126125
return
127126
}
@@ -140,10 +139,8 @@ may also specify the level of compression by specifying '-l=<1-9>'.
140139

141140
bar.Start()
142141
defer bar.Finish()
143-
144142
extractor := &tar.Extractor{outPath}
145-
err = extractor.Extract(reader)
146-
if err != nil {
143+
if err := extractor.Extract(reader); err != nil {
147144
res.SetError(err, cmds.ErrNormal)
148145
}
149146
},
@@ -169,7 +166,7 @@ func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int)
169166
return nil, err
170167
}
171168

172-
return utar.NewReader(p, node.DAG, dagnode, compression)
169+
return utar.NewReader(ctx, p, node.DAG, dagnode, compression)
173170
}
174171

175172
// getZip is equivalent to `ipfs getdag $hash | gzip`

‎thirdparty/tar/extractor.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,13 @@ func (te *Extractor) Extract(reader io.Reader) error {
3939
}
4040

4141
if header.Typeflag == tar.TypeDir {
42-
err = te.extractDir(header, i, exists)
43-
if err != nil {
42+
if err := te.extractDir(header, i, exists); err != nil {
4443
return err
4544
}
4645
continue
4746
}
4847

49-
err = te.extractFile(header, tarReader, i, exists, pathIsDir)
50-
if err != nil {
48+
if err := te.extractFile(header, tarReader, i, exists, pathIsDir); err != nil {
5149
return err
5250
}
5351
}

‎unixfs/tar/reader.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"time"
1010

1111
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
12-
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
12+
cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1313

1414
mdag "github.com/ipfs/go-ipfs/merkledag"
1515
path "github.com/ipfs/go-ipfs/path"
@@ -28,7 +28,7 @@ type Reader struct {
2828
err error
2929
}
3030

31-
func NewReader(path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compression int) (*Reader, error) {
31+
func NewReader(ctx cxt.Context, path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compression int) (*Reader, error) {
3232

3333
reader := &Reader{
3434
signalChan: make(chan struct{}),
@@ -49,12 +49,11 @@ func NewReader(path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compress
4949
// writeToBuf will write the data to the buffer, and will signal when there
5050
// is new data to read
5151
_, filename := gopath.Split(path.String())
52-
go reader.writeToBuf(dagnode, filename, 0)
53-
52+
go reader.writeToBuf(ctx, dagnode, filename, 0)
5453
return reader, nil
5554
}
5655

57-
func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
56+
func (r *Reader) writeToBuf(ctx cxt.Context, dagnode *mdag.Node, path string, depth int) {
5857
pb := new(upb.Data)
5958
err := proto.Unmarshal(dagnode.Data, pb)
6059
if err != nil {
@@ -80,16 +79,13 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
8079
}
8180
r.flush()
8281

83-
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*60)
84-
defer cancel()
85-
8682
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
8783
childNode, err := ng.Get(ctx)
8884
if err != nil {
8985
r.emitError(err)
9086
return
9187
}
92-
r.writeToBuf(childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
88+
r.writeToBuf(ctx, childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
9389
}
9490
return
9591
}
@@ -108,7 +104,7 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
108104
}
109105
r.flush()
110106

111-
reader, err := uio.NewDagReader(context.TODO(), dagnode, r.dag)
107+
reader, err := uio.NewDagReader(ctx, dagnode, r.dag)
112108
if err != nil {
113109
r.emitError(err)
114110
return

0 commit comments

Comments
 (0)
Please sign in to comment.