1
1
package swarm
2
2
3
3
import (
4
+ "bytes"
4
5
"errors"
5
6
"fmt"
6
- "math/rand"
7
7
"net"
8
+ "sort"
9
+ "strings"
8
10
"sync"
9
11
"time"
10
12
@@ -16,9 +18,6 @@ import (
16
18
17
19
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
18
20
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
19
- process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
20
- processctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
21
- ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
22
21
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
23
22
)
24
23
@@ -361,6 +360,9 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
361
360
362
361
func (s * Swarm ) dialAddrs (ctx context.Context , d * conn.Dialer , p peer.ID , remoteAddrs []ma.Multiaddr ) (conn.Conn , error ) {
363
362
363
+ // sort addresses so preferred addresses are dialed sooner
364
+ sort .Sort (AddrList (remoteAddrs ))
365
+
364
366
// try to connect to one of the peer's known addresses.
365
367
// we dial concurrently to each of the addresses, which:
366
368
// * makes the process faster overall
@@ -372,74 +374,77 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
372
374
defer cancel () // cancel work when we exit func
373
375
374
376
foundConn := make (chan struct {})
375
- conns := make (chan conn.Conn , len ( remoteAddrs ) )
377
+ conns := make (chan conn.Conn )
376
378
errs := make (chan error , len (remoteAddrs ))
377
379
378
380
// dialSingleAddr is used in the rate-limited async thing below.
379
381
dialSingleAddr := func (addr ma.Multiaddr ) {
382
+ // rebind chans in scope so we can nil them out easily
383
+ connsout := conns
384
+ errsout := errs
385
+
380
386
connC , err := s .dialAddr (ctx , d , p , addr )
387
+ if err != nil {
388
+ connsout = nil
389
+ } else if connC == nil {
390
+ // NOTE: this really should never happen
391
+ log .Errorf ("failed to dial %s %s and got no error!" , p , addr )
392
+ err = fmt .Errorf ("failed to dial %s %s" , p , addr )
393
+ connsout = nil
394
+ } else {
395
+ errsout = nil
396
+ }
381
397
382
398
// check parent still wants our results
383
399
select {
384
400
case <- foundConn :
385
401
if connC != nil {
386
402
connC .Close ()
387
403
}
388
- return
389
- default :
390
- }
391
-
392
- if err != nil {
393
- errs <- err
394
- } else if connC == nil {
395
- errs <- fmt .Errorf ("failed to dial %s %s" , p , addr )
396
- } else {
397
- conns <- connC
404
+ case errsout <- err :
405
+ case connsout <- connC :
398
406
}
399
407
}
400
408
401
409
// this whole thing is in a goroutine so we can use foundConn
402
410
// to end early.
403
411
go func () {
404
- // rate limiting just in case. at most 10 addrs at once.
405
- limiter := ratelimit .NewRateLimiter (process .Background (), 8 )
406
- limiter .Go (func (worker process.Process ) {
407
- // permute addrs so we try different sets first each time.
408
- for _ , i := range rand .Perm (len (remoteAddrs )) {
409
- select {
410
- case <- foundConn : // if one of them succeeded already
411
- break
412
- case <- worker .Closing (): // our context was cancelled
413
- break
414
- default :
415
- }
416
-
417
- workerAddr := remoteAddrs [i ] // shadow variable to avoid race
418
-
419
- // we have to do the waiting concurrently because there are addrs
420
- // that SHOULD NOT be rate limited (utp), nor blocked by other
421
- // rate limited addrs (tcp).
422
- //
423
- // (and we need to call `limiter.Go`, instead of `go` as required
424
- // by goproc/limiter semantics. note: limiter.Go is not LimitedGo.)
425
- limiter .Go (func (p process.Process ) {
426
-
427
- // returns whatever ratelimiting is acceptable for workerAddr.
428
- // may not rate limit at all.
429
- rl := s .addrDialRateLimit (workerAddr )
430
- rl <- struct {}{}
431
-
432
- limiter .LimitedGo (func (worker process.Process ) {
433
- dialSingleAddr (workerAddr )
434
- })
435
-
436
- <- rl
437
- })
412
+ limiter := make (chan struct {}, 8 )
413
+ // walk addrs in their sorted order so preferred addresses are dialed first.
414
+ for _ , addr := range remoteAddrs {
415
+ select {
416
+ case <- foundConn : // if one of them succeeded already
417
+ return
418
+ case <- ctx .Done (): // our context was cancelled
419
+ return
420
+ case limiter <- struct {}{}:
421
+ // continue
422
+ }
438
423
424
+ // returns whatever ratelimiting is acceptable for addr.
425
+ // may not rate limit at all.
426
+ rl := s .addrDialRateLimit (addr )
427
+ select {
428
+ case <- foundConn : // if one of them succeeded already
429
+ return
430
+ case <- ctx .Done (): // our context was cancelled
431
+ return
432
+ case rl <- struct {}{}:
433
+ // continue
439
434
}
440
- })
441
435
442
- processctx .CloseAfterContext (limiter , ctx )
436
+ // we have to do the waiting concurrently because there are addrs
437
+ // that SHOULD NOT be rate limited (utp), nor blocked by other
438
+ // rate limited addrs (tcp).
439
+ go func (rlc <- chan struct {}, a ma.Multiaddr ) {
440
+ defer func () {
441
+ <- limiter
442
+ <- rlc
443
+ }()
444
+ dialSingleAddr (a )
445
+ }(rl , addr )
446
+
447
+ }
443
448
}()
444
449
445
450
// wait for the results.
@@ -533,3 +538,63 @@ func isTCPMultiaddr(a ma.Multiaddr) bool {
533
538
p := a .Protocols ()
534
539
return len (p ) == 2 && (p [0 ].Name == "ip4" || p [0 ].Name == "ip6" ) && p [1 ].Name == "tcp"
535
540
}
541
+
542
+ func isDefaultDockerRange (a ma.Multiaddr ) bool {
543
+ parts := strings .Split (a .String (), "/" )
544
+ if len (parts ) != 5 {
545
+ return false
546
+ }
547
+
548
+ if parts [1 ] == "ip4" && strings .HasPrefix (parts [2 ], "172.17." ) {
549
+ return true
550
+ }
551
+
552
+ return false
553
+ }
554
+
555
// AddrList is a slice of multiaddrs with Len, Swap and Less methods, so it
// can be handed to sort.Sort to put preferred dial addresses first.
+ type AddrList []ma.Multiaddr
556
+
557
+ func (al AddrList ) Len () int {
558
+ return len (al )
559
+ }
560
+
561
+ func (al AddrList ) Swap (i , j int ) {
562
+ al [i ], al [j ] = al [j ], al [i ]
563
+ }
564
+
565
+ func (al AddrList ) Less (i , j int ) bool {
566
+ a := al [i ]
567
+ b := al [j ]
568
+
569
+ // dial utp and similar 'non-fd-consuming' addresses first
570
+ if ! isFDCostlyTransport (a ) {
571
+ if isFDCostlyTransport (b ) {
572
+ return true
573
+ }
574
+
575
+ // if neither consume fd's, assume equal ordering
576
+ return false
577
+ }
578
+
579
+ // dial localhost addresses next, they should fail immediately
580
+ if manet .IsIPLoopback (a ) {
581
+ if ! manet .IsIPLoopback (b ) {
582
+ return true
583
+ }
584
+
585
+ // both local? equal
586
+ return false
587
+ }
588
+
589
+ // docker addresses should be tried last. they very rarely work.
590
+ if isDefaultDockerRange (a ) {
591
+ return false
592
+ }
593
+
594
+ if isDefaultDockerRange (b ) {
595
+ return true
596
+ }
597
+
598
+ // for the rest, just sort by bytes
599
+ return bytes .Compare (a .Bytes (), b .Bytes ()) > 0
600
+ }
0 commit comments