
Commit 71ab52e

first pass at provide many mechanism

Committed Nov 27, 2015
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent: b4ab684

File tree: 10 files changed, +389 −15 lines

p2p/net/swarm/swarm.go (+2 −2)

@@ -74,7 +74,7 @@ type Swarm struct {
 
     proc goprocess.Process
     ctx  context.Context
-    bwc  metrics.Reporter
+    Bwc  metrics.Reporter
 }
 
 // NewSwarm constructs a Swarm, with a Chan.
@@ -98,7 +98,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
         dialT:      DialTimeout,
         notifs:     make(map[inet.Notifiee]ps.Notifiee),
         transports: []transport.Transport{transport.NewTCPTransport()},
-        bwc:        bwc,
+        Bwc:        bwc,
         fdRateLimit: make(chan struct{}, concurrentFdDials),
         Filters:     filter.NewFilters(),
         dialer:      conn.NewDialer(local, peers.PrivKey(local), wrap),

p2p/net/swarm/swarm_listen.go (+1 −1)

@@ -86,7 +86,7 @@ func (s *Swarm) addListener(tptlist transport.Listener) error {
 
     if cw, ok := list.(conn.ListenerConnWrapper); ok {
         cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
-            return mconn.WrapConn(s.bwc, c)
+            return mconn.WrapConn(s.Bwc, c)
         })
     }
 

p2p/test/util/util.go (+1 −1)

@@ -35,7 +35,7 @@ func DivulgeAddresses(a, b inet.Network) {
 
 func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost {
     n := GenSwarmNetwork(t, ctx)
-    return bhost.New(n)
+    return bhost.New(n, n.Bwc)
 }
 
 var RandPeerID = tu.RandPeerID
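
With the swarm's reporter exported as Bwc, test hosts can hand it to bhost.New and read bandwidth totals back. A test-style sketch; the test name is hypothetical, and it assumes the host exposes GetBandwidthReporter and GetBandwidthTotals as used by provide_many_test.go later in this commit:

// Sketch only: TestHostBandwidth is illustrative, not part of this commit.
// It uses only the accessors totalBandwidth exercises below.
func TestHostBandwidth(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    h := GenHostSwarm(t, ctx) // host now carries the swarm's reporter
    totals := h.GetBandwidthReporter().GetBandwidthTotals()
    t.Logf("bytes out so far: %d", totals.TotalOut)
}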

routing/dht/dht.go (+56 −3)

@@ -122,6 +122,44 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
     return nil
 }
 
+// putProviders sends a message to peer 'p' saying that the local node
+// can provide the values of 'keys'
+func (dht *IpfsDHT) putProviders(ctx context.Context, p peer.ID, keys []key.Key) error {
+    // add self as the provider
+    pi := peer.PeerInfo{
+        ID:    dht.self,
+        Addrs: dht.host.Addrs(),
+    }
+
+    // // only share WAN-friendly addresses ??
+    // pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
+    if len(pi.Addrs) < 1 {
+        return fmt.Errorf("no known addresses for self, cannot put provider")
+    }
+
+    var skeys []string
+    for _, k := range keys {
+        skeys = append(skeys, string(k))
+    }
+
+    t := pb.Message_ADD_PROVIDER
+    pmes := &pb.Message{
+        Type: &t,
+        Keys: skeys,
+    }
+
+    pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.PeerInfo{pi})
+    err := dht.sendMessage(ctx, p, pmes)
+    if err != nil {
+        return err
+    }
+
+    log.Debugf("%s putProviders: %s for %d keys (%s)", dht.self, p, len(keys), pi.Addrs)
+    return nil
+}
+
 // putProvider sends a message to peer 'p' saying that the local node
 // can provide the value of 'key'
 func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) error {

@@ -279,9 +317,24 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.
 
 // nearestPeersToQuery returns the routing table's closest peers.
 func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
-    key := key.Key(pmes.GetKey())
-    closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
-    return closer
+    if pmes.Key != nil {
+        return dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(pmes.GetKey())), count)
+    } else {
+        // multi-key request: union each key's closest peers, deduplicating
+        closer := make(map[peer.ID]struct{})
+        for _, sk := range pmes.GetKeys() {
+            np := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(sk)), count)
+            for _, p := range np {
+                closer[p] = struct{}{}
+            }
+        }
+
+        var out []peer.ID
+        for p := range closer {
+            out = append(out, p)
+        }
+
+        return out
+    }
 }
 
 // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.

routing/dht/handlers.go (+15 −5)

@@ -253,10 +253,18 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
     lm["peer"] = func() interface{} { return p.Pretty() }
 
     defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
-    key := key.Key(pmes.GetKey())
-    lm["key"] = func() interface{} { return key.Pretty() }
+    var keys []key.Key
+    if pmes.Key != nil {
+        k := key.Key(pmes.GetKey())
+        lm["key"] = func() interface{} { return k.Pretty() }
+        keys = []key.Key{k}
+    } else {
+        for _, sk := range pmes.GetKeys() {
+            keys = append(keys, key.Key(sk))
+        }
+    }
 
-    log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
+    log.Debugf("%s adding %s as a provider for '%s'", dht.self, p, keys)
 
     // add provider should use the address given in the message
     pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())

@@ -273,12 +281,14 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
             continue
         }
 
-        log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
+        log.Infof("received provider %s for %s (addrs: %s)", p, keys, pi.Addrs)
         if pi.ID != dht.self { // don't add own addrs.
             // add the received addresses to our peerstore.
             dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peer.ProviderAddrTTL)
         }
-        dht.providers.AddProvider(ctx, key, p)
+        for _, key := range keys {
+            dht.providers.AddProvider(ctx, key, p)
+        }
     }
 
     return nil, nil
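
The branch on pmes.Key keeps old single-key senders working unchanged. For contrast with the batched form, a legacy announcement would look like the sketch below; it assumes pb.Message.Key is the generated *string field (proto2 optional), which is consistent with the pmes.Key != nil check above:

// Sketch: an old-style single-key ADD_PROVIDER. handleAddProvider treats it
// as keys = []key.Key{k}. someKey stands in for an assumed key.Key value.
t := pb.Message_ADD_PROVIDER
k := string(someKey)
legacy := &pb.Message{Type: &t, Key: &k}
_ = legacy // would be sent via dht.sendMessage, as putProviders does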

routing/dht/pb/dht.pb.go (+14 −3)

Generated protobuf file; diff not rendered. The regeneration adds the repeated `keys` field declared in dht.proto below (and its GetKeys accessor, used throughout this commit).

routing/dht/pb/dht.proto (+3 −0)

@@ -59,6 +59,9 @@ message Message {
   // Used to return Providers
   // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS
   repeated Peer providerPeers = 9;
+
+  // Used to query for multiple targets in one request
+  repeated string keys = 11;
 }
 
 // Record represents a dht record that contains a value
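
Since `keys` is a new, separate field, peers that predate this commit simply ignore it, and the handlers above fall back to the singular key when it is set. For illustration, a batched request is assembled as below; this is a sketch using only the message fields this commit introduces, with imports and key values assumed:

// Sketch: one wire message carrying many targets. The same repeated field
// drives batched lookups (FIND_NODE) and batched announcements (ADD_PROVIDER).
func batchedFindNode(keys []key.Key) *pb.Message {
    var skeys []string
    for _, k := range keys {
        skeys = append(skeys, string(k)) // key.Key is a string type
    }
    t := pb.Message_FIND_NODE
    return &pb.Message{Type: &t, Keys: skeys}
}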

routing/dht/provide_many.go (+202, new file)

package dht

import (
    "sync"

    key "github.com/ipfs/go-ipfs/blocks/key"
    peer "github.com/ipfs/go-ipfs/p2p/peer"
    pqueue "github.com/ipfs/go-ipfs/p2p/peer/queue"
    routing "github.com/ipfs/go-ipfs/routing"
    pb "github.com/ipfs/go-ipfs/routing/dht/pb"
    kb "github.com/ipfs/go-ipfs/routing/kbucket"
    pset "github.com/ipfs/go-ipfs/util/peerset"
    todoctr "github.com/ipfs/go-ipfs/util/todocounter"

    ctxproc "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
    context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)

// peerFifo implements the peerQueue interface, but provides no additional
// ordering beyond insertion order
type peerFifo struct {
    peers []peer.ID
    lk    sync.Mutex
}

func (oq *peerFifo) Enqueue(p peer.ID) {
    oq.lk.Lock()
    defer oq.lk.Unlock()
    oq.peers = append(oq.peers, p)
}

func (oq *peerFifo) Dequeue() peer.ID {
    oq.lk.Lock()
    defer oq.lk.Unlock()
    out := oq.peers[0]
    oq.peers = oq.peers[1:]
    return out
}

func (oq *peerFifo) Len() int {
    oq.lk.Lock()
    defer oq.lk.Unlock()
    return len(oq.peers)
}

// provManyReq tracks the shared state of a single ProvideMany call
type provManyReq struct {
    closest     map[key.Key]pqueue.PeerQueue // closest peers seen so far, per key
    perKeyQuery map[key.Key]pqueue.PeerQueue // candidates still to query, per key
    queried     *pset.PeerSet                // peers already contacted

    keys    []key.Key
    keyStrs []string

    nextTarget int
    gntLock    sync.Mutex
}

func newProvManyReq(ctx context.Context, dht *IpfsDHT, keys []key.Key) *provManyReq {
    closest := make(map[key.Key]pqueue.PeerQueue)
    perKeyQuery := make(map[key.Key]pqueue.PeerQueue)
    var keyStrs []string

    for _, k := range keys {
        keyStrs = append(keyStrs, string(k))
        dht.providers.AddProvider(ctx, k, dht.self)

        closest[k] = pqueue.NewXORDistancePQ(k)
        perKeyQuery[k] = pqueue.NewXORDistancePQ(k)

        // seed both queues from the routing table
        peers := dht.routingTable.NearestPeers(kb.ConvertKey(k), 20)
        for _, p := range peers {
            closest[k].Enqueue(p)
            perKeyQuery[k].Enqueue(p)
        }
    }

    return &provManyReq{
        closest:     closest,
        perKeyQuery: perKeyQuery,
        keys:        keys,
        keyStrs:     keyStrs,
        queried:     pset.New(),
    }
}

// getNextTarget returns the next unqueried peer, round-robining across keys
// so that no single key starves the others
func (pmr *provManyReq) getNextTarget() (peer.ID, bool) {
    pmr.gntLock.Lock()
    defer pmr.gntLock.Unlock()
    // visit every key once, starting just past the last offset
    // (wrapping via modulo; the unguarded 'i := pmr.nextTarget + 1' form
    // would index out of range once nextTarget reached the last key)
    for n := 0; n < len(pmr.keys); n++ {
        i := (pmr.nextTarget + 1 + n) % len(pmr.keys)
        k := pmr.keys[i]
        for pmr.perKeyQuery[k].Len() > 0 {
            p := pmr.perKeyQuery[k].Dequeue()
            if pmr.queried.TryAdd(p) {
                pmr.nextTarget = i
                return p, true
            }
        }
    }

    return "", false
}

func (pmr *provManyReq) addCloserPeers(ps []peer.ID) {
    pmr.gntLock.Lock()
    defer pmr.gntLock.Unlock()
    for _, p := range ps {
        if pmr.queried.Contains(p) {
            continue
        }

        for _, k := range pmr.keys {
            pmr.perKeyQuery[k].Enqueue(p)
            pmr.closest[k].Enqueue(p)
        }
    }
}

// finalProvideSet picks the KValue closest peers for each key and inverts the
// mapping, so each chosen peer receives one message listing all of its keys
func (pmr *provManyReq) finalProvideSet() map[peer.ID][]key.Key {
    final := make(map[peer.ID][]key.Key)
    for k, cq := range pmr.closest {
        for i := 0; i < KValue && cq.Len() > 0; i++ {
            p := cq.Dequeue()
            final[p] = append(final[p], k)
        }
    }

    return final
}

// ProvideMany announces that this node can provide values for all the given
// keys, using a single query that carries every key
func (dht *IpfsDHT) ProvideMany(ctx context.Context, keys []key.Key) error {
    defer log.EventBegin(ctx, "provideMany").Done()

    pmreq := newProvManyReq(ctx, dht, keys)

    t := pb.Message_FIND_NODE
    mes := &pb.Message{
        Type: &t,
        Keys: pmreq.keyStrs,
    }

    query := dht.newQuery("", func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
        resp, err := dht.sendRequest(ctx, p, mes)
        if err != nil {
            return nil, err
        }

        peers := pb.PBPeersToPeerInfos(resp.GetCloserPeers())
        var pids []peer.ID
        for _, clpeer := range peers {
            dht.peerstore.AddAddrs(clpeer.ID, clpeer.Addrs, peer.TempAddrTTL)
            pids = append(pids, clpeer.ID)
        }

        pmreq.addCloserPeers(pids)

        result := new(dhtQueryResult)
        next, ok := pmreq.getNextTarget()
        if ok {
            result.closerPeers = []peer.PeerInfo{{ID: next}}
        }

        return result, nil
    })

    dqr := dhtQueryRunner{
        query:          query,
        peersToQuery:   pqueue.NewChanQueue(ctx, new(peerFifo)),
        peersSeen:      pset.New(),
        rateLimit:      make(chan struct{}, query.concurrency),
        peersRemaining: todoctr.NewSyncCounter(),
        proc:           ctxproc.WithContext(ctx),
    }

    var starter []peer.ID
    for i := 0; i < 5; i++ {
        p, ok := pmreq.getNextTarget()
        if ok {
            starter = append(starter, p)
        } else {
            log.Warning("not enough peers to fully start ProvideMany query")
            break
        }
    }

    _, err := dqr.Run(ctx, starter)
    if err != nil && err != routing.ErrNotFound {
        return err
    }

    final := pmreq.finalProvideSet()
    for p, keys := range final {
        // TODO: maybe do this in parallel?
        err := dht.putProviders(ctx, p, keys)
        if err != nil {
            log.Errorf("putProviders: %s", err)
            continue
        }
    }

    return nil
}
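
Taken together, the flow is: seed one XOR-distance queue per key, walk toward all targets with a single shared FIND_NODE query, then invert the results so each chosen peer gets exactly one batched ADD_PROVIDER. A minimal caller sketch, assuming a bootstrapped *IpfsDHT and a slice of keys to announce:

// Sketch only: dht and keys are assumed to exist in the caller's scope.
//   1. newProvManyReq seeds per-key XOR-distance queues from the routing table
//   2. one FIND_NODE query carries all keys; responses feed every queue
//   3. finalProvideSet picks the KValue closest peers per key
//   4. putProviders sends each chosen peer one batched ADD_PROVIDER
if err := dht.ProvideMany(ctx, keys); err != nil {
    log.Errorf("provide many failed: %s", err)
}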

routing/dht/provide_many_test.go (+92, new file)

package dht

import (
    "fmt"
    "testing"

    context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

    key "github.com/ipfs/go-ipfs/blocks/key"
)

func TestProvideMany(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    numkeys := 50
    num := 10
    _, _, dhts := setupDHTS(ctx, num, t)

    for i := 1; i < num; i++ {
        connect(t, ctx, dhts[0], dhts[i])
    }

    var keys []key.Key
    for i := 0; i < numkeys; i++ {
        keys = append(keys, key.Key(fmt.Sprint(i)))
    }

    err := dhts[1].ProvideMany(ctx, keys)
    if err != nil {
        t.Fatal(err)
    }

    // in this setup (len(dhts) == 10), every node should know every provider
    for _, d := range dhts {
        for _, k := range keys {
            pids := d.providers.GetProviders(ctx, k)
            if len(pids) != 1 {
                t.Fatalf("expected 1 provider for %s, got %d", k, len(pids))
            }
        }
    }

    fmt.Printf("total bandwidth used: %d\n", totalBandwidth(dhts))
}

func TestProvideManyOld(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    numkeys := 50
    num := 10
    _, _, dhts := setupDHTS(ctx, num, t)

    for i := 1; i < num; i++ {
        connect(t, ctx, dhts[0], dhts[i])
    }

    var keys []key.Key
    for i := 0; i < numkeys; i++ {
        keys = append(keys, key.Key(fmt.Sprint(i)))
    }

    for _, k := range keys {
        err := dhts[1].Provide(ctx, k)
        if err != nil {
            t.Fatal(err)
        }
    }

    // in this setup (len(dhts) == 10), every node should know every provider
    for i, d := range dhts {
        for _, k := range keys {
            pids := d.providers.GetProviders(ctx, k)
            if len(pids) != 1 {
                // don't fail... this doesn't always work
                t.Logf("[%d] expected 1 provider for %s, got %d", i, k, len(pids))
            }
        }
    }

    fmt.Printf("total bandwidth used: %d\n", totalBandwidth(dhts))
}

func totalBandwidth(dhts []*IpfsDHT) int64 {
    var sum int64
    for _, d := range dhts {
        bwrp := d.host.GetBandwidthReporter()
        totals := bwrp.GetBandwidthTotals()
        sum += totals.TotalOut
    }
    return sum
}
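
The two tests are intentionally parallel: same star topology, same 50 keys, one path batched and one looping Provide, so the printed TotalOut figures compare the two mechanisms directly. Assuming a standard repo checkout, both can be run together with:

    go test ./routing/dht -run TestProvideMany -v

(the -run pattern matches both TestProvideMany and TestProvideManyOld).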

routing/routing.go (+3 −0)

@@ -44,6 +44,9 @@ type IpfsRouting interface {
     // Announce that this node can provide value for given key
     Provide(context.Context, key.Key) error
 
+    // Announce that this node can provide all the given values
+    ProvideMany(context.Context, []key.Key) error
+
     // Find specific Peer
     // FindPeer searches for a peer with given ID, returns a peer.PeerInfo
     // with relevant addresses.
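
Adding a method to the IpfsRouting interface means every implementation must now supply ProvideMany, including ones this commit does not touch. For a router with no batched path, a hypothetical fallback could simply loop over Provide; exampleRouter below is illustrative, not part of this commit:

// Hypothetical: exampleRouter stands in for any IpfsRouting implementation
// without native batching. It satisfies the new method by looping Provide.
func (r *exampleRouter) ProvideMany(ctx context.Context, keys []key.Key) error {
    for _, k := range keys {
        if err := r.Provide(ctx, k); err != nil {
            return err
        }
    }
    return nil
}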
