Skip to content

Commit ece43a5

Browse files
committedNov 10, 2015
Merge pull request #1937 from ipfs/refactor/transport
refactor net code to use transports, in rough accordance with libp2p
2 parents 6ad200e + 94647ed commit ece43a5

File tree

11 files changed

+483
-213
lines changed

11 files changed

+483
-213
lines changed
 

‎metrics/conn/conn.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package meterconn
22

33
import (
4-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
54
metrics "github.com/ipfs/go-ipfs/metrics"
5+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
66
)
77

88
type MeteredConn struct {
99
mesRecv metrics.MeterCallback
1010
mesSent metrics.MeterCallback
1111

12-
manet.Conn
12+
transport.Conn
1313
}
1414

15-
func WrapConn(bwc metrics.Reporter, c manet.Conn) manet.Conn {
15+
func WrapConn(bwc metrics.Reporter, c transport.Conn) transport.Conn {
1616
return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage)
1717
}
1818

19-
func newMeteredConn(base manet.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) manet.Conn {
19+
func newMeteredConn(base transport.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) transport.Conn {
2020
return &MeteredConn{
2121
Conn: base,
2222
mesRecv: rcb,

‎p2p/net/conn/dial.go

+31-100
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,32 @@ package conn
33
import (
44
"fmt"
55
"math/rand"
6-
"net"
76
"strings"
8-
"syscall"
97

108
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
119
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
12-
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
1310
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1411
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1512

13+
ci "github.com/ipfs/go-ipfs/p2p/crypto"
1614
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
15+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1716
peer "github.com/ipfs/go-ipfs/p2p/peer"
1817
)
1918

19+
type WrapFunc func(transport.Conn) transport.Conn
20+
21+
func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer {
22+
return &Dialer{
23+
LocalPeer: p,
24+
PrivateKey: pk,
25+
Wrapper: wrap,
26+
}
27+
}
28+
2029
// String returns the string rep of d.
2130
func (d *Dialer) String() string {
22-
return fmt.Sprintf("<Dialer %s %s ...>", d.LocalPeer, d.LocalAddrs[0])
31+
return fmt.Sprintf("<Dialer %s ...>", d.LocalPeer)
2332
}
2433

2534
// Dial connects to a peer over a particular address
@@ -95,112 +104,34 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
95104
return connOut, nil
96105
}
97106

98-
// rawConnDial dials the underlying net.Conn + manet.Conns
99-
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {
100-
101-
// before doing anything, check we're going to be able to dial.
102-
// we may not support the given address.
103-
if _, _, err := manet.DialArgs(raddr); err != nil {
104-
return nil, err
105-
}
106-
107-
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
108-
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
109-
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
110-
}
111-
112-
// get local addr to use.
113-
laddr := pickLocalAddr(d.LocalAddrs, raddr)
114-
logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
115-
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()
116-
117-
// make a copy of the manet.Dialer, we may need to change its timeout.
118-
madialer := d.Dialer
119-
120-
if laddr != nil && reuseportIsAvailable() {
121-
// we're perhaps going to dial twice. half the timeout, so we can afford to.
122-
// otherwise our context would expire right after the first dial.
123-
madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2)
124-
125-
// dial using reuseport.Dialer, because we're probably reusing addrs.
126-
// this is optimistic, as the reuseDial may fail to bind the port.
127-
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
128-
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
129-
// if it worked, wrap the raw net.Conn with our manet.Conn
130-
logdial["reuseport"] = "success"
131-
rpev.Done()
132-
return manet.WrapNetConn(nconn)
133-
} else if !retry {
134-
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
135-
logdial["reuseport"] = "failure"
136-
logdial["error"] = reuseErr
137-
rpev.Done()
138-
return nil, reuseErr
139-
} else {
140-
// this is a failure to reuse port. log it.
141-
logdial["reuseport"] = "retry"
142-
logdial["error"] = reuseErr
143-
rpev.Done()
144-
}
145-
}
146-
147-
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
148-
return madialer.Dial(raddr)
107+
func (d *Dialer) AddDialer(pd transport.Dialer) {
108+
d.Dialers = append(d.Dialers, pd)
149109
}
150110

151-
func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) {
152-
if laddr == nil {
153-
// if we're given no local address no sense in using reuseport to dial, dial out as usual.
154-
return nil, true, reuseport.ErrReuseFailed
155-
}
156-
157-
// give reuse.Dialer the manet.Dialer's Dialer.
158-
// (wow, Dialer should've so been an interface...)
159-
rd := reuseport.Dialer{dialer}
160-
161-
// get the local net.Addr manually
162-
rd.D.LocalAddr, err = manet.ToNetAddr(laddr)
163-
if err != nil {
164-
return nil, true, err // something wrong with laddr. retry without.
165-
}
166-
167-
// get the raddr dial args for rd.dial
168-
network, netraddr, err := manet.DialArgs(raddr)
169-
if err != nil {
170-
return nil, true, err // something wrong with laddr. retry without.
111+
// returns dialer that can dial the given address
112+
func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer {
113+
for _, pd := range d.Dialers {
114+
if pd.Matches(raddr) {
115+
return pd
116+
}
171117
}
172118

173-
// rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set.
174-
conn, err = rd.Dial(network, netraddr)
175-
return conn, reuseErrShouldRetry(err), err // hey! it worked!
119+
return nil
176120
}
177121

178-
// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
179-
// if we failed to bind, we should retry. if bind worked and this is a
180-
// real dial error (remote end didnt answer) then we should not retry.
181-
func reuseErrShouldRetry(err error) bool {
182-
if err == nil {
183-
return false // hey, it worked! no need to retry.
184-
}
185-
186-
// if it's a network timeout error, it's a legitimate failure.
187-
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
188-
return false
122+
// rawConnDial dials the underlying net.Conn + manet.Conns
123+
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (transport.Conn, error) {
124+
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
125+
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
126+
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
189127
}
190128

191-
errno, ok := err.(syscall.Errno)
192-
if !ok { // not an errno? who knows what this is. retry.
193-
return true
129+
sd := d.subDialerForAddr(raddr)
130+
if sd == nil {
131+
return nil, fmt.Errorf("no dialer for %s", raddr)
194132
}
195133

196-
switch errno {
197-
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
198-
return true // failure to bind. retry.
199-
case syscall.ECONNREFUSED:
200-
return false // real dial error
201-
default:
202-
return true // optimistically default to retry.
203-
}
134+
return sd.Dial(raddr)
204135
}
205136

206137
func pickLocalAddr(laddrs []ma.Multiaddr, raddr ma.Multiaddr) (laddr ma.Multiaddr) {

‎p2p/net/conn/dial_test.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ import (
88
"testing"
99
"time"
1010

11+
ic "github.com/ipfs/go-ipfs/p2p/crypto"
12+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
13+
peer "github.com/ipfs/go-ipfs/p2p/peer"
1114
tu "github.com/ipfs/go-ipfs/util/testutil"
1215

16+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
1317
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1418
)
1519

@@ -49,6 +53,25 @@ func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.Pe
4953
return setupConn(t, ctx, false)
5054
}
5155

56+
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
57+
list, err := transport.NewTCPTransport().Listen(addr)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
return WrapTransportListener(ctx, list, local, sk)
63+
}
64+
65+
func dialer(t *testing.T, a ma.Multiaddr) transport.Dialer {
66+
tpt := transport.NewTCPTransport()
67+
tptd, err := tpt.Dialer(a)
68+
if err != nil {
69+
t.Fatal(err)
70+
}
71+
72+
return tptd
73+
}
74+
5275
func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) {
5376

5477
p1 = tu.RandPeerNetParamsOrFatal(t)
@@ -71,6 +94,8 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
7194
PrivateKey: key2,
7295
}
7396

97+
d2.AddDialer(dialer(t, p2.Addr))
98+
7499
var c2 Conn
75100

76101
done := make(chan error)
@@ -152,6 +177,7 @@ func testDialer(t *testing.T, secure bool) {
152177
LocalPeer: p2.ID,
153178
PrivateKey: key2,
154179
}
180+
d2.AddDialer(dialer(t, p2.Addr))
155181

156182
go echoListen(ctx, l1)
157183

@@ -227,6 +253,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
227253
LocalPeer: p2.ID,
228254
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
229255
}
256+
d2.AddDialer(dialer(t, p2.Addr))
230257

231258
errs := make(chan error, 100)
232259
done := make(chan struct{}, 1)
@@ -253,7 +280,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
253280

254281
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
255282
if err != nil {
256-
errs <- err
283+
t.Fatal(err)
257284
}
258285
c.Close() // close it early.
259286

‎p2p/net/conn/interface.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88
key "github.com/ipfs/go-ipfs/blocks/key"
99
ic "github.com/ipfs/go-ipfs/p2p/crypto"
1010
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
11+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1112
peer "github.com/ipfs/go-ipfs/p2p/peer"
1213

1314
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
1415
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
15-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
1616
)
1717

1818
// Map maps Keys (Peer.IDs) to Connections.
@@ -54,22 +54,22 @@ type Conn interface {
5454
// Dial function as before, but it would have many arguments, as dialing is
5555
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
5656
type Dialer struct {
57-
58-
// Dialer is an optional manet.Dialer to use.
59-
Dialer manet.Dialer
60-
6157
// LocalPeer is the identity of the local Peer.
6258
LocalPeer peer.ID
6359

6460
// LocalAddrs is a set of local addresses to use.
65-
LocalAddrs []ma.Multiaddr
61+
//LocalAddrs []ma.Multiaddr
62+
63+
// Dialers are the sub-dialers usable by this dialer
64+
// selected in order based on the address being dialed
65+
Dialers []transport.Dialer
6666

6767
// PrivateKey used to initialize a secure connection.
6868
// Warning: if PrivateKey is nil, connection will not be secured.
6969
PrivateKey ic.PrivKey
7070

7171
// Wrapper to wrap the raw connection (optional)
72-
Wrapper func(manet.Conn) manet.Conn
72+
Wrapper WrapFunc
7373
}
7474

7575
// Listener is an object that can accept connections. It matches net.Listener

‎p2p/net/conn/listen.go

+4-31
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,23 @@ import (
66
"net"
77

88
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
9-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
10-
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
119
tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
1210
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
1311
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
1412
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1513

1614
ic "github.com/ipfs/go-ipfs/p2p/crypto"
1715
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
16+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1817
peer "github.com/ipfs/go-ipfs/p2p/peer"
1918
)
2019

2120
// ConnWrapper is any function that wraps a raw multiaddr connection
22-
type ConnWrapper func(manet.Conn) manet.Conn
21+
type ConnWrapper func(transport.Conn) transport.Conn
2322

2423
// listener is an object that can accept connections. It implements Listener
2524
type listener struct {
26-
manet.Listener
25+
transport.Listener
2726

2827
local peer.ID // LocalPeer is the identity of the local Peer
2928
privk ic.PrivKey // private key to use to initialize secure conns
@@ -147,13 +146,7 @@ func (l *listener) Loggable() map[string]interface{} {
147146
}
148147
}
149148

150-
// Listen listens on the particular multiaddr, with given peer and peerstore.
151-
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
152-
ml, err := manetListen(addr)
153-
if err != nil {
154-
return nil, err
155-
}
156-
149+
func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
157150
l := &listener{
158151
Listener: ml,
159152
local: local,
@@ -175,23 +168,3 @@ type ListenerConnWrapper interface {
175168
func (l *listener) SetConnWrapper(cw ConnWrapper) {
176169
l.wrapper = cw
177170
}
178-
179-
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
180-
network, naddr, err := manet.DialArgs(addr)
181-
if err != nil {
182-
return nil, err
183-
}
184-
185-
if reuseportIsAvailable() {
186-
nl, err := reuseport.Listen(network, naddr)
187-
if err == nil {
188-
// hey, it worked!
189-
return manet.WrapNetListener(nl)
190-
}
191-
// reuseport is available, but we failed to listen. log debug, and retry normally.
192-
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
193-
}
194-
195-
// either reuseport not available, or it failed. try normally.
196-
return manet.Listen(addr)
197-
}

‎p2p/net/swarm/swarm.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import (
88
"time"
99

1010
metrics "github.com/ipfs/go-ipfs/metrics"
11+
mconn "github.com/ipfs/go-ipfs/metrics/conn"
1112
inet "github.com/ipfs/go-ipfs/p2p/net"
13+
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
1214
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
1315
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
16+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1417
peer "github.com/ipfs/go-ipfs/p2p/peer"
1518
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
1619

@@ -58,9 +61,13 @@ type Swarm struct {
5861
backf dialbackoff
5962
dialT time.Duration // mainly for tests
6063

64+
dialer *conn.Dialer
65+
6166
notifmu sync.RWMutex
6267
notifs map[inet.Notifiee]ps.Notifiee
6368

69+
transports []transport.Transport
70+
6471
// filters for addresses that shouldnt be dialed
6572
Filters *filter.Filters
6673

@@ -81,16 +88,22 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
8188
return nil, err
8289
}
8390

91+
wrap := func(c transport.Conn) transport.Conn {
92+
return mconn.WrapConn(bwc, c)
93+
}
94+
8495
s := &Swarm{
8596
swarm: ps.NewSwarm(PSTransport),
8697
local: local,
8798
peers: peers,
8899
ctx: ctx,
89100
dialT: DialTimeout,
90101
notifs: make(map[inet.Notifiee]ps.Notifiee),
102+
transports: []transport.Transport{transport.NewTCPTransport()},
91103
bwc: bwc,
92104
fdRateLimit: make(chan struct{}, concurrentFdDials),
93105
Filters: filter.NewFilters(),
106+
dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
94107
}
95108

96109
// configure Swarm
@@ -101,7 +114,12 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
101114
prom.MustRegisterOrGet(peersTotal)
102115
s.Notify((*metricsNotifiee)(s))
103116

104-
return s, s.listen(listenAddrs)
117+
err = s.setupInterfaces(listenAddrs)
118+
if err != nil {
119+
return nil, err
120+
}
121+
122+
return s, nil
105123
}
106124

107125
func (s *Swarm) teardown() error {
@@ -134,7 +152,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
134152
return err
135153
}
136154

137-
return s.listen(addrs)
155+
return s.setupInterfaces(addrs)
138156
}
139157

140158
// Process returns the Process of the swarm

‎p2p/net/swarm/swarm_dial.go

+6-31
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,17 @@ import (
44
"bytes"
55
"errors"
66
"fmt"
7-
"net"
87
"sort"
98
"sync"
109
"time"
1110

12-
mconn "github.com/ipfs/go-ipfs/metrics/conn"
11+
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
1312
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
1413
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
1514
peer "github.com/ipfs/go-ipfs/p2p/peer"
1615
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1716

1817
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
19-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
2018
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
2119
)
2220

@@ -289,14 +287,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
289287
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
290288
}
291289

292-
// get our own addrs. try dialing out from our listener addresses (reusing ports)
293-
// Note that using our peerstore's addresses here is incorrect, as that would
294-
// include observed addresses. TODO: make peerstore's address book smarter.
295-
localAddrs := s.ListenAddresses()
296-
if len(localAddrs) == 0 {
297-
log.Debug("Dialing out with no local addresses.")
298-
}
299-
300290
// get remote peer addrs
301291
remoteAddrs := s.peers.Addrs(p)
302292
// make sure we can use the addresses.
@@ -321,23 +311,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
321311
return nil, err
322312
}
323313

324-
// open connection to peer
325-
d := &conn.Dialer{
326-
Dialer: manet.Dialer{
327-
Dialer: net.Dialer{
328-
Timeout: s.dialT,
329-
},
330-
},
331-
LocalPeer: s.local,
332-
LocalAddrs: localAddrs,
333-
PrivateKey: sk,
334-
Wrapper: func(c manet.Conn) manet.Conn {
335-
return mconn.WrapConn(s.bwc, c)
336-
},
337-
}
338-
339314
// try to get a connection to any addr
340-
connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
315+
connC, err := s.dialAddrs(ctx, p, remoteAddrs)
341316
if err != nil {
342317
logdial["error"] = err
343318
return nil, err
@@ -357,7 +332,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
357332
return swarmC, nil
358333
}
359334

360-
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
335+
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
361336

362337
// sort addresses so preferred addresses are dialed sooner
363338
sort.Sort(AddrList(remoteAddrs))
@@ -381,7 +356,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
381356
connsout := conns
382357
errsout := errs
383358

384-
connC, err := s.dialAddr(ctx, d, p, addr)
359+
connC, err := s.dialAddr(ctx, p, addr)
385360
if err != nil {
386361
connsout = nil
387362
} else if connC == nil {
@@ -451,10 +426,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
451426
return nil, exitErr
452427
}
453428

454-
func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
429+
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
455430
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
456431

457-
connC, err := d.Dial(ctx, addr, p)
432+
connC, err := s.dialer.Dial(ctx, addr, p)
458433
if err != nil {
459434
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
460435
}

‎p2p/net/swarm/swarm_listen.go

+54-35
Original file line numberDiff line numberDiff line change
@@ -6,77 +6,94 @@ import (
66
mconn "github.com/ipfs/go-ipfs/metrics/conn"
77
inet "github.com/ipfs/go-ipfs/p2p/net"
88
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
9-
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
9+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1010
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1111

1212
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
13-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
1413
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
1514
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
16-
multierr "github.com/ipfs/go-ipfs/thirdparty/multierr"
1715
)
1816

19-
// Open listeners for each network the swarm should listen on
20-
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
17+
// Open listeners and reuse-dialers for the given addresses
18+
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
19+
errs := make([]error, len(addrs))
20+
var succeeded int
21+
for i, a := range addrs {
22+
tpt := s.transportForAddr(a)
23+
if tpt == nil {
24+
errs[i] = fmt.Errorf("no transport for address: %s", a)
25+
continue
26+
}
2127

22-
for _, addr := range addrs {
23-
if !addrutil.AddrUsable(addr, true) {
24-
return fmt.Errorf("cannot use addr: %s", addr)
28+
d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
29+
if err != nil {
30+
errs[i] = err
31+
continue
2532
}
26-
}
2733

28-
retErr := multierr.New()
34+
s.dialer.AddDialer(d)
2935

30-
// listen on every address
31-
for i, addr := range addrs {
32-
err := s.setupListener(addr)
36+
list, err := tpt.Listen(a)
3337
if err != nil {
34-
if retErr.Errors == nil {
35-
retErr.Errors = make([]error, len(addrs))
36-
}
37-
retErr.Errors[i] = err
38-
log.Debugf("Failed to listen on: %s - %s", addr, err)
38+
errs[i] = err
39+
continue
40+
}
41+
42+
err = s.addListener(list)
43+
if err != nil {
44+
errs[i] = err
45+
continue
3946
}
47+
succeeded++
4048
}
4149

42-
if retErr.Errors != nil {
43-
return retErr
50+
for i, e := range errs {
51+
if e != nil {
52+
log.Warning("listen on %s failed: %s", addrs[i], errs[i])
53+
}
4454
}
55+
if succeeded == 0 && len(addrs) > 0 {
56+
return fmt.Errorf("failed to listen on any addresses: %s", errs)
57+
}
58+
4559
return nil
4660
}
4761

48-
// Listen for new connections on the given multiaddr
49-
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
62+
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
63+
for _, t := range s.transports {
64+
if t.Matches(a) {
65+
return t
66+
}
67+
}
68+
69+
return nil
70+
}
5071

51-
// TODO rethink how this has to work. (jbenet)
52-
//
53-
// resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
54-
// if err != nil {
55-
// return err
56-
// }
57-
// for _, a := range resolved {
58-
// s.peers.AddAddr(s.local, a)
59-
// }
72+
func (s *Swarm) addListener(tptlist transport.Listener) error {
6073

6174
sk := s.peers.PrivKey(s.local)
6275
if sk == nil {
6376
// may be fine for sk to be nil, just log a warning.
6477
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
6578
}
66-
log.Debugf("Swarm Listening at %s", maddr)
67-
list, err := conn.Listen(s.Context(), maddr, s.local, sk)
79+
80+
list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
6881
if err != nil {
6982
return err
7083
}
7184

7285
list.SetAddrFilters(s.Filters)
7386

7487
if cw, ok := list.(conn.ListenerConnWrapper); ok {
75-
cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
88+
cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
7689
return mconn.WrapConn(s.bwc, c)
7790
})
7891
}
7992

93+
return s.addConnListener(list)
94+
}
95+
96+
func (s *Swarm) addConnListener(list conn.Listener) error {
8097
// AddListener to the peerstream Listener. this will begin accepting connections
8198
// and streams!
8299
sl, err := s.swarm.AddListener(list)
@@ -85,6 +102,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
85102
}
86103
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
87104

105+
maddr := list.Multiaddr()
106+
88107
// signal to our notifiees on successful conn.
89108
s.notifyAll(func(n inet.Notifiee) {
90109
n.Listen((*Network)(s), maddr)
@@ -107,7 +126,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
107126
if !more {
108127
return
109128
}
110-
log.Warningf("swarm listener accept error: %s", err)
129+
log.Errorf("swarm listener accept error: %s", err)
111130
case <-ctx.Done():
112131
return
113132
}

‎p2p/net/conn/reuseport.go ‎p2p/net/transport/reuseport.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
package conn
1+
package transport
22

33
import (
4+
"net"
45
"os"
56
"strings"
7+
"syscall"
68

79
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
810
)
@@ -30,6 +32,34 @@ func init() {
3032
//
3133
// If this becomes a sought after feature, we could add this to the config.
3234
// In the end, reuseport is a stop-gap.
33-
func reuseportIsAvailable() bool {
35+
func ReuseportIsAvailable() bool {
3436
return envReuseportVal && reuseport.Available()
3537
}
38+
39+
// ReuseErrShouldRetry diagnoses whether to retry after a reuse error.
40+
// if we failed to bind, we should retry. if bind worked and this is a
41+
// real dial error (remote end didnt answer) then we should not retry.
42+
func ReuseErrShouldRetry(err error) bool {
43+
if err == nil {
44+
return false // hey, it worked! no need to retry.
45+
}
46+
47+
// if it's a network timeout error, it's a legitimate failure.
48+
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
49+
return false
50+
}
51+
52+
errno, ok := err.(syscall.Errno)
53+
if !ok { // not an errno? who knows what this is. retry.
54+
return true
55+
}
56+
57+
switch errno {
58+
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
59+
return true // failure to bind. retry.
60+
case syscall.ECONNREFUSED:
61+
return false // real dial error
62+
default:
63+
return true // optimistically default to retry.
64+
}
65+
}

‎p2p/net/transport/tcp.go

+236
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package transport
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"sync"
7+
"time"
8+
9+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
10+
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
11+
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
12+
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
13+
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
14+
)
15+
16+
type TcpTransport struct {
17+
dlock sync.Mutex
18+
dialers map[string]Dialer
19+
20+
llock sync.Mutex
21+
listeners map[string]Listener
22+
}
23+
24+
func NewTCPTransport() *TcpTransport {
25+
return &TcpTransport{
26+
dialers: make(map[string]Dialer),
27+
listeners: make(map[string]Listener),
28+
}
29+
}
30+
31+
func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) {
32+
t.dlock.Lock()
33+
defer t.dlock.Unlock()
34+
s := laddr.String()
35+
d, found := t.dialers[s]
36+
if found {
37+
return d, nil
38+
}
39+
var base manet.Dialer
40+
41+
var doReuse bool
42+
for _, o := range opts {
43+
switch o := o.(type) {
44+
case TimeoutOpt:
45+
base.Timeout = time.Duration(o)
46+
case ReuseportOpt:
47+
doReuse = bool(o)
48+
default:
49+
return nil, fmt.Errorf("unrecognized option: %#v", o)
50+
}
51+
}
52+
53+
tcpd, err := t.newTcpDialer(base, laddr, doReuse)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
t.dialers[s] = tcpd
59+
return tcpd, nil
60+
}
61+
62+
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (Listener, error) {
63+
t.llock.Lock()
64+
defer t.llock.Unlock()
65+
s := laddr.String()
66+
l, found := t.listeners[s]
67+
if found {
68+
return l, nil
69+
}
70+
71+
list, err := manetListen(laddr)
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
tlist := &tcpListener{
77+
list: list,
78+
transport: t,
79+
}
80+
81+
t.listeners[s] = tlist
82+
return tlist, nil
83+
}
84+
85+
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
86+
network, naddr, err := manet.DialArgs(addr)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
if ReuseportIsAvailable() {
92+
nl, err := reuseport.Listen(network, naddr)
93+
if err == nil {
94+
// hey, it worked!
95+
return manet.WrapNetListener(nl)
96+
}
97+
// reuseport is available, but we failed to listen. log debug, and retry normally.
98+
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
99+
}
100+
101+
// either reuseport not available, or it failed. try normally.
102+
return manet.Listen(addr)
103+
}
104+
105+
func (t *TcpTransport) Matches(a ma.Multiaddr) bool {
106+
return IsTcpMultiaddr(a)
107+
}
108+
109+
type tcpDialer struct {
110+
laddr ma.Multiaddr
111+
112+
doReuse bool
113+
114+
rd reuseport.Dialer
115+
madialer manet.Dialer
116+
117+
transport Transport
118+
}
119+
120+
func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) {
121+
// get the local net.Addr manually
122+
la, err := manet.ToNetAddr(laddr)
123+
if err != nil {
124+
return nil, err // something wrong with laddr.
125+
}
126+
127+
if doReuse && ReuseportIsAvailable() {
128+
rd := reuseport.Dialer{
129+
D: net.Dialer{
130+
LocalAddr: la,
131+
Timeout: base.Timeout,
132+
},
133+
}
134+
135+
return &tcpDialer{
136+
doReuse: true,
137+
laddr: laddr,
138+
rd: rd,
139+
madialer: base,
140+
transport: t,
141+
}, nil
142+
}
143+
144+
return &tcpDialer{
145+
doReuse: false,
146+
laddr: laddr,
147+
madialer: base,
148+
transport: t,
149+
}, nil
150+
}
151+
152+
func (d *tcpDialer) Dial(raddr ma.Multiaddr) (Conn, error) {
153+
var c manet.Conn
154+
var err error
155+
if d.doReuse {
156+
c, err = d.reuseDial(raddr)
157+
} else {
158+
c, err = d.madialer.Dial(raddr)
159+
}
160+
161+
if err != nil {
162+
return nil, err
163+
}
164+
165+
return &connWrap{
166+
Conn: c,
167+
transport: d.transport,
168+
}, nil
169+
}
170+
171+
func (d *tcpDialer) reuseDial(raddr ma.Multiaddr) (manet.Conn, error) {
172+
logdial := lgbl.Dial("conn", "", "", d.laddr, raddr)
173+
rpev := log.EventBegin(context.TODO(), "tptDialReusePort", logdial)
174+
175+
network, netraddr, err := manet.DialArgs(raddr)
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
con, err := d.rd.Dial(network, netraddr)
181+
if err == nil {
182+
logdial["reuseport"] = "success"
183+
rpev.Done()
184+
return manet.WrapNetConn(con)
185+
}
186+
187+
if !ReuseErrShouldRetry(err) {
188+
logdial["reuseport"] = "failure"
189+
logdial["error"] = err
190+
rpev.Done()
191+
return nil, err
192+
}
193+
194+
logdial["reuseport"] = "retry"
195+
logdial["error"] = err
196+
rpev.Done()
197+
198+
return d.madialer.Dial(raddr)
199+
}
200+
201+
func (d *tcpDialer) Matches(a ma.Multiaddr) bool {
202+
return IsTcpMultiaddr(a)
203+
}
204+
205+
type tcpListener struct {
206+
list manet.Listener
207+
transport Transport
208+
}
209+
210+
func (d *tcpListener) Accept() (Conn, error) {
211+
c, err := d.list.Accept()
212+
if err != nil {
213+
return nil, err
214+
}
215+
216+
return &connWrap{
217+
Conn: c,
218+
transport: d.transport,
219+
}, nil
220+
}
221+
222+
func (d *tcpListener) Addr() net.Addr {
223+
return d.list.Addr()
224+
}
225+
226+
func (t *tcpListener) Multiaddr() ma.Multiaddr {
227+
return t.list.Multiaddr()
228+
}
229+
230+
func (t *tcpListener) NetListener() net.Listener {
231+
return t.list.NetListener()
232+
}
233+
234+
func (d *tcpListener) Close() error {
235+
return d.list.Close()
236+
}

‎p2p/net/transport/transport.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package transport
2+
3+
import (
4+
"net"
5+
"time"
6+
7+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
8+
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
9+
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
10+
)
11+
12+
var log = logging.Logger("transport")
13+
14+
type Conn interface {
15+
manet.Conn
16+
17+
Transport() Transport
18+
}
19+
20+
type Transport interface {
21+
Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error)
22+
Listen(laddr ma.Multiaddr) (Listener, error)
23+
Matches(ma.Multiaddr) bool
24+
}
25+
26+
type Dialer interface {
27+
Dial(raddr ma.Multiaddr) (Conn, error)
28+
Matches(ma.Multiaddr) bool
29+
}
30+
31+
type Listener interface {
32+
Accept() (Conn, error)
33+
Close() error
34+
Addr() net.Addr
35+
Multiaddr() ma.Multiaddr
36+
}
37+
38+
type connWrap struct {
39+
manet.Conn
40+
transport Transport
41+
}
42+
43+
func (cw *connWrap) Transport() Transport {
44+
return cw.transport
45+
}
46+
47+
type DialOpt interface{}
48+
type TimeoutOpt time.Duration
49+
type ReuseportOpt bool
50+
51+
var ReusePorts ReuseportOpt = true
52+
53+
func IsTcpMultiaddr(a ma.Multiaddr) bool {
54+
p := a.Protocols()
55+
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
56+
}
57+
58+
func IsUtpMultiaddr(a ma.Multiaddr) bool {
59+
p := a.Protocols()
60+
return len(p) == 3 && p[2].Name == "utp"
61+
}

0 commit comments

Comments
 (0)
Please sign in to comment.