Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6de20f9

Browse files
committedNov 3, 2015
refactor net code to use transports, in rough accordance with libp2p
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent f36ada8 commit 6de20f9

File tree

11 files changed

+436
-214
lines changed

11 files changed

+436
-214
lines changed
 

‎metrics/conn/conn.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package meterconn
22

33
import (
4-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
54
metrics "github.com/ipfs/go-ipfs/metrics"
5+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
66
)
77

88
type MeteredConn struct {
99
mesRecv metrics.MeterCallback
1010
mesSent metrics.MeterCallback
1111

12-
manet.Conn
12+
transport.Conn
1313
}
1414

15-
func WrapConn(bwc metrics.Reporter, c manet.Conn) manet.Conn {
15+
func WrapConn(bwc metrics.Reporter, c transport.Conn) transport.Conn {
1616
return newMeteredConn(c, bwc.LogRecvMessage, bwc.LogSentMessage)
1717
}
1818

19-
func newMeteredConn(base manet.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) manet.Conn {
19+
func newMeteredConn(base transport.Conn, rcb metrics.MeterCallback, scb metrics.MeterCallback) transport.Conn {
2020
return &MeteredConn{
2121
Conn: base,
2222
mesRecv: rcb,

‎p2p/net/conn/dial.go

+32-100
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,33 @@ package conn
33
import (
44
"fmt"
55
"math/rand"
6-
"net"
76
"strings"
8-
"syscall"
7+
"time"
98

109
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
1110
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
12-
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
1311
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1412
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1513

14+
ci "github.com/ipfs/go-ipfs/p2p/crypto"
1615
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
16+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1717
peer "github.com/ipfs/go-ipfs/p2p/peer"
1818
)
1919

20+
type WrapFunc func(transport.Conn) transport.Conn
21+
22+
func NewDialer(p peer.ID, pk ci.PrivKey, tout time.Duration, wrap WrapFunc) *Dialer {
23+
return &Dialer{
24+
LocalPeer: p,
25+
PrivateKey: pk,
26+
Wrapper: wrap,
27+
}
28+
}
29+
2030
// String returns the string rep of d.
2131
func (d *Dialer) String() string {
22-
return fmt.Sprintf("<Dialer %s %s ...>", d.LocalPeer, d.LocalAddrs[0])
32+
return fmt.Sprintf("<Dialer %s ...>", d.LocalPeer)
2333
}
2434

2535
// Dial connects to a peer over a particular address
@@ -95,112 +105,34 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
95105
return connOut, nil
96106
}
97107

98-
// rawConnDial dials the underlying net.Conn + manet.Conns
99-
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (manet.Conn, error) {
100-
101-
// before doing anything, check we're going to be able to dial.
102-
// we may not support the given address.
103-
if _, _, err := manet.DialArgs(raddr); err != nil {
104-
return nil, err
105-
}
106-
107-
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
108-
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
109-
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
110-
}
111-
112-
// get local addr to use.
113-
laddr := pickLocalAddr(d.LocalAddrs, raddr)
114-
logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
115-
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()
116-
117-
// make a copy of the manet.Dialer, we may need to change its timeout.
118-
madialer := d.Dialer
119-
120-
if laddr != nil && reuseportIsAvailable() {
121-
// we're perhaps going to dial twice. half the timeout, so we can afford to.
122-
// otherwise our context would expire right after the first dial.
123-
madialer.Dialer.Timeout = (madialer.Dialer.Timeout / 2)
124-
125-
// dial using reuseport.Dialer, because we're probably reusing addrs.
126-
// this is optimistic, as the reuseDial may fail to bind the port.
127-
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
128-
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
129-
// if it worked, wrap the raw net.Conn with our manet.Conn
130-
logdial["reuseport"] = "success"
131-
rpev.Done()
132-
return manet.WrapNetConn(nconn)
133-
} else if !retry {
134-
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
135-
logdial["reuseport"] = "failure"
136-
logdial["error"] = reuseErr
137-
rpev.Done()
138-
return nil, reuseErr
139-
} else {
140-
// this is a failure to reuse port. log it.
141-
logdial["reuseport"] = "retry"
142-
logdial["error"] = reuseErr
143-
rpev.Done()
144-
}
145-
}
146-
147-
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
148-
return madialer.Dial(raddr)
108+
func (d *Dialer) AddDialer(pd transport.Dialer) {
109+
d.Dialers = append(d.Dialers, pd)
149110
}
150111

151-
func reuseDial(dialer net.Dialer, laddr, raddr ma.Multiaddr) (conn net.Conn, retry bool, err error) {
152-
if laddr == nil {
153-
// if we're given no local address no sense in using reuseport to dial, dial out as usual.
154-
return nil, true, reuseport.ErrReuseFailed
155-
}
156-
157-
// give reuse.Dialer the manet.Dialer's Dialer.
158-
// (wow, Dialer should've so been an interface...)
159-
rd := reuseport.Dialer{dialer}
160-
161-
// get the local net.Addr manually
162-
rd.D.LocalAddr, err = manet.ToNetAddr(laddr)
163-
if err != nil {
164-
return nil, true, err // something wrong with laddr. retry without.
165-
}
166-
167-
// get the raddr dial args for rd.dial
168-
network, netraddr, err := manet.DialArgs(raddr)
169-
if err != nil {
170-
return nil, true, err // something wrong with laddr. retry without.
112+
// returns dialer that can dial the given address
113+
func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer {
114+
for _, pd := range d.Dialers {
115+
if pd.Matches(raddr) {
116+
return pd
117+
}
171118
}
172119

173-
// rd.Dial gets us a net.Conn with SO_REUSEPORT and SO_REUSEADDR set.
174-
conn, err = rd.Dial(network, netraddr)
175-
return conn, reuseErrShouldRetry(err), err // hey! it worked!
120+
return nil
176121
}
177122

178-
// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
179-
// if we failed to bind, we should retry. if bind worked and this is a
180-
// real dial error (remote end didnt answer) then we should not retry.
181-
func reuseErrShouldRetry(err error) bool {
182-
if err == nil {
183-
return false // hey, it worked! no need to retry.
184-
}
185-
186-
// if it's a network timeout error, it's a legitimate failure.
187-
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
188-
return false
123+
// rawConnDial dials the underlying net.Conn + manet.Conns
124+
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (transport.Conn, error) {
125+
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
126+
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
127+
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
189128
}
190129

191-
errno, ok := err.(syscall.Errno)
192-
if !ok { // not an errno? who knows what this is. retry.
193-
return true
130+
sd := d.subDialerForAddr(raddr)
131+
if sd == nil {
132+
return nil, fmt.Errorf("no dialer for %s", raddr)
194133
}
195134

196-
switch errno {
197-
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
198-
return true // failure to bind. retry.
199-
case syscall.ECONNREFUSED:
200-
return false // real dial error
201-
default:
202-
return true // optimistically default to retry.
203-
}
135+
return sd.Dial(raddr)
204136
}
205137

206138
func pickLocalAddr(laddrs []ma.Multiaddr, raddr ma.Multiaddr) (laddr ma.Multiaddr) {

‎p2p/net/conn/dial_test.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ import (
88
"testing"
99
"time"
1010

11+
ic "github.com/ipfs/go-ipfs/p2p/crypto"
12+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
13+
peer "github.com/ipfs/go-ipfs/p2p/peer"
1114
tu "github.com/ipfs/go-ipfs/util/testutil"
1215

16+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
1317
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1418
)
1519

@@ -49,6 +53,25 @@ func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.Pe
4953
return setupConn(t, ctx, false)
5054
}
5155

56+
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
57+
list, err := transport.NewTCPTransport().Listener(addr)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
return WrapTransportListener(ctx, list, local, sk)
63+
}
64+
65+
func dialer(t *testing.T, a ma.Multiaddr) transport.Dialer {
66+
tpt := transport.NewTCPTransport()
67+
tptd, err := tpt.Dialer(a)
68+
if err != nil {
69+
t.Fatal(err)
70+
}
71+
72+
return tptd
73+
}
74+
5275
func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) {
5376

5477
p1 = tu.RandPeerNetParamsOrFatal(t)
@@ -71,6 +94,8 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
7194
PrivateKey: key2,
7295
}
7396

97+
d2.AddDialer(dialer(t, p2.Addr))
98+
7499
var c2 Conn
75100

76101
done := make(chan error)
@@ -152,6 +177,7 @@ func testDialer(t *testing.T, secure bool) {
152177
LocalPeer: p2.ID,
153178
PrivateKey: key2,
154179
}
180+
d2.AddDialer(dialer(t, p2.Addr))
155181

156182
go echoListen(ctx, l1)
157183

@@ -227,6 +253,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
227253
LocalPeer: p2.ID,
228254
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
229255
}
256+
d2.AddDialer(dialer(t, p2.Addr))
230257

231258
errs := make(chan error, 100)
232259
done := make(chan struct{}, 1)
@@ -253,7 +280,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
253280

254281
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
255282
if err != nil {
256-
errs <- err
283+
t.Fatal(err)
257284
}
258285
c.Close() // close it early.
259286

‎p2p/net/conn/interface.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88
key "github.com/ipfs/go-ipfs/blocks/key"
99
ic "github.com/ipfs/go-ipfs/p2p/crypto"
1010
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
11+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1112
peer "github.com/ipfs/go-ipfs/p2p/peer"
1213

1314
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
1415
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
15-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
1616
)
1717

1818
// Map maps Keys (Peer.IDs) to Connections.
@@ -54,22 +54,22 @@ type Conn interface {
5454
// Dial function as before, but it would have many arguments, as dialing is
5555
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
5656
type Dialer struct {
57-
58-
// Dialer is an optional manet.Dialer to use.
59-
Dialer manet.Dialer
60-
6157
// LocalPeer is the identity of the local Peer.
6258
LocalPeer peer.ID
6359

6460
// LocalAddrs is a set of local addresses to use.
65-
LocalAddrs []ma.Multiaddr
61+
//LocalAddrs []ma.Multiaddr
62+
63+
// Dialers are the sub-dialers usable by this dialer
64+
// selected in order based on the address being dialed
65+
Dialers []transport.Dialer
6666

6767
// PrivateKey used to initialize a secure connection.
6868
// Warning: if PrivateKey is nil, connection will not be secured.
6969
PrivateKey ic.PrivKey
7070

7171
// Wrapper to wrap the raw connection (optional)
72-
Wrapper func(manet.Conn) manet.Conn
72+
Wrapper WrapFunc
7373
}
7474

7575
// Listener is an object that can accept connections. It matches net.Listener

‎p2p/net/conn/listen.go

+4-31
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,23 @@ import (
66
"net"
77

88
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
9-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
10-
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
119
tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
1210
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
1311
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
1412
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1513

1614
ic "github.com/ipfs/go-ipfs/p2p/crypto"
1715
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
16+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1817
peer "github.com/ipfs/go-ipfs/p2p/peer"
1918
)
2019

2120
// ConnWrapper is any function that wraps a raw multiaddr connection
22-
type ConnWrapper func(manet.Conn) manet.Conn
21+
type ConnWrapper func(transport.Conn) transport.Conn
2322

2423
// listener is an object that can accept connections. It implements Listener
2524
type listener struct {
26-
manet.Listener
25+
transport.Listener
2726

2827
local peer.ID // LocalPeer is the identity of the local Peer
2928
privk ic.PrivKey // private key to use to initialize secure conns
@@ -147,13 +146,7 @@ func (l *listener) Loggable() map[string]interface{} {
147146
}
148147
}
149148

150-
// Listen listens on the particular multiaddr, with given peer and peerstore.
151-
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
152-
ml, err := manetListen(addr)
153-
if err != nil {
154-
return nil, err
155-
}
156-
149+
func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
157150
l := &listener{
158151
Listener: ml,
159152
local: local,
@@ -175,23 +168,3 @@ type ListenerConnWrapper interface {
175168
func (l *listener) SetConnWrapper(cw ConnWrapper) {
176169
l.wrapper = cw
177170
}
178-
179-
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
180-
network, naddr, err := manet.DialArgs(addr)
181-
if err != nil {
182-
return nil, err
183-
}
184-
185-
if reuseportIsAvailable() {
186-
nl, err := reuseport.Listen(network, naddr)
187-
if err == nil {
188-
// hey, it worked!
189-
return manet.WrapNetListener(nl)
190-
}
191-
// reuseport is available, but we failed to listen. log debug, and retry normally.
192-
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
193-
}
194-
195-
// either reuseport not available, or it failed. try normally.
196-
return manet.Listen(addr)
197-
}

‎p2p/net/swarm/swarm.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import (
88
"time"
99

1010
metrics "github.com/ipfs/go-ipfs/metrics"
11+
mconn "github.com/ipfs/go-ipfs/metrics/conn"
1112
inet "github.com/ipfs/go-ipfs/p2p/net"
13+
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
1214
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
1315
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
16+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1417
peer "github.com/ipfs/go-ipfs/p2p/peer"
1518
logging "github.com/ipfs/go-ipfs/vendor/QmTBXYb6y2ZcJmoXVKk3pf9rzSEjbCg7tQaJW7RSuH14nv/go-log"
1619

@@ -58,9 +61,13 @@ type Swarm struct {
5861
backf dialbackoff
5962
dialT time.Duration // mainly for tests
6063

64+
dialer *conn.Dialer
65+
6166
notifmu sync.RWMutex
6267
notifs map[inet.Notifiee]ps.Notifiee
6368

69+
transports []transport.Transport
70+
6471
// filters for addresses that shouldnt be dialed
6572
Filters *filter.Filters
6673

@@ -81,16 +88,22 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
8188
return nil, err
8289
}
8390

91+
wrap := func(c transport.Conn) transport.Conn {
92+
return mconn.WrapConn(bwc, c)
93+
}
94+
8495
s := &Swarm{
8596
swarm: ps.NewSwarm(PSTransport),
8697
local: local,
8798
peers: peers,
8899
ctx: ctx,
89100
dialT: DialTimeout,
90101
notifs: make(map[inet.Notifiee]ps.Notifiee),
102+
transports: []transport.Transport{transport.NewTCPTransport()},
91103
bwc: bwc,
92104
fdRateLimit: make(chan struct{}, concurrentFdDials),
93105
Filters: filter.NewFilters(),
106+
dialer: conn.NewDialer(local, peers.PrivKey(local), DialTimeout, wrap),
94107
}
95108

96109
// configure Swarm
@@ -101,7 +114,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
101114
prom.MustRegisterOrGet(peersTotal)
102115
s.Notify((*metricsNotifiee)(s))
103116

104-
return s, s.listen(listenAddrs)
117+
return s, s.setupAddresses(listenAddrs)
105118
}
106119

107120
func (s *Swarm) teardown() error {
@@ -134,7 +147,7 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
134147
return err
135148
}
136149

137-
return s.listen(addrs)
150+
return s.setupAddresses(addrs)
138151
}
139152

140153
// Process returns the Process of the swarm

‎p2p/net/swarm/swarm_dial.go

+6-31
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,17 @@ import (
44
"bytes"
55
"errors"
66
"fmt"
7-
"net"
87
"sort"
98
"sync"
109
"time"
1110

12-
mconn "github.com/ipfs/go-ipfs/metrics/conn"
11+
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
1312
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
1413
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
1514
peer "github.com/ipfs/go-ipfs/p2p/peer"
1615
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1716

1817
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
19-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
2018
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
2119
)
2220

@@ -289,14 +287,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
289287
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
290288
}
291289

292-
// get our own addrs. try dialing out from our listener addresses (reusing ports)
293-
// Note that using our peerstore's addresses here is incorrect, as that would
294-
// include observed addresses. TODO: make peerstore's address book smarter.
295-
localAddrs := s.ListenAddresses()
296-
if len(localAddrs) == 0 {
297-
log.Debug("Dialing out with no local addresses.")
298-
}
299-
300290
// get remote peer addrs
301291
remoteAddrs := s.peers.Addrs(p)
302292
// make sure we can use the addresses.
@@ -321,23 +311,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
321311
return nil, err
322312
}
323313

324-
// open connection to peer
325-
d := &conn.Dialer{
326-
Dialer: manet.Dialer{
327-
Dialer: net.Dialer{
328-
Timeout: s.dialT,
329-
},
330-
},
331-
LocalPeer: s.local,
332-
LocalAddrs: localAddrs,
333-
PrivateKey: sk,
334-
Wrapper: func(c manet.Conn) manet.Conn {
335-
return mconn.WrapConn(s.bwc, c)
336-
},
337-
}
338-
339314
// try to get a connection to any addr
340-
connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
315+
connC, err := s.dialAddrs(ctx, p, remoteAddrs)
341316
if err != nil {
342317
logdial["error"] = err
343318
return nil, err
@@ -357,7 +332,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
357332
return swarmC, nil
358333
}
359334

360-
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
335+
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
361336

362337
// sort addresses so preferred addresses are dialed sooner
363338
sort.Sort(AddrList(remoteAddrs))
@@ -381,7 +356,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
381356
connsout := conns
382357
errsout := errs
383358

384-
connC, err := s.dialAddr(ctx, d, p, addr)
359+
connC, err := s.dialAddr(ctx, p, addr)
385360
if err != nil {
386361
connsout = nil
387362
} else if connC == nil {
@@ -451,10 +426,10 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
451426
return nil, exitErr
452427
}
453428

454-
func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
429+
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
455430
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
456431

457-
connC, err := d.Dial(ctx, addr, p)
432+
connC, err := s.dialer.Dial(ctx, addr, p)
458433
if err != nil {
459434
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
460435
}

‎p2p/net/swarm/swarm_listen.go

+39-36
Original file line numberDiff line numberDiff line change
@@ -6,77 +6,78 @@ import (
66
mconn "github.com/ipfs/go-ipfs/metrics/conn"
77
inet "github.com/ipfs/go-ipfs/p2p/net"
88
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
9-
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
9+
transport "github.com/ipfs/go-ipfs/p2p/net/transport"
1010
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1111

1212
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
13-
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
1413
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
1514
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
16-
multierr "github.com/ipfs/go-ipfs/thirdparty/multierr"
1715
)
1816

19-
// Open listeners for each network the swarm should listen on
20-
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
17+
// Open listeners and reuse-dialers for the given addresses
18+
func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error {
19+
for _, a := range addrs {
20+
tpt := s.transportForAddr(a)
21+
if tpt == nil {
22+
return fmt.Errorf("no transport for address: %s", a)
23+
}
2124

22-
for _, addr := range addrs {
23-
if !addrutil.AddrUsable(addr, true) {
24-
return fmt.Errorf("cannot use addr: %s", addr)
25+
d, err := tpt.Dialer(a)
26+
if err != nil {
27+
return err
2528
}
26-
}
2729

28-
retErr := multierr.New()
30+
s.dialer.AddDialer(d)
2931

30-
// listen on every address
31-
for i, addr := range addrs {
32-
err := s.setupListener(addr)
32+
list, err := tpt.Listener(a)
3333
if err != nil {
34-
if retErr.Errors == nil {
35-
retErr.Errors = make([]error, len(addrs))
36-
}
37-
retErr.Errors[i] = err
38-
log.Debugf("Failed to listen on: %s - %s", addr, err)
34+
return err
3935
}
40-
}
4136

42-
if retErr.Errors != nil {
43-
return retErr
37+
err = s.addListener(list)
38+
if err != nil {
39+
return err
40+
}
4441
}
42+
4543
return nil
4644
}
4745

48-
// Listen for new connections on the given multiaddr
49-
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
46+
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
47+
for _, t := range s.transports {
48+
if t.Matches(a) {
49+
return t
50+
}
51+
}
52+
53+
return nil
54+
}
5055

51-
// TODO rethink how this has to work. (jbenet)
52-
//
53-
// resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
54-
// if err != nil {
55-
// return err
56-
// }
57-
// for _, a := range resolved {
58-
// s.peers.AddAddr(s.local, a)
59-
// }
56+
func (s *Swarm) addListener(tptlist transport.Listener) error {
6057

6158
sk := s.peers.PrivKey(s.local)
6259
if sk == nil {
6360
// may be fine for sk to be nil, just log a warning.
6461
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
6562
}
66-
log.Debugf("Swarm Listening at %s", maddr)
67-
list, err := conn.Listen(s.Context(), maddr, s.local, sk)
63+
64+
list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
6865
if err != nil {
6966
return err
7067
}
7168

7269
list.SetAddrFilters(s.Filters)
7370

7471
if cw, ok := list.(conn.ListenerConnWrapper); ok {
75-
cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
72+
cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
7673
return mconn.WrapConn(s.bwc, c)
7774
})
7875
}
7976

77+
return s.addConnListener(list)
78+
}
79+
80+
func (s *Swarm) addConnListener(list conn.Listener) error {
8081
// AddListener to the peerstream Listener. this will begin accepting connections
8182
// and streams!
8283
sl, err := s.swarm.AddListener(list)
@@ -85,6 +86,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
8586
}
8687
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
8788

89+
maddr := list.Multiaddr()
90+
8891
// signal to our notifiees on successful conn.
8992
s.notifyAll(func(n inet.Notifiee) {
9093
n.Listen((*Network)(s), maddr)
@@ -107,7 +110,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
107110
if !more {
108111
return
109112
}
110-
log.Warningf("swarm listener accept error: %s", err)
113+
log.Errorf("swarm listener accept error: %s", err)
111114
case <-ctx.Done():
112115
return
113116
}

‎p2p/net/conn/reuseport.go ‎p2p/net/transport/reuseport.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
package conn
1+
package transport
22

33
import (
4+
"net"
45
"os"
56
"strings"
7+
"syscall"
68

79
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
810
)
@@ -30,6 +32,34 @@ func init() {
3032
//
3133
// If this becomes a sought after feature, we could add this to the config.
3234
// In the end, reuseport is a stop-gap.
33-
func reuseportIsAvailable() bool {
35+
func ReuseportIsAvailable() bool {
3436
return envReuseportVal && reuseport.Available()
3537
}
38+
39+
// ReuseErrShouldRetry diagnoses whether to retry after a reuse error.
40+
// if we failed to bind, we should retry. if bind worked and this is a
41+
// real dial error (remote end didnt answer) then we should not retry.
42+
func ReuseErrShouldRetry(err error) bool {
43+
if err == nil {
44+
return false // hey, it worked! no need to retry.
45+
}
46+
47+
// if it's a network timeout error, it's a legitimate failure.
48+
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
49+
return false
50+
}
51+
52+
errno, ok := err.(syscall.Errno)
53+
if !ok { // not an errno? who knows what this is. retry.
54+
return true
55+
}
56+
57+
switch errno {
58+
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
59+
return true // failure to bind. retry.
60+
case syscall.ECONNREFUSED:
61+
return false // real dial error
62+
default:
63+
return true // optimistically default to retry.
64+
}
65+
}

‎p2p/net/transport/tcp.go

+215
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package transport
2+
3+
import (
4+
"net"
5+
"sync"
6+
7+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
8+
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
9+
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
10+
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
11+
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
12+
)
13+
14+
type TcpTransport struct {
15+
dlock sync.Mutex
16+
dialers map[string]Dialer
17+
18+
llock sync.Mutex
19+
listeners map[string]Listener
20+
}
21+
22+
func NewTCPTransport() *TcpTransport {
23+
return &TcpTransport{
24+
dialers: make(map[string]Dialer),
25+
listeners: make(map[string]Listener),
26+
}
27+
}
28+
29+
func (t *TcpTransport) Dialer(laddr ma.Multiaddr) (Dialer, error) {
30+
t.dlock.Lock()
31+
defer t.dlock.Unlock()
32+
s := laddr.String()
33+
d, found := t.dialers[s]
34+
if found {
35+
return d, nil
36+
}
37+
38+
var base manet.Dialer
39+
tcpd, err := t.newTcpDialer(base, laddr)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
t.dialers[s] = tcpd
45+
return tcpd, nil
46+
}
47+
48+
func (t *TcpTransport) Listener(laddr ma.Multiaddr) (Listener, error) {
49+
t.llock.Lock()
50+
defer t.llock.Unlock()
51+
s := laddr.String()
52+
l, found := t.listeners[s]
53+
if found {
54+
return l, nil
55+
}
56+
57+
list, err := manetListen(laddr)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
tlist := &tcpListener{
63+
list: list,
64+
transport: t,
65+
}
66+
67+
t.listeners[s] = tlist
68+
return tlist, nil
69+
}
70+
71+
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
72+
network, naddr, err := manet.DialArgs(addr)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
if ReuseportIsAvailable() {
78+
nl, err := reuseport.Listen(network, naddr)
79+
if err == nil {
80+
// hey, it worked!
81+
return manet.WrapNetListener(nl)
82+
}
83+
// reuseport is available, but we failed to listen. log debug, and retry normally.
84+
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
85+
}
86+
87+
// either reuseport not available, or it failed. try normally.
88+
return manet.Listen(addr)
89+
}
90+
91+
func (t *TcpTransport) Matches(a ma.Multiaddr) bool {
92+
return IsTcpMultiaddr(a)
93+
}
94+
95+
type tcpDialer struct {
96+
laddr ma.Multiaddr
97+
98+
doReuse bool
99+
100+
rd reuseport.Dialer
101+
madialer manet.Dialer
102+
103+
transport Transport
104+
}
105+
106+
func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr) (*tcpDialer, error) {
107+
// get the local net.Addr manually
108+
la, err := manet.ToNetAddr(laddr)
109+
if err != nil {
110+
return nil, err // something wrong with laddr.
111+
}
112+
113+
if !ReuseportIsAvailable() {
114+
return &tcpDialer{
115+
doReuse: false,
116+
laddr: laddr,
117+
madialer: base,
118+
transport: t,
119+
}, nil
120+
}
121+
122+
return &tcpDialer{
123+
doReuse: true,
124+
laddr: laddr,
125+
rd: reuseport.Dialer{D: net.Dialer{LocalAddr: la}},
126+
madialer: base,
127+
transport: t,
128+
}, nil
129+
}
130+
131+
func (d *tcpDialer) Dial(raddr ma.Multiaddr) (Conn, error) {
132+
var c manet.Conn
133+
var err error
134+
if d.doReuse {
135+
c, err = d.reuseDial(raddr)
136+
} else {
137+
c, err = d.madialer.Dial(raddr)
138+
}
139+
140+
if err != nil {
141+
return nil, err
142+
}
143+
144+
return &connWrap{
145+
Conn: c,
146+
transport: d.transport,
147+
}, nil
148+
}
149+
150+
func (d *tcpDialer) reuseDial(raddr ma.Multiaddr) (manet.Conn, error) {
151+
logdial := lgbl.Dial("conn", "", "", d.laddr, raddr)
152+
rpev := log.EventBegin(context.TODO(), "tptDialReusePort", logdial)
153+
154+
network, netraddr, err := manet.DialArgs(raddr)
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
con, err := d.rd.Dial(network, netraddr)
160+
if err == nil {
161+
logdial["reuseport"] = "success"
162+
rpev.Done()
163+
return manet.WrapNetConn(con)
164+
}
165+
166+
if !ReuseErrShouldRetry(err) {
167+
logdial["reuseport"] = "failure"
168+
logdial["error"] = err
169+
rpev.Done()
170+
return nil, err
171+
}
172+
173+
logdial["reuseport"] = "retry"
174+
logdial["error"] = err
175+
rpev.Done()
176+
177+
return d.madialer.Dial(raddr)
178+
}
179+
180+
func (d *tcpDialer) Matches(a ma.Multiaddr) bool {
181+
return IsTcpMultiaddr(a)
182+
}
183+
184+
type tcpListener struct {
185+
list manet.Listener
186+
transport Transport
187+
}
188+
189+
func (d *tcpListener) Accept() (Conn, error) {
190+
c, err := d.list.Accept()
191+
if err != nil {
192+
return nil, err
193+
}
194+
195+
return &connWrap{
196+
Conn: c,
197+
transport: d.transport,
198+
}, nil
199+
}
200+
201+
func (d *tcpListener) Addr() net.Addr {
202+
return d.list.Addr()
203+
}
204+
205+
func (t *tcpListener) Multiaddr() ma.Multiaddr {
206+
return t.list.Multiaddr()
207+
}
208+
209+
func (t *tcpListener) NetListener() net.Listener {
210+
return t.list.NetListener()
211+
}
212+
213+
func (d *tcpListener) Close() error {
214+
return d.list.Close()
215+
}

‎p2p/net/transport/transport.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package transport
2+
3+
import (
4+
"net"
5+
6+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
7+
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
8+
logging "github.com/ipfs/go-ipfs/vendor/QmTBXYb6y2ZcJmoXVKk3pf9rzSEjbCg7tQaJW7RSuH14nv/go-log"
9+
)
10+
11+
var log = logging.Logger("transport")
12+
13+
type Conn interface {
14+
manet.Conn
15+
16+
Transport() Transport
17+
}
18+
19+
type Transport interface {
20+
Dialer(laddr ma.Multiaddr) (Dialer, error)
21+
Listener(laddr ma.Multiaddr) (Listener, error)
22+
Matches(ma.Multiaddr) bool
23+
}
24+
25+
type Dialer interface {
26+
Dial(raddr ma.Multiaddr) (Conn, error)
27+
Matches(ma.Multiaddr) bool
28+
}
29+
30+
type Listener interface {
31+
Accept() (Conn, error)
32+
Close() error
33+
Addr() net.Addr
34+
Multiaddr() ma.Multiaddr
35+
}
36+
37+
type connWrap struct {
38+
manet.Conn
39+
transport Transport
40+
}
41+
42+
func (cw *connWrap) Transport() Transport {
43+
return cw.transport
44+
}
45+
46+
func IsTcpMultiaddr(a ma.Multiaddr) bool {
47+
p := a.Protocols()
48+
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
49+
}
50+
51+
func IsUtpMultiaddr(a ma.Multiaddr) bool {
52+
p := a.Protocols()
53+
return len(p) == 3 && p[2].Name == "utp"
54+
}

0 commit comments

Comments
 (0)
Please sign in to comment.