Skip to content

Commit a19ad97

Browse files
committed Jul 14, 2015
make ping its own protocol
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 9ec3c1a commit a19ad97

File tree

7 files changed

+181
-64
lines changed

7 files changed

+181
-64
lines changed
 

‎core/commands/ping.go

+21-16
Original file line numberDiff line numberDiff line change
@@ -138,30 +138,35 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int)
138138

139139
outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())}
140140

141+
ctx, cancel := context.WithTimeout(ctx, kPingTimeout*time.Duration(numPings))
142+
defer cancel()
143+
pings, err := n.Ping.Ping(ctx, pid)
144+
if err != nil {
145+
log.Debugf("Ping error: %s", err)
146+
outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)}
147+
return
148+
}
149+
141150
var done bool
142151
var total time.Duration
143152
for i := 0; i < numPings && !done; i++ {
144153
select {
145154
case <-ctx.Done():
146155
done = true
147-
continue
148-
default:
149-
}
150-
151-
ctx, cancel := context.WithTimeout(ctx, kPingTimeout)
152-
defer cancel()
153-
took, err := n.Routing.Ping(ctx, pid)
154-
if err != nil {
155-
log.Debugf("Ping error: %s", err)
156-
outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)}
157156
break
157+
case t, ok := <-pings:
158+
if !ok {
159+
done = true
160+
break
161+
}
162+
163+
outChan <- &PingResult{
164+
Success: true,
165+
Time: t,
166+
}
167+
total += t
168+
time.Sleep(time.Second)
158169
}
159-
outChan <- &PingResult{
160-
Success: true,
161-
Time: took,
162-
}
163-
total += took
164-
time.Sleep(time.Second)
165170
}
166171
averagems := total.Seconds() * 1000 / float64(numPings)
167172
outChan <- &PingResult{

‎core/core.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
swarm "github.com/ipfs/go-ipfs/p2p/net/swarm"
3434
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
3535
peer "github.com/ipfs/go-ipfs/p2p/peer"
36+
ping "github.com/ipfs/go-ipfs/p2p/protocol/ping"
3637
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
3738

3839
routing "github.com/ipfs/go-ipfs/routing"
@@ -102,7 +103,8 @@ type IpfsNode struct {
102103
Exchange exchange.Interface // the block exchange + strategy (bitswap)
103104
Namesys namesys.NameSystem // the name system, resolves paths to hashes
104105
Diagnostics *diag.Diagnostics // the diagnostics service
105-
Reprovider *rp.Reprovider // the value reprovider system
106+
Ping *ping.PingService
107+
Reprovider *rp.Reprovider // the value reprovider system
106108

107109
IpnsFs *ipnsfs.Filesystem
108110

@@ -324,6 +326,7 @@ func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) {
324326
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error {
325327
// setup diagnostics service
326328
n.Diagnostics = diag.NewDiagnostics(n.Identity, host)
329+
n.Ping = ping.NewPingService(host)
327330

328331
// setup routing service
329332
r, err := routingOption(ctx, host, n.Repo.Datastore())

‎p2p/protocol/ping/ping.go

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package ping
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"io"
7+
"time"
8+
9+
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
10+
11+
host "github.com/ipfs/go-ipfs/p2p/host"
12+
inet "github.com/ipfs/go-ipfs/p2p/net"
13+
peer "github.com/ipfs/go-ipfs/p2p/peer"
14+
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
15+
u "github.com/ipfs/go-ipfs/util"
16+
)
17+
18+
// log is the package-level logger, obtained from eventlog.Logger under the
// name "ping".
var log = eventlog.Logger("ping")
19+
20+
const (
	// PingSize is the number of bytes in a single ping payload; both the
	// sender and the echo handler read and write exactly this many bytes.
	PingSize = 32

	// ID is the protocol identifier under which the ping stream handler is
	// registered and with which outbound ping streams are opened.
	ID = "/ipfs/ping"
)
23+
24+
type PingService struct {
25+
Host host.Host
26+
}
27+
28+
func NewPingService(h host.Host) *PingService {
29+
ps := &PingService{h}
30+
h.SetStreamHandler(ID, ps.PingHandler)
31+
return ps
32+
}
33+
34+
func (p *PingService) PingHandler(s inet.Stream) {
35+
buf := make([]byte, PingSize)
36+
37+
for {
38+
_, err := io.ReadFull(s, buf)
39+
if err != nil {
40+
log.Debug(err)
41+
return
42+
}
43+
44+
_, err = s.Write(buf)
45+
if err != nil {
46+
log.Debug(err)
47+
return
48+
}
49+
}
50+
}
51+
52+
func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
53+
s, err := ps.Host.NewStream(ID, p)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
out := make(chan time.Duration)
59+
go func() {
60+
defer close(out)
61+
for {
62+
select {
63+
case <-ctx.Done():
64+
return
65+
default:
66+
t, err := ping(s)
67+
if err != nil {
68+
log.Debugf("ping error: %s", err)
69+
return
70+
}
71+
72+
select {
73+
case out <- t:
74+
case <-ctx.Done():
75+
return
76+
}
77+
}
78+
}
79+
}()
80+
81+
return out, nil
82+
}
83+
84+
func ping(s inet.Stream) (time.Duration, error) {
85+
buf := make([]byte, PingSize)
86+
u.NewTimeSeededRand().Read(buf)
87+
88+
before := time.Now()
89+
_, err := s.Write(buf)
90+
if err != nil {
91+
return 0, err
92+
}
93+
94+
rbuf := make([]byte, PingSize)
95+
_, err = io.ReadFull(s, rbuf)
96+
if err != nil {
97+
return 0, err
98+
}
99+
100+
if !bytes.Equal(buf, rbuf) {
101+
return 0, errors.New("ping packet was incorrect!")
102+
}
103+
104+
return time.Now().Sub(before), nil
105+
}

‎p2p/protocol/ping/ping_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package ping
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
8+
peer "github.com/ipfs/go-ipfs/p2p/peer"
9+
netutil "github.com/ipfs/go-ipfs/p2p/test/util"
10+
)
11+
12+
func TestPing(t *testing.T) {
13+
ctx, cancel := context.WithCancel(context.Background())
14+
defer cancel()
15+
h1 := netutil.GenHostSwarm(t, ctx)
16+
h2 := netutil.GenHostSwarm(t, ctx)
17+
18+
err := h1.Connect(ctx, peer.PeerInfo{
19+
ID: h2.ID(),
20+
Addrs: h2.Addrs(),
21+
})
22+
23+
if err != nil {
24+
t.Fatal(err)
25+
}
26+
27+
ps1 := NewPingService(h1)
28+
ps2 := NewPingService(h2)
29+
30+
testPing(t, ps1, h2.ID())
31+
testPing(t, ps2, h1.ID())
32+
}
33+
34+
func testPing(t *testing.T, ps *PingService, p peer.ID) {
35+
pctx, cancel := context.WithCancel(context.Background())
36+
defer cancel()
37+
ts, err := ps.Ping(pctx, p)
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
for i := 0; i < 5; i++ {
43+
select {
44+
case took := <-ts:
45+
t.Log("ping took: ", took)
46+
case <-time.After(time.Second * 4):
47+
t.Fatal("failed to receive ping")
48+
}
49+
}
50+
51+
}

‎routing/dht/dht_test.go

-29
Original file line numberDiff line numberDiff line change
@@ -102,35 +102,6 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
102102
cancel()
103103
}
104104

105-
func TestPing(t *testing.T) {
106-
// t.Skip("skipping test to debug another")
107-
ctx := context.Background()
108-
109-
dhtA := setupDHT(ctx, t)
110-
dhtB := setupDHT(ctx, t)
111-
112-
peerA := dhtA.self
113-
peerB := dhtB.self
114-
115-
defer dhtA.Close()
116-
defer dhtB.Close()
117-
defer dhtA.host.Close()
118-
defer dhtB.host.Close()
119-
120-
connect(t, ctx, dhtA, dhtB)
121-
122-
//Test that we can ping the node
123-
ctxT, _ := context.WithTimeout(ctx, 100*time.Millisecond)
124-
if _, err := dhtA.Ping(ctxT, peerB); err != nil {
125-
t.Fatal(err)
126-
}
127-
128-
ctxT, _ = context.WithTimeout(ctx, 100*time.Millisecond)
129-
if _, err := dhtB.Ping(ctxT, peerA); err != nil {
130-
t.Fatal(err)
131-
}
132-
}
133-
134105
func TestValueGetSet(t *testing.T) {
135106
// t.Skip("skipping test to debug another")
136107

‎routing/dht/routing.go

-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package dht
22

33
import (
44
"sync"
5-
"time"
65

76
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
87
key "github.com/ipfs/go-ipfs/blocks/key"
@@ -397,16 +396,3 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
397396

398397
return peerchan, nil
399398
}
400-
401-
// Ping a peer, log the time it took
402-
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) (time.Duration, error) {
403-
// Thoughts: maybe this should accept an ID and do a peer lookup?
404-
log.Debugf("ping %s start", p)
405-
before := time.Now()
406-
407-
pmes := pb.NewMessage(pb.Message_PING, "", 0)
408-
_, err := dht.sendRequest(ctx, p, pmes)
409-
log.Debugf("ping %s end (err = %s)", p, err)
410-
411-
return time.Now().Sub(before), err
412-
}

‎routing/routing.go

-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package routing
33

44
import (
55
"errors"
6-
"time"
76

87
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
98
key "github.com/ipfs/go-ipfs/blocks/key"
@@ -38,9 +37,6 @@ type IpfsRouting interface {
3837
// with relevant addresses.
3938
FindPeer(context.Context, peer.ID) (peer.PeerInfo, error)
4039

41-
// Ping a peer, log the time it took
42-
Ping(context.Context, peer.ID) (time.Duration, error)
43-
4440
// Bootstrap allows callers to hint to the routing system to get into a
4541
// Bootstrapped state
4642
Bootstrap(context.Context) error

0 commit comments

Comments
 (0)
Please sign in to comment.