Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 9538115

Browse files
committed Oct 13, 2015
refactor dialing to not panic, and to be smart about ordering
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent bea47c9 commit 9538115

File tree

2 files changed

+160
-51
lines changed

2 files changed

+160
-51
lines changed
 

‎p2p/net/swarm/dial_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package swarm
22

33
import (
44
"net"
5+
"sort"
56
"sync"
67
"testing"
78
"time"
@@ -438,3 +439,46 @@ func TestDialBackoffClears(t *testing.T) {
438439
t.Log("correctly cleared backoff")
439440
}
440441
}
442+
443+
func mkAddr(t *testing.T, s string) ma.Multiaddr {
444+
a, err := ma.NewMultiaddr(s)
445+
if err != nil {
446+
t.Fatal(err)
447+
}
448+
449+
return a
450+
}
451+
452+
func TestDockerAddrCheck(t *testing.T) {
453+
a := mkAddr(t, "/ip4/172.17.2.65/tcp/3124")
454+
455+
if !isDefaultDockerRange(a) {
456+
t.Fatal("expected to be in docker range check")
457+
}
458+
}
459+
460+
func TestAddressSorting(t *testing.T) {
461+
u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp")
462+
local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234")
463+
norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234")
464+
docker := mkAddr(t, "/ip4/172.17.4.3/tcp/1234")
465+
466+
l := AddrList{local, docker, u1, norm}
467+
sort.Sort(l)
468+
469+
if !l[0].Equal(u1) {
470+
t.Fatal("expected utp addr to be sorted first")
471+
}
472+
473+
if !l[1].Equal(local) {
474+
t.Fatal("expected localhost addr second")
475+
}
476+
477+
if !l[2].Equal(norm) {
478+
t.Fatal("expected normal addr before docker addr")
479+
}
480+
481+
if !l[3].Equal(docker) {
482+
t.Fatal("expected docker addr last")
483+
}
484+
}

‎p2p/net/swarm/swarm_dial.go

+116-51
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package swarm
22

33
import (
4+
"bytes"
45
"errors"
56
"fmt"
6-
"math/rand"
77
"net"
8+
"sort"
9+
"strings"
810
"sync"
911
"time"
1012

@@ -16,9 +18,6 @@ import (
1618

1719
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
1820
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
19-
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
20-
processctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
21-
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
2221
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
2322
)
2423

@@ -361,6 +360,9 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
361360

362361
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
363362

363+
// sort addresses so preferred addresses are dialed sooner
364+
sort.Sort(AddrList(remoteAddrs))
365+
364366
// try to connect to one of the peer's known addresses.
365367
// we dial concurrently to each of the addresses, which:
366368
// * makes the process faster overall
@@ -372,74 +374,77 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
372374
defer cancel() // cancel work when we exit func
373375

374376
foundConn := make(chan struct{})
375-
conns := make(chan conn.Conn, len(remoteAddrs))
377+
conns := make(chan conn.Conn)
376378
errs := make(chan error, len(remoteAddrs))
377379

378380
// dialSingleAddr is used in the rate-limited async thing below.
379381
dialSingleAddr := func(addr ma.Multiaddr) {
382+
// rebind chans in scope so we can nil them out easily
383+
connsout := conns
384+
errsout := errs
385+
380386
connC, err := s.dialAddr(ctx, d, p, addr)
387+
if err != nil {
388+
connsout = nil
389+
} else if connC == nil {
390+
// NOTE: this really should never happen
391+
log.Errorf("failed to dial %s %s and got no error!", p, addr)
392+
err = fmt.Errorf("failed to dial %s %s", p, addr)
393+
connsout = nil
394+
} else {
395+
errsout = nil
396+
}
381397

382398
// check parent still wants our results
383399
select {
384400
case <-foundConn:
385401
if connC != nil {
386402
connC.Close()
387403
}
388-
return
389-
default:
390-
}
391-
392-
if err != nil {
393-
errs <- err
394-
} else if connC == nil {
395-
errs <- fmt.Errorf("failed to dial %s %s", p, addr)
396-
} else {
397-
conns <- connC
404+
case errsout <- err:
405+
case connsout <- connC:
398406
}
399407
}
400408

401409
// this whole thing is in a goroutine so we can use foundConn
402410
// to end early.
403411
go func() {
404-
// rate limiting just in case. at most 10 addrs at once.
405-
limiter := ratelimit.NewRateLimiter(process.Background(), 8)
406-
limiter.Go(func(worker process.Process) {
407-
// permute addrs so we try different sets first each time.
408-
for _, i := range rand.Perm(len(remoteAddrs)) {
409-
select {
410-
case <-foundConn: // if one of them succeeded already
411-
break
412-
case <-worker.Closing(): // our context was cancelled
413-
break
414-
default:
415-
}
416-
417-
workerAddr := remoteAddrs[i] // shadow variable to avoid race
418-
419-
// we have to do the waiting concurrently because there are addrs
420-
// that SHOULD NOT be rate limited (utp), nor blocked by other
421-
// rate limited addrs (tcp).
422-
//
423-
// (and we need to call `limiter.Go`, instead of `go` as required
424-
// by goproc/limiter semantics. note: limiter.Go is not LimitedGo.)
425-
limiter.Go(func(p process.Process) {
426-
427-
// returns whatever ratelimiting is acceptable for workerAddr.
428-
// may not rate limit at all.
429-
rl := s.addrDialRateLimit(workerAddr)
430-
rl <- struct{}{}
431-
432-
limiter.LimitedGo(func(worker process.Process) {
433-
dialSingleAddr(workerAddr)
434-
})
435-
436-
<-rl
437-
})
412+
limiter := make(chan struct{}, 8)
413+
// dial addrs in their sorted order so preferred addresses are tried first.
414+
for _, addr := range remoteAddrs {
415+
select {
416+
case <-foundConn: // if one of them succeeded already
417+
return
418+
case <-ctx.Done(): // our context was cancelled
419+
return
420+
case limiter <- struct{}{}:
421+
// continue
422+
}
438423

424+
// returns whatever ratelimiting is acceptable for addr.
425+
// may not rate limit at all.
426+
rl := s.addrDialRateLimit(addr)
427+
select {
428+
case <-foundConn: // if one of them succeeded already
429+
return
430+
case <-ctx.Done(): // our context was cancelled
431+
return
432+
case rl <- struct{}{}:
433+
// continue
439434
}
440-
})
441435

442-
processctx.CloseAfterContext(limiter, ctx)
436+
// we have to do the waiting concurrently because there are addrs
437+
// that SHOULD NOT be rate limited (utp), nor blocked by other
438+
// rate limited addrs (tcp).
439+
go func(rlc <-chan struct{}, a ma.Multiaddr) {
440+
defer func() {
441+
<-limiter
442+
<-rlc
443+
}()
444+
dialSingleAddr(a)
445+
}(rl, addr)
446+
447+
}
443448
}()
444449

445450
// wait for the results.
@@ -533,3 +538,63 @@ func isTCPMultiaddr(a ma.Multiaddr) bool {
533538
p := a.Protocols()
534539
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
535540
}
541+
542+
func isDefaultDockerRange(a ma.Multiaddr) bool {
543+
parts := strings.Split(a.String(), "/")
544+
if len(parts) != 5 {
545+
return false
546+
}
547+
548+
if parts[1] == "ip4" && strings.HasPrefix(parts[2], "172.17.") {
549+
return true
550+
}
551+
552+
return false
553+
}
554+
555+
// AddrList is a slice of multiaddrs that can be passed to sort.Sort; its
// Less method defines the order in which addresses are preferred for dialing.
type AddrList []ma.Multiaddr
556+
557+
func (al AddrList) Len() int {
558+
return len(al)
559+
}
560+
561+
func (al AddrList) Swap(i, j int) {
562+
al[i], al[j] = al[j], al[i]
563+
}
564+
565+
func (al AddrList) Less(i, j int) bool {
566+
a := al[i]
567+
b := al[j]
568+
569+
// dial utp and similar 'non-fd-consuming' addresses first
570+
if !isFDCostlyTransport(a) {
571+
if isFDCostlyTransport(b) {
572+
return true
573+
}
574+
575+
// if neither consume fd's, assume equal ordering
576+
return false
577+
}
578+
579+
// dial localhost addresses next, they should fail immediately
580+
if manet.IsIPLoopback(a) {
581+
if !manet.IsIPLoopback(b) {
582+
return true
583+
}
584+
585+
// both local? equal
586+
return false
587+
}
588+
589+
// docker addresses should be tried last. they very rarely work.
590+
if isDefaultDockerRange(a) {
591+
return false
592+
}
593+
594+
if isDefaultDockerRange(b) {
595+
return true
596+
}
597+
598+
// for the rest, just sort by bytes
599+
return bytes.Compare(a.Bytes(), b.Bytes()) > 0
600+
}

0 commit comments

Comments
 (0)
Please sign in to comment.