Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ipfs/kubo
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 2b06ffaa7d42
Choose a base ref
...
head repository: ipfs/kubo
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 1ce310be8b45
Choose a head ref
  • 2 commits
  • 2 files changed
  • 1 contributor

Commits on Jul 26, 2015

  1. refactor http client code

    License: MIT
    Signed-off-by: Jeromy <jeromyj@gmail.com>
    whyrusleeping committed Jul 26, 2015
    Copy the full SHA
    fd75b64 View commit details
  2. some commenting

    License: MIT
    Signed-off-by: Jeromy <jeromyj@gmail.com>
    whyrusleeping committed Jul 26, 2015
    Copy the full SHA
    1ce310b View commit details
Showing with 67 additions and 61 deletions.
  1. +65 −61 commands/http/client.go
  2. +2 −0 commands/http/handler.go
126 changes: 65 additions & 61 deletions commands/http/client.go
Original file line number Diff line number Diff line change
@@ -184,78 +184,41 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error

res.SetCloser(httpRes.Body)

if len(httpRes.Header.Get(streamHeader)) > 0 && contentType != "application/json" {
// if output is a stream, we can just use the body reader
if contentType != "application/json" {
// for all non json output types, just stream back the output
res.SetOutput(&httpResponseReader{httpRes})
return res, nil

} else if len(httpRes.Header.Get(channelHeader)) > 0 {
// if output is coming from a channel, decode each chunk
outChan := make(chan interface{})
go func() {
dec := json.NewDecoder(&httpResponseReader{httpRes})
outputType := reflect.TypeOf(req.Command().Type)

ctx := req.Context()

for {
var v interface{}
var err error
if outputType != nil {
v = reflect.New(outputType).Interface()
err = dec.Decode(v)
} else {
err = dec.Decode(&v)
}

// since we are just looping reading on the response, the only way to
// know we are 'done' is for the consumer to close the response body.
// doing so doesnt throw an io.EOF, but we want to treat it like one.
if err != nil && strings.Contains(err.Error(), "read on closed response body") {
err = io.EOF
}
if err != nil && err != io.EOF {
log.Error(err)
return
}

select {
case <-ctx.Done():
close(outChan)
return
default:
}

if err == io.EOF {
close(outChan)
return
}
outChan <- v
}
}()

go readStreamedJson(req, httpRes, outChan)

res.SetOutput((<-chan interface{})(outChan))
return res, nil
}

dec := json.NewDecoder(&httpResponseReader{httpRes})

// If we ran into an error
if httpRes.StatusCode >= http.StatusBadRequest {
e := cmds.Error{}

if httpRes.StatusCode == http.StatusNotFound {
switch {
case httpRes.StatusCode == http.StatusNotFound:
// handle 404s
e.Message = "Command not found."
e.Code = cmds.ErrClient

} else if contentType == "text/plain" {
case contentType == "text/plain":
// handle non-marshalled errors
buf := bytes.NewBuffer(nil)
io.Copy(buf, httpRes.Body)
e.Message = string(buf.Bytes())
e.Code = cmds.ErrNormal

} else {
default:
// handle marshalled errors
err = dec.Decode(&e)
if err != nil {
@@ -265,27 +228,68 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error

res.SetError(e, e.Code)

} else {
outputType := reflect.TypeOf(req.Command().Type)
var v interface{}

if outputType != nil {
v = reflect.New(outputType).Interface()
err = dec.Decode(v)
} else {
err = dec.Decode(&v)
}
if err != nil && err != io.EOF {
return nil, err
return res, nil
}

outputType := reflect.TypeOf(req.Command().Type)
v, err := decodeTypedVal(outputType, dec)
if err != nil && err != io.EOF {
return nil, err
}

res.SetOutput(v)

return res, nil
}

// read json objects off of the given stream, and write the objects out to
// the 'out' channel
func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- interface{}) {
defer close(out)
dec := json.NewDecoder(&httpResponseReader{httpRes})
outputType := reflect.TypeOf(req.Command().Type)

ctx := req.Context()

for {
v, err := decodeTypedVal(outputType, dec)
if err != nil {
// since we are just looping reading on the response, the only way to
// know we are 'done' is for the consumer to close the response body.
// doing so doesnt throw an io.EOF, but we want to treat it like one.
if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
log.Error(err)
}
return
}
if v != nil {
res.SetOutput(v)

select {
case <-ctx.Done():
return
case out <- v:
}

}
}

return res, nil
// decode a value of the given type, if the type is nil, attempt to decode into
// an interface{} anyways
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
var v interface{}
var err error
if t != nil {
v = reflect.New(t).Interface()
err = dec.Decode(v)
} else {
err = dec.Decode(&v)
}

return v, err
}

// httpResponseReader reads from the response body, and checks for an error
// in the http trailer upon EOF, this error if present is returned instead
// of the EOF.
type httpResponseReader struct {
resp *http.Response
}
2 changes: 2 additions & 0 deletions commands/http/handler.go
Original file line number Diff line number Diff line change
@@ -193,6 +193,7 @@ func sendResponse(w http.ResponseWriter, req cmds.Request, res cmds.Response) {
mime = applicationJson
}
}

if mime != "" {
h.Set(contentTypeHeader, mime)
}
@@ -207,6 +208,7 @@ func sendResponse(w http.ResponseWriter, req cmds.Request, res cmds.Response) {
// Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func copyChunks(status int, w http.ResponseWriter, out io.Reader) error {
// hijack the connection so we can write our own chunked output and trailers
hijacker, ok := w.(http.Hijacker)
if !ok {
log.Error("Failed to create hijacker! cannot continue!")