Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5eadfac

Browse files
committedJun 19, 2015
WIP: using multistream muxer
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 370df8f commit 5eadfac

File tree

8 files changed

+110
-22
lines changed

8 files changed

+110
-22
lines changed
 

‎Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go

+7-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go

+56
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎p2p/host/basic/basic_host.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
1616
identify "github.com/ipfs/go-ipfs/p2p/protocol/identify"
1717
relay "github.com/ipfs/go-ipfs/p2p/protocol/relay"
18+
19+
msmux "github.com/whyrusleeping/go-multistream"
1820
)
1921

2022
var log = eventlog.Logger("p2p/host/basic")
@@ -39,7 +41,7 @@ const (
3941
// * uses a nat service to establish NAT port mappings
4042
type BasicHost struct {
4143
network inet.Network
42-
mux *protocol.Mux
44+
mux *msmux.MultistreamMuxer
4345
ids *identify.IDService
4446
relay *relay.RelayService
4547
natmgr *natManager
@@ -53,7 +55,7 @@ type BasicHost struct {
5355
func New(net inet.Network, opts ...interface{}) *BasicHost {
5456
h := &BasicHost{
5557
network: net,
56-
mux: protocol.NewMux(),
58+
mux: msmux.NewMultistreamMuxer(),
5759
bwc: metrics.NewBandwidthCounter(),
5860
}
5961

@@ -67,7 +69,12 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
6769

6870
// setup host services
6971
h.ids = identify.NewIDService(h)
70-
h.relay = relay.NewRelayService(h, h.Mux().HandleSync)
72+
73+
muxh := h.Mux().Handle
74+
handle := func(s inet.Stream) {
75+
muxh(s)
76+
}
77+
h.relay = relay.NewRelayService(h, handle)
7178

7279
for _, o := range opts {
7380
switch o := o.(type) {
@@ -95,7 +102,7 @@ func (h *BasicHost) newConnHandler(c inet.Conn) {
95102
// newStreamHandler is the remote-opened stream handler for inet.Network
96103
// TODO: this feels a bit wonky
97104
func (h *BasicHost) newStreamHandler(s inet.Stream) {
98-
protoID, handle, err := h.Mux().ReadHeader(s)
105+
protoID, handle, err := h.Mux().Negotiate(s)
99106
if err != nil {
100107
if err == io.EOF {
101108
log.Debugf("protocol EOF: %s", s.Conn().RemotePeer())
@@ -105,7 +112,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
105112
return
106113
}
107114

108-
logStream := mstream.WrapStream(s, protoID, h.bwc)
115+
logStream := mstream.WrapStream(s, protocol.ID(protoID), h.bwc)
109116

110117
go handle(logStream)
111118
}
@@ -126,7 +133,7 @@ func (h *BasicHost) Network() inet.Network {
126133
}
127134

128135
// Mux returns the Mux multiplexing incoming streams to protocol handlers
129-
func (h *BasicHost) Mux() *protocol.Mux {
136+
func (h *BasicHost) Mux() *msmux.MultistreamMuxer {
130137
return h.mux
131138
}
132139

@@ -140,12 +147,15 @@ func (h *BasicHost) IDService() *identify.IDService {
140147
// host.Mux().SetHandler(proto, handler)
141148
// (Threadsafe)
142149
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
143-
h.Mux().SetHandler(pid, handler)
150+
h.Mux().AddHandler(string(pid), func(rwc io.ReadWriteCloser) error {
151+
handler(rwc.(inet.Stream))
152+
return nil
153+
})
144154
}
145155

146156
// RemoveStreamHandler returns ..
147157
func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
148-
h.Mux().RemoveHandler(pid)
158+
h.Mux().RemoveHandler(string(pid))
149159
}
150160

151161
// NewStream opens a new stream to given peer p, and writes a p2p/protocol
@@ -160,7 +170,7 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
160170

161171
logStream := mstream.WrapStream(s, pid, h.bwc)
162172

163-
if err := protocol.WriteHeader(logStream, pid); err != nil {
173+
if err := msmux.SelectProtoOrFail(string(pid), s); err != nil {
164174
logStream.Close()
165175
return nil, err
166176
}

‎p2p/host/host.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
peer "github.com/ipfs/go-ipfs/p2p/peer"
99
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
1010
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
11+
12+
msmux "github.com/whyrusleeping/go-multistream"
1113
)
1214

1315
var log = eventlog.Logger("p2p/host")
@@ -31,7 +33,7 @@ type Host interface {
3133
Network() inet.Network
3234

3335
// Mux returns the Mux multiplexing incoming streams to protocol handlers
34-
Mux() *protocol.Mux
36+
Mux() *msmux.MultistreamMuxer
3537

3638
// Connect ensures there is a connection between this host and the peer with
3739
// given peer.ID. Connect will absorb the addresses in pi into its internal

‎p2p/host/routed/routed.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
peer "github.com/ipfs/go-ipfs/p2p/peer"
1616
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
1717
routing "github.com/ipfs/go-ipfs/routing"
18+
19+
msmux "github.com/whyrusleeping/go-multistream"
1820
)
1921

2022
var log = eventlog.Logger("p2p/host/routed")
@@ -97,7 +99,7 @@ func (rh *RoutedHost) Network() inet.Network {
9799
return rh.host.Network()
98100
}
99101

100-
func (rh *RoutedHost) Mux() *protocol.Mux {
102+
func (rh *RoutedHost) Mux() *msmux.MultistreamMuxer {
101103
return rh.host.Mux()
102104
}
103105

‎p2p/net/swarm/swarm.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
1818
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
1919
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
20-
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
20+
psmss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream"
2121
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
2222
)
2323

@@ -26,9 +26,7 @@ var log = eventlog.Logger("swarm2")
2626
var PSTransport pst.Transport
2727

2828
func init() {
29-
tpt := *psy.DefaultTransport
30-
tpt.MaxStreamWindowSize = 512 * 1024
31-
PSTransport = &tpt
29+
PSTransport = psmss.NewTransport()
3230
}
3331

3432
// Swarm is a connection muxer, allowing connections to other peers to

‎p2p/net/swarm/swarm_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ func TestSwarm(t *testing.T) {
236236
SubtestSwarm(t, swarms, msgs)
237237
}
238238

239+
func TestBasicSwarm(t *testing.T) {
240+
// t.Skip("skipping for another test")
241+
t.Parallel()
242+
243+
msgs := 1
244+
swarms := 2
245+
SubtestSwarm(t, swarms, msgs)
246+
}
247+
239248
func TestConnHandler(t *testing.T) {
240249
// t.Skip("skipping for another test")
241250
t.Parallel()

0 commit comments

Comments
 (0)
Please sign in to comment.