Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a2d55ae

Browse files
committed May 2, 2015
WIP: a slightly different approach to the network simulator
1 parent 612ca19 commit a2d55ae

File tree

4 files changed

+164
-8
lines changed

4 files changed

+164
-8
lines changed
 

‎exchange/bitswap/workers.go

+1 -1
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
9494
ctx, cancel := context.WithTimeout(ctx, provideTimeout)
9595
err := bs.network.Provide(ctx, k)
9696
if err != nil {
97-
log.Error(err)
97+
log.Warning(err)
9898
}
9999
cancel()
100100
case <-ctx.Done():

‎p2p/net/mock2/network.go

+56 -3
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
package netsim
22

33
import (
4+
"bufio"
5+
"bytes"
46
"errors"
57
"fmt"
68
"io"
@@ -217,7 +219,13 @@ func (ns *NetworkSimulator) NewConnPair(local, remote peer.ID, fullsync bool) (*
217219

218220
ctx, cancel := context.WithCancel(context.Background())
219221

220-
bufsize := 50
222+
lread := bufio.NewReader(conl)
223+
rread := bufio.NewReader(conr)
224+
225+
lwrite := bufio.NewWriter(conl)
226+
rwrite := bufio.NewWriter(conr)
227+
228+
bufsize := 10
221229
lc := &Conn{
222230
Conn: conl,
223231
local: local,
@@ -229,6 +237,8 @@ func (ns *NetworkSimulator) NewConnPair(local, remote peer.ID, fullsync bool) (*
229237
delay: make(chan time.Time, bufsize),
230238
ctx: ctx,
231239
cancel: cancel,
240+
read: lread,
241+
write: lwrite,
232242
}
233243
go lc.transport()
234244

@@ -243,6 +253,8 @@ func (ns *NetworkSimulator) NewConnPair(local, remote peer.ID, fullsync bool) (*
243253
delay: make(chan time.Time, bufsize),
244254
ctx: ctx,
245255
cancel: cancel,
256+
read: rread,
257+
write: rwrite,
246258
}
247259
go rc.transport()
248260

@@ -252,27 +264,60 @@ func (ns *NetworkSimulator) NewConnPair(local, remote peer.ID, fullsync bool) (*
252264
// transport will grab message arrival times, wait until that time, and
253265
// then write the message out when it is scheduled to arrive
254266
func (c *Conn) transport() {
267+
bufsize := 256
268+
buf := new(bytes.Buffer)
269+
ticker := time.NewTicker(time.Millisecond * 4)
270+
loop:
255271
for {
256272
select {
257273
case t := <-c.delay:
274+
msg := <-c.msgs
275+
258276
now := time.Now()
277+
buffered := len(msg) + buf.Len()
278+
259279
if !now.After(t) {
260-
time.Sleep(t.Sub(now))
280+
if buffered < bufsize {
281+
buf.Write(msg)
282+
continue loop
283+
} else {
284+
time.Sleep(t.Sub(now))
285+
}
261286
}
262-
msg := <-c.msgs
263287

288+
if buf.Len() > 0 {
289+
_, err := c.Conn.Write(buf.Bytes())
290+
if err != nil {
291+
return
292+
}
293+
buf.Reset()
294+
}
264295
_, err := c.Conn.Write(msg)
265296
if err != nil {
266297
return
267298
}
299+
case <-ticker.C:
300+
if buf.Len() > 0 {
301+
_, err := c.Conn.Write(buf.Bytes())
302+
if err != nil {
303+
return
304+
}
305+
buf.Reset()
306+
}
268307
case <-c.ctx.Done():
269308
return
270309
}
271310
}
272311
}
273312

313+
func (c *Conn) Read(b []byte) (int, error) {
314+
return c.read.Read(b)
315+
}
316+
274317
type Conn struct {
275318
net.Conn
319+
read io.Reader
320+
write io.Writer
276321
local peer.ID
277322
remote peer.ID
278323
laddr ma.Multiaddr
@@ -326,3 +371,11 @@ func (co *ConnectionOpts) GetLatency() time.Duration {
326371
}
327372
return co.Delay + jitter
328373
}
374+
375+
func (co *ConnectionOpts) MinLatency() time.Duration {
376+
return co.Delay - co.Jitter
377+
}
378+
379+
func (co *ConnectionOpts) MaxLatency() time.Duration {
380+
return co.Delay + co.Jitter
381+
}

‎test/integration/bench_cat_test.go

+37 -4
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"io"
66
"testing"
7+
"time"
78

89
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
910
"github.com/ipfs/go-ipfs/core"
@@ -15,15 +16,47 @@ import (
1516
testutil "github.com/ipfs/go-ipfs/util/testutil"
1617
)
1718

18-
func BenchmarkCat1MB(b *testing.B) { benchmarkVarCat(b, unit.MB*1) }
19-
func BenchmarkCat2MB(b *testing.B) { benchmarkVarCat(b, unit.MB*2) }
20-
func BenchmarkCat4MB(b *testing.B) { benchmarkVarCat(b, unit.MB*4) }
19+
func BenchmarkCat1MB(b *testing.B) { benchmarkVarCat(b, unit.MB*1) }
20+
func BenchmarkCat2MB(b *testing.B) { benchmarkVarCat(b, unit.MB*2) }
21+
func BenchmarkCat4MB(b *testing.B) { benchmarkVarCat(b, unit.MB*4) }
22+
func BenchmarkCat8MB(b *testing.B) { benchmarkVarCat(b, unit.MB*8) }
23+
func BenchmarkCat16MB(b *testing.B) { benchmarkVarCat(b, unit.MB*16) }
24+
func BenchmarkCat32MB(b *testing.B) { benchmarkVarCat(b, unit.MB*32) }
25+
26+
func BenchmarkCat16MB_0Ms(b *testing.B) {
27+
benchmarkVarCatConf(b, unit.MB*16, instant)
28+
}
29+
30+
func BenchmarkCat16MB_25Ms(b *testing.B) {
31+
cfg := testutil.LatencyConfig{
32+
NetworkLatency: time.Millisecond * 25,
33+
}
34+
benchmarkVarCatConf(b, unit.MB*16, cfg)
35+
}
36+
37+
func BenchmarkCat16MB_50Ms(b *testing.B) {
38+
cfg := testutil.LatencyConfig{
39+
NetworkLatency: time.Millisecond * 50,
40+
}
41+
benchmarkVarCatConf(b, unit.MB*16, cfg)
42+
}
43+
44+
func BenchmarkCat16MB_100Ms(b *testing.B) {
45+
cfg := testutil.LatencyConfig{
46+
NetworkLatency: time.Millisecond * 100,
47+
}
48+
benchmarkVarCatConf(b, unit.MB*16, cfg)
49+
}
2150

2251
func benchmarkVarCat(b *testing.B, size int64) {
52+
benchmarkVarCatConf(b, size, instant)
53+
}
54+
55+
func benchmarkVarCatConf(b *testing.B, size int64, conf testutil.LatencyConfig) {
2356
data := RandomBytes(size)
2457
b.SetBytes(size)
2558
for n := 0; n < b.N; n++ {
26-
err := benchCat(b, data, instant)
59+
err := benchCat(b, data, conf)
2760
if err != nil {
2861
b.Fatal(err)
2962
}

‎test/integration/routing_test.go

+70
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,70 @@
1+
package integrationtest
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
8+
core "github.com/ipfs/go-ipfs/core"
9+
mn2 "github.com/ipfs/go-ipfs/p2p/net/mock2"
10+
"github.com/ipfs/go-ipfs/p2p/peer"
11+
"github.com/ipfs/go-ipfs/thirdparty/unit"
12+
testutil "github.com/ipfs/go-ipfs/util/testutil"
13+
)
14+
15+
func TestRoutingPing(t *testing.T) {
16+
conf := testutil.LatencyConfig{
17+
NetworkLatency: time.Millisecond * 100,
18+
}
19+
ctx, cancel := context.WithCancel(context.Background())
20+
defer cancel()
21+
const numPeers = 2
22+
23+
// create network
24+
ns, err := mn2.NewNetworkSimulator(numPeers)
25+
if err != nil {
26+
t.Fatal(err)
27+
}
28+
defer ns.Close()
29+
30+
ns.ConOpts = mn2.ConnectionOpts{
31+
Bandwidth: 500 * unit.MB,
32+
Delay: conf.NetworkLatency,
33+
}
34+
35+
peers := ns.Peers()
36+
if len(peers) < numPeers {
37+
t.Fatal("test initialization error")
38+
}
39+
40+
node1, err := core.NewIPFSNode(ctx, MocknetTestRepo(peers[0], ns, conf, core.DHTOption))
41+
if err != nil {
42+
t.Fatal(err)
43+
}
44+
defer node1.Close()
45+
node2, err := core.NewIPFSNode(ctx, MocknetTestRepo(peers[1], ns, conf, core.DHTOption))
46+
if err != nil {
47+
t.Fatal(err)
48+
}
49+
defer node2.Close()
50+
51+
bs1 := []peer.PeerInfo{node1.Peerstore.PeerInfo(node1.Identity)}
52+
bs2 := []peer.PeerInfo{node2.Peerstore.PeerInfo(node2.Identity)}
53+
54+
if err := node2.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil {
55+
t.Fatal(err)
56+
}
57+
if err := node1.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil {
58+
t.Fatal(err)
59+
}
60+
61+
ctx, _ = context.WithTimeout(context.Background(), time.Second*5)
62+
took, err := node1.Routing.Ping(ctx, node2.Identity)
63+
if err != nil {
64+
t.Fatal(err)
65+
}
66+
67+
if took < conf.NetworkLatency*2 || took > (conf.NetworkLatency*2)+(time.Millisecond*10) {
68+
t.Fatalf("ping took a weird amount of time: %s, expected ~200ms", took)
69+
}
70+
}

0 commit comments

Comments
 (0)
Please sign in to comment.