Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e5327c2

Browse files
committed Dec 9, 2015
implement utp transport
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 28fa917 commit e5327c2

File tree

6 files changed

+170
-19
lines changed

6 files changed

+170
-19
lines changed
 

‎p2p/net/swarm/addr/addr.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ var log = logging.Logger("p2p/net/swarm/addr")
1818
var SupportedTransportStrings = []string{
1919
"/ip4/tcp",
2020
"/ip6/tcp",
21-
// "/ip4/udp/utp", disabled because the lib is broken
22-
// "/ip6/udp/utp", disabled because the lib is broken
21+
"/ip4/udp/utp",
22+
"/ip6/udp/utp",
2323
// "/ip4/udp/udt", disabled because the lib doesnt work on arm
2424
// "/ip6/udp/udt", disabled because the lib doesnt work on arm
2525
}

‎p2p/net/swarm/addr/addr_test.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ func TestFilterAddrs(t *testing.T) {
2020
bad := []ma.Multiaddr{
2121
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable
2222
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
23-
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
2423
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
2524
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local
2625
newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local
@@ -29,6 +28,7 @@ func TestFilterAddrs(t *testing.T) {
2928
good := []ma.Multiaddr{
3029
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
3130
newMultiaddr(t, "/ip6/::1/tcp/1234"),
31+
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"),
3232
}
3333

3434
goodAndBad := append(good, bad...)
@@ -39,18 +39,12 @@ func TestFilterAddrs(t *testing.T) {
3939
if AddrUsable(a, false) {
4040
t.Errorf("addr %s should be unusable", a)
4141
}
42-
if AddrUsable(a, true) {
43-
t.Errorf("addr %s should be unusable", a)
44-
}
4542
}
4643

4744
for _, a := range good {
4845
if !AddrUsable(a, false) {
4946
t.Errorf("addr %s should be usable", a)
5047
}
51-
if !AddrUsable(a, true) {
52-
t.Errorf("addr %s should be usable", a)
53-
}
5448
}
5549

5650
subtestAddrsEqual(t, FilterUsableAddrs(bad), []ma.Multiaddr{})

‎p2p/net/swarm/swarm.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,16 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
9191
}
9292

9393
s := &Swarm{
94-
swarm: ps.NewSwarm(PSTransport),
95-
local: local,
96-
peers: peers,
97-
ctx: ctx,
98-
dialT: DialTimeout,
99-
notifs: make(map[inet.Notifiee]ps.Notifiee),
100-
transports: []transport.Transport{transport.NewTCPTransport()},
94+
swarm: ps.NewSwarm(PSTransport),
95+
local: local,
96+
peers: peers,
97+
ctx: ctx,
98+
dialT: DialTimeout,
99+
notifs: make(map[inet.Notifiee]ps.Notifiee),
100+
transports: []transport.Transport{
101+
transport.NewTCPTransport(),
102+
transport.NewUtpTransport(),
103+
},
101104
bwc: bwc,
102105
fdRateLimit: make(chan struct{}, concurrentFdDials),
103106
Filters: filter.NewFilters(),

‎p2p/net/swarm/swarm_addr_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ func TestFilterAddrs(t *testing.T) {
2525
bad := []ma.Multiaddr{
2626
m("/ip4/1.2.3.4/udp/1234"), // unreliable
2727
m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
28-
m("/ip4/1.2.3.4/udp/1234/utp"), // utp is broken
2928
m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
3029
m("/ip6/fe80::1/tcp/0"), // link local
3130
m("/ip6/fe80::100/tcp/1234"), // link local
@@ -34,20 +33,21 @@ func TestFilterAddrs(t *testing.T) {
3433
good := []ma.Multiaddr{
3534
m("/ip4/127.0.0.1/tcp/0"),
3635
m("/ip6/::1/tcp/0"),
36+
m("/ip4/1.2.3.4/udp/1234/utp"),
3737
}
3838

3939
goodAndBad := append(good, bad...)
4040

4141
// test filters
4242

4343
for _, a := range bad {
44-
if addrutil.AddrUsable(a, true) {
44+
if addrutil.AddrUsable(a, false) {
4545
t.Errorf("addr %s should be unusable", a)
4646
}
4747
}
4848

4949
for _, a := range good {
50-
if !addrutil.AddrUsable(a, true) {
50+
if !addrutil.AddrUsable(a, false) {
5151
t.Errorf("addr %s should be usable", a)
5252
}
5353
}

‎p2p/net/transport/utp.go

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package transport
2+
3+
import (
4+
"net"
5+
"sync"
6+
7+
utp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/anacrolix/utp"
8+
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
9+
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
10+
mautp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/utp"
11+
)
12+
13+
type UtpTransport struct {
14+
sockLock sync.Mutex
15+
sockets map[string]*UtpSocket
16+
}
17+
18+
func NewUtpTransport() *UtpTransport {
19+
return &UtpTransport{
20+
sockets: make(map[string]*UtpSocket),
21+
}
22+
}
23+
24+
func (d *UtpTransport) Matches(a ma.Multiaddr) bool {
25+
p := a.Protocols()
26+
return len(p) == 3 && p[2].Name == "utp"
27+
}
28+
29+
type UtpSocket struct {
30+
s *utp.Socket
31+
laddr ma.Multiaddr
32+
transport Transport
33+
}
34+
35+
func (t *UtpTransport) Listen(laddr ma.Multiaddr) (Listener, error) {
36+
t.sockLock.Lock()
37+
defer t.sockLock.Unlock()
38+
s, ok := t.sockets[laddr.String()]
39+
if ok {
40+
return s, nil
41+
}
42+
43+
ns, err := t.newConn(laddr)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
t.sockets[laddr.String()] = ns
49+
return ns, nil
50+
}
51+
52+
func (t *UtpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) {
53+
t.sockLock.Lock()
54+
defer t.sockLock.Unlock()
55+
s, ok := t.sockets[laddr.String()]
56+
if ok {
57+
return s, nil
58+
}
59+
60+
ns, err := t.newConn(laddr, opts...)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
t.sockets[laddr.String()] = ns
66+
return ns, nil
67+
}
68+
69+
func (t *UtpTransport) newConn(addr ma.Multiaddr, opts ...DialOpt) (*UtpSocket, error) {
70+
network, netaddr, err := manet.DialArgs(addr)
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
s, err := utp.NewSocket("udp"+network[3:], netaddr)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
laddr, err := manet.FromNetAddr(mautp.MakeAddr(s.LocalAddr()))
81+
if err != nil {
82+
return nil, err
83+
}
84+
85+
return &UtpSocket{
86+
s: s,
87+
laddr: laddr,
88+
transport: t,
89+
}, nil
90+
}
91+
92+
func (s *UtpSocket) Dial(raddr ma.Multiaddr) (Conn, error) {
93+
_, addr, err := manet.DialArgs(raddr)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
con, err := s.s.Dial(addr)
99+
if err != nil {
100+
return nil, err
101+
}
102+
103+
mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con})
104+
if err != nil {
105+
return nil, err
106+
}
107+
108+
return &connWrap{
109+
Conn: mnc,
110+
transport: s.transport,
111+
}, nil
112+
}
113+
114+
func (s *UtpSocket) Accept() (Conn, error) {
115+
c, err := s.s.Accept()
116+
if err != nil {
117+
return nil, err
118+
}
119+
120+
mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: c})
121+
if err != nil {
122+
return nil, err
123+
}
124+
125+
return &connWrap{
126+
Conn: mnc,
127+
transport: s.transport,
128+
}, nil
129+
}
130+
131+
func (s *UtpSocket) Matches(a ma.Multiaddr) bool {
132+
p := a.Protocols()
133+
return len(p) == 3 && p[2].Name == "utp"
134+
}
135+
136+
func (t *UtpSocket) Close() error {
137+
return t.s.Close()
138+
}
139+
140+
func (t *UtpSocket) Addr() net.Addr {
141+
return t.s.Addr()
142+
}
143+
144+
func (t *UtpSocket) Multiaddr() ma.Multiaddr {
145+
return t.laddr
146+
}
147+
148+
// Compile-time check that *UtpTransport satisfies the Transport interface.
var _ Transport = (*UtpTransport)(nil)

‎test/sharness/t0130-multinode.sh

+6
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,10 @@ test_expect_success "set up tcp testbed" '
7878

7979
run_basic_test
8080

81+
test_expect_success "set up utp testbed" '
82+
iptb init -n 5 -p 0 -f --bootstrap=none --utp
83+
'
84+
85+
run_basic_test
86+
8187
test_done

0 commit comments

Comments
 (0)
Please sign in to comment.