Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: ipfs/kubo
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 9ec3c1aac1ea^
Choose a base ref
...
head repository: ipfs/kubo
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: a5d0e133da01
Choose a head ref
  • 2 commits
  • 7 files changed
  • 1 contributor

Commits on Jul 13, 2015

  1. clean up unused dht methods

    License: MIT
    Signed-off-by: Jeromy <jeromyj@gmail.com>
    whyrusleeping committed Jul 13, 2015

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    9ec3c1a View commit details

Commits on Jul 14, 2015

  1. make ping its own protocol

    License: MIT
    Signed-off-by: Jeromy <jeromyj@gmail.com>
    whyrusleeping committed Jul 14, 2015

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    a5d0e13 View commit details
Showing with 137 additions and 95 deletions.
  1. +22 −16 core/commands/ping.go
  2. +4 −1 core/core.go
  3. +105 −0 p2p/protocol/ping/ping.go
  4. +0 −57 routing/dht/dht.go
  5. +6 −3 routing/dht/dht_test.go
  6. +0 −14 routing/dht/routing.go
  7. +0 −4 routing/routing.go
38 changes: 22 additions & 16 deletions core/commands/ping.go
Original file line number Diff line number Diff line change
@@ -138,30 +138,36 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int)

outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())}

log.Error("PING TIME: ", numPings)
ctx, cancel := context.WithTimeout(ctx, kPingTimeout*time.Duration(numPings))
defer cancel()
pings, err := n.Ping.Ping(ctx, pid)
if err != nil {
log.Debugf("Ping error: %s", err)
outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)}
return
}

var done bool
var total time.Duration
for i := 0; i < numPings && !done; i++ {
select {
case <-ctx.Done():
done = true
continue
default:
}

ctx, cancel := context.WithTimeout(ctx, kPingTimeout)
defer cancel()
took, err := n.Routing.Ping(ctx, pid)
if err != nil {
log.Debugf("Ping error: %s", err)
outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)}
break
case t, ok := <-pings:
if !ok {
done = true
break
}

outChan <- &PingResult{
Success: true,
Time: t,
}
total += t
time.Sleep(time.Second)
}
outChan <- &PingResult{
Success: true,
Time: took,
}
total += took
time.Sleep(time.Second)
}
averagems := total.Seconds() * 1000 / float64(numPings)
outChan <- &PingResult{
5 changes: 4 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ import (
swarm "github.com/ipfs/go-ipfs/p2p/net/swarm"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
ping "github.com/ipfs/go-ipfs/p2p/protocol/ping"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"

routing "github.com/ipfs/go-ipfs/routing"
@@ -102,7 +103,8 @@ type IpfsNode struct {
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Diagnostics *diag.Diagnostics // the diagnostics service
Reprovider *rp.Reprovider // the value reprovider system
Ping *ping.PingService
Reprovider *rp.Reprovider // the value reprovider system

IpnsFs *ipnsfs.Filesystem

@@ -324,6 +326,7 @@ func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) {
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error {
// setup diagnostics service
n.Diagnostics = diag.NewDiagnostics(n.Identity, host)
n.Ping = ping.NewPingService(host)

// setup routing service
r, err := routingOption(ctx, host, n.Repo.Datastore())
105 changes: 105 additions & 0 deletions p2p/protocol/ping/ping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package ping

import (
"bytes"
"errors"
"io"
"time"

context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
u "github.com/ipfs/go-ipfs/util"
)

// log is the package-level logger, created under the name "ping".
var log = eventlog.Logger("ping")

// PingSize is the length, in bytes, of the payload exchanged in one ping.
const PingSize = 32

// ID is the protocol identifier used both to register the ping stream
// handler and to open outbound ping streams.
const ID = "/ipfs/ping"

// PingService implements the ping protocol on top of a host: it echoes
// inbound ping payloads (PingHandler) and issues outbound pings (Ping).
type PingService struct {
// Host is the host used to register the handler and open ping streams.
Host host.Host
}

// NewPingService wraps h in a PingService and installs that service's
// PingHandler on h as the stream handler for the ping protocol ID.
func NewPingService(h host.Host) *PingService {
	svc := &PingService{Host: h}
	h.SetStreamHandler(ID, svc.PingHandler)
	return svc
}

// PingHandler is the responder side of the ping protocol. It repeatedly
// reads exactly PingSize bytes from s and writes the same bytes back,
// looping until a read or a write returns an error. Errors are logged at
// debug level and end the handler; the stream is closed on return.
func (ps *PingService) PingHandler(s inet.Stream) {
	// Close the stream on every exit path so it is not leaked once the
	// echo loop ends.
	defer s.Close()

	buf := make([]byte, PingSize)
	for {
		if _, err := io.ReadFull(s, buf); err != nil {
			log.Debug(err)
			return
		}

		if _, err := s.Write(buf); err != nil {
			log.Debug(err)
			return
		}
	}
}

// Ping opens a stream to peer p on the ping protocol and measures round
// trips over it one after another, delivering each measured duration on the
// returned channel.
//
// An error is returned only if the stream cannot be opened. After that, the
// channel is unbuffered and is closed when ctx is done or when a round trip
// fails (the failure is logged at debug level, not reported to the caller).
// The stream is closed when the background goroutine exits.
//
// NOTE(review): a round trip that is already in progress is not interrupted
// by ctx, since ping only receives the stream; cancellation is observed
// between round trips and while waiting to deliver a result.
func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
	s, err := ps.Host.NewStream(ID, p)
	if err != nil {
		return nil, err
	}

	out := make(chan time.Duration)
	go func() {
		defer close(out)
		// Close the stream once pinging stops so it is not leaked.
		defer s.Close()

		for {
			// Do not start another round trip if the caller is done.
			select {
			case <-ctx.Done():
				return
			default:
			}

			t, err := ping(s)
			if err != nil {
				log.Debugf("ping error: %s", err)
				return
			}

			// Deliver the result, but give up if the caller stops listening.
			select {
			case out <- t:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

// ping performs a single round trip on s: it writes a PingSize-byte payload
// obtained from a time-seeded random source, reads PingSize bytes back, and
// checks that the reply matches what was sent. On success it returns the
// time elapsed since just before the write; on a write error, read error, or
// mismatched reply it returns a zero duration and the error.
func ping(s inet.Stream) (time.Duration, error) {
	sent := make([]byte, PingSize)
	// The result of filling the payload is discarded here.
	u.NewTimeSeededRand().Read(sent)

	start := time.Now()
	if _, err := s.Write(sent); err != nil {
		return 0, err
	}

	echoed := make([]byte, PingSize)
	if _, err := io.ReadFull(s, echoed); err != nil {
		return 0, err
	}

	if !bytes.Equal(sent, echoed) {
		return 0, errors.New("ping packet was incorrect!")
	}

	return time.Since(start), nil
}
57 changes: 0 additions & 57 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ package dht

import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"sync"
@@ -33,8 +32,6 @@ var log = eventlog.Logger("dht")

var ProtocolDHT protocol.ID = "/ipfs/dht"

const doPinging = false

// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5
@@ -92,11 +89,6 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator = make(record.Validator)
dht.Validator["pk"] = record.PublicKeyValidator

if doPinging {
dht.proc.Go(func(p goprocess.Process) {
dht.PingRoutine(time.Second * 10)
})
}
return dht
}

@@ -110,23 +102,6 @@ func (dht *IpfsDHT) log() eventlog.EventLogger {
return log // TODO rm
}

// Connect dials npeer through the DHT's host, pings it, logs a "connect"
// event and then passes the peer to dht.Update. A dial error is returned
// unchanged; a ping error is returned wrapped in a descriptive message.
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
	// TODO: change interface to accept a PeerInfo as well.
	target := peer.PeerInfo{ID: npeer}
	err := dht.host.Connect(ctx, target)
	if err != nil {
		return err
	}

	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
	_, err = dht.Ping(ctx, npeer)
	if err != nil {
		return fmt.Errorf("failed to ping newly connected peer: %s", err)
	}

	log.Event(ctx, "connect", dht.self, npeer)
	dht.Update(ctx, npeer)
	return nil
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key key.Key, rec *pb.Record) error {
@@ -343,38 +318,6 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
return filtered
}

// ensureConnectedToPeer asks the DHT's host to connect to p and returns the
// host's result. Passing the DHT's own ID is rejected with an error before
// any dial is attempted.
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error {
	if dht.self == p {
		return errors.New("attempting to ensure connection to self")
	}

	// dial connection
	info := peer.PeerInfo{ID: p}
	return dht.host.Connect(ctx, info)
}

// PingRoutine periodically pings nearest neighbors.
//
// Every t it generates a random 16-byte key, asks the routing table for the
// 5 peers nearest to that key, and pings each of them in turn with a 5 second
// timeout, logging ping failures at debug level. It returns when
// dht.proc.Closing() fires. A non-positive t never ticks, so the routine
// then only waits for closing.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
	// Use an explicit ticker so it can be stopped when the routine returns;
	// a ticker created by time.Tick cannot be stopped. NewTicker panics on
	// a non-positive interval, so in that case leave the channel nil, which
	// never delivers a tick.
	var tick <-chan time.Time
	if t > 0 {
		ticker := time.NewTicker(t)
		defer ticker.Stop()
		tick = ticker.C
	}

	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
			peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
			for _, p := range peers {
				ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
				_, err := dht.Ping(ctx, p)
				if err != nil {
					log.Debugf("Ping error: %s", err)
				}
				// Release the timeout context right away instead of
				// deferring, since this runs inside a long-lived loop.
				cancel()
			}
		case <-dht.proc.Closing():
			return
		}
	}
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
9 changes: 6 additions & 3 deletions routing/dht/dht_test.go
Original file line number Diff line number Diff line change
@@ -74,7 +74,8 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}

a.peerstore.AddAddrs(idB, addrB, peer.TempAddrTTL)
if err := a.Connect(ctx, idB); err != nil {
pi := peer.PeerInfo{ID: idB}
if err := a.host.Connect(ctx, pi); err != nil {
t.Fatal(err)
}
}
@@ -789,12 +790,14 @@ func TestConnectCollision(t *testing.T) {
errs := make(chan error)
go func() {
dhtA.peerstore.AddAddr(peerB, addrB, peer.TempAddrTTL)
err := dhtA.Connect(ctx, peerB)
pi := peer.PeerInfo{ID: peerB}
err := dhtA.host.Connect(ctx, pi)
errs <- err
}()
go func() {
dhtB.peerstore.AddAddr(peerA, addrA, peer.TempAddrTTL)
err := dhtB.Connect(ctx, peerA)
pi := peer.PeerInfo{ID: peerA}
err := dhtB.host.Connect(ctx, pi)
errs <- err
}()

14 changes: 0 additions & 14 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package dht

import (
"sync"
"time"

context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
@@ -397,16 +396,3 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<

return peerchan, nil
}

// Ping sends a PING message to peer p via dht.sendRequest and returns the
// time elapsed since just before the message was built, together with the
// request's error (nil on success). The duration is returned even when the
// request fails. Start and end are logged at debug level.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) (time.Duration, error) {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	log.Debugf("ping %s start", p)
	before := time.Now()

	pmes := pb.NewMessage(pb.Message_PING, "", 0)
	_, err := dht.sendRequest(ctx, p, pmes)
	// %v rather than %s: err is nil on success, and %s would render a nil
	// error as "%!s(<nil>)".
	log.Debugf("ping %s end (err = %v)", p, err)

	return time.Since(before), err
}
4 changes: 0 additions & 4 deletions routing/routing.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ package routing

import (
"errors"
"time"

context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
@@ -38,9 +37,6 @@ type IpfsRouting interface {
// with relevant addresses.
FindPeer(context.Context, peer.ID) (peer.PeerInfo, error)

// Ping a peer, log the time it took
Ping(context.Context, peer.ID) (time.Duration, error)

// Bootstrap allows callers to hint to the routing system to get into a
// Boostrapped state
Bootstrap(context.Context) error