Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit fe40df8

Browse files
committed Nov 20, 2015
wire contexts into bitswap requests more deeply
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 856e250 commit fe40df8

File tree

6 files changed

+95
-85
lines changed

6 files changed

+95
-85
lines changed
 

‎exchange/bitswap/bitswap.go

+11-34
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
7575
notifications: notif,
7676
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
7777
network: network,
78-
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
78+
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
7979
process: px,
8080
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
8181
provideKeys: make(chan key.Key, provideKeysBufferSize),
@@ -118,7 +118,7 @@ type Bitswap struct {
118118
notifications notifications.PubSub
119119

120120
// send keys to a worker to find and connect to providers for them
121-
findKeys chan *blockRequest
121+
findKeys chan *wantlist.Entry
122122

123123
engine *decision.Engine
124124

@@ -135,8 +135,8 @@ type Bitswap struct {
135135
}
136136

137137
type blockRequest struct {
138-
keys []key.Key
139-
ctx context.Context
138+
key key.Key
139+
ctx context.Context
140140
}
141141

142142
// GetBlock attempts to retrieve a particular block from peers within the
@@ -208,11 +208,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
208208
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
209209
}
210210

211-
bs.wm.WantBlocks(keys)
211+
bs.wm.WantBlocks(ctx, keys)
212212

213-
req := &blockRequest{
214-
keys: keys,
215-
ctx: ctx,
213+
// NB: Optimization. Assumes that providers of key[0] are likely to
214+
// be able to provide for all keys. This currently holds true in most
215+
// every situation. Later, this assumption may not hold as true.
216+
req := &wantlist.Entry{
217+
Key: keys[0],
218+
Ctx: ctx,
216219
}
217220
select {
218221
case bs.findKeys <- req:
@@ -265,32 +268,6 @@ func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
265268
return err
266269
}
267270

268-
func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
269-
270-
ctx, cancel := context.WithCancel(ctx)
271-
defer cancel()
272-
273-
// Get providers for all entries in wantlist (could take a while)
274-
wg := sync.WaitGroup{}
275-
for _, e := range entries {
276-
wg.Add(1)
277-
go func(k key.Key) {
278-
defer wg.Done()
279-
280-
child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
281-
defer cancel()
282-
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
283-
for prov := range providers {
284-
go func(p peer.ID) {
285-
bs.network.ConnectTo(ctx, p)
286-
}(prov)
287-
}
288-
}(e.Key)
289-
}
290-
291-
wg.Wait() // make sure all our children do finish.
292-
}
293-
294271
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
295272
// This call records changes to wantlists, blocks received,
296273
// and number of bytes transfered.

‎exchange/bitswap/decision/engine.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
215215
e.peerRequestQueue.Remove(entry.Key, p)
216216
} else {
217217
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
218-
l.Wants(entry.Key, entry.Priority)
218+
l.Wants(entry.Ctx, entry.Key, entry.Priority)
219219
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
220220
e.peerRequestQueue.Push(entry.Entry, p)
221221
newWorkExists = true

‎exchange/bitswap/decision/ledger.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
key "github.com/ipfs/go-ipfs/blocks/key"
77
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
88
peer "github.com/ipfs/go-ipfs/p2p/peer"
9+
10+
"golang.org/x/net/context"
911
)
1012

1113
// keySet is just a convenient alias for maps of keys, where we only care
@@ -68,9 +70,9 @@ func (l *ledger) ReceivedBytes(n int) {
6870
}
6971

7072
// TODO: this needs to be different. We need timeouts.
71-
func (l *ledger) Wants(k key.Key, priority int) {
73+
func (l *ledger) Wants(ctx context.Context, k key.Key, priority int) {
7274
log.Debugf("peer %s wants %s", l.Partner, k)
73-
l.wantList.Add(k, priority)
75+
l.wantList.Add(ctx, k, priority)
7476
}
7577

7678
func (l *ledger) CancelWant(k key.Key) {

‎exchange/bitswap/wantlist/wantlist.go

+22-8
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
package wantlist
44

55
import (
6-
key "github.com/ipfs/go-ipfs/blocks/key"
76
"sort"
87
"sync"
8+
9+
key "github.com/ipfs/go-ipfs/blocks/key"
10+
11+
"golang.org/x/net/context"
912
)
1013

1114
type ThreadSafe struct {
@@ -16,14 +19,14 @@ type ThreadSafe struct {
1619
// not threadsafe
1720
type Wantlist struct {
1821
set map[key.Key]Entry
19-
// TODO provide O(1) len accessor if cost becomes an issue
2022
}
2123

2224
type Entry struct {
2325
// TODO consider making entries immutable so they can be shared safely and
2426
// slices can be copied efficiently.
2527
Key key.Key
2628
Priority int
29+
Ctx context.Context
2730
}
2831

2932
type entrySlice []Entry
@@ -44,22 +47,25 @@ func New() *Wantlist {
4447
}
4548
}
4649

47-
func (w *ThreadSafe) Add(k key.Key, priority int) {
48-
// TODO rm defer for perf
50+
func (w *ThreadSafe) Add(ctx context.Context, k key.Key, priority int) {
51+
w.lk.Lock()
52+
defer w.lk.Unlock()
53+
w.Wantlist.Add(ctx, k, priority)
54+
}
55+
56+
// AddEntry inserts a pre-built Entry (key, priority, and ctx) under the
// wantlist lock. If the key is already present the call is a no-op.
func (w *ThreadSafe) AddEntry(e Entry) {
	w.lk.Lock()
	defer w.lk.Unlock()
	w.Wantlist.AddEntry(e)
}
5361

5462
func (w *ThreadSafe) Remove(k key.Key) {
55-
// TODO rm defer for perf
5663
w.lk.Lock()
5764
defer w.lk.Unlock()
5865
w.Wantlist.Remove(k)
5966
}
6067

6168
func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
62-
// TODO rm defer for perf
6369
w.lk.RLock()
6470
defer w.lk.RUnlock()
6571
return w.Wantlist.Contains(k)
@@ -87,14 +93,22 @@ func (w *Wantlist) Len() int {
8793
return len(w.set)
8894
}
8995

90-
func (w *Wantlist) Add(k key.Key, priority int) {
96+
func (w *Wantlist) Add(ctx context.Context, k key.Key, priority int) {
9197
if _, ok := w.set[k]; ok {
9298
return
9399
}
94100
w.set[k] = Entry{
95101
Key: k,
96102
Priority: priority,
103+
Ctx: ctx,
104+
}
105+
}
106+
107+
func (w *Wantlist) AddEntry(e Entry) {
108+
if _, ok := w.set[e.Key]; ok {
109+
return
97110
}
111+
w.set[e.Key] = e
98112
}
99113

100114
func (w *Wantlist) Remove(k key.Key) {

‎exchange/bitswap/wantmanager.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,24 @@ type msgQueue struct {
6262
done chan struct{}
6363
}
6464

65-
func (pm *WantManager) WantBlocks(ks []key.Key) {
65+
func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
6666
log.Infof("want blocks: %s", ks)
67-
pm.addEntries(ks, false)
67+
pm.addEntries(ctx, ks, false)
6868
}
6969

7070
// CancelWants removes the given keys from the wantlist. Cancel entries
// carry no meaningful lifetime, so context.TODO() is passed through.
func (pm *WantManager) CancelWants(ks []key.Key) {
	pm.addEntries(context.TODO(), ks, true)
}
7373

74-
func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
74+
func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
7575
var entries []*bsmsg.Entry
7676
for i, k := range ks {
7777
entries = append(entries, &bsmsg.Entry{
7878
Cancel: cancel,
7979
Entry: wantlist.Entry{
8080
Key: k,
8181
Priority: kMaxPriority - i,
82+
Ctx: ctx,
8283
},
8384
})
8485
}
@@ -216,7 +217,7 @@ func (pm *WantManager) Run() {
216217
if e.Cancel {
217218
pm.wl.Remove(e.Key)
218219
} else {
219-
pm.wl.Add(e.Key, e.Priority)
220+
pm.wl.AddEntry(e.Entry)
220221
}
221222
}
222223

@@ -229,6 +230,14 @@ func (pm *WantManager) Run() {
229230
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
230231
var es []*bsmsg.Entry
231232
for _, e := range pm.wl.Entries() {
233+
select {
234+
case <-e.Ctx.Done():
235+
// entry has been cancelled
236+
// simply continue, the entry will be removed from the
237+
// wantlist soon enough
238+
continue
239+
default:
240+
}
232241
es = append(es, &bsmsg.Entry{Entry: e})
233242
}
234243
for _, p := range pm.peers {

‎exchange/bitswap/workers.go

+43-35
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package bitswap
22

33
import (
4+
"sync"
45
"time"
56

67
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
78
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
89
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
910

1011
key "github.com/ipfs/go-ipfs/blocks/key"
12+
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
13+
peer "github.com/ipfs/go-ipfs/p2p/peer"
1114
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
1215
)
1316

@@ -16,7 +19,7 @@ var TaskWorkerCount = 8
1619
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
1720
// Start up a worker to handle block requests this node is making
1821
px.Go(func(px process.Process) {
19-
bs.providerConnector(ctx)
22+
bs.providerQueryManager(ctx)
2023
})
2124

2225
// Start up workers to handle requests from other nodes for the data on this node
@@ -149,37 +152,6 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
149152
}
150153
}
151154

152-
// connects to providers for the given keys
153-
func (bs *Bitswap) providerConnector(parent context.Context) {
154-
defer log.Info("bitswap client worker shutting down...")
155-
156-
for {
157-
log.Event(parent, "Bitswap.ProviderConnector.Loop")
158-
select {
159-
case req := <-bs.findKeys:
160-
keys := req.keys
161-
if len(keys) == 0 {
162-
log.Warning("Received batch request for zero blocks")
163-
continue
164-
}
165-
log.Event(parent, "Bitswap.ProviderConnector.Work", logging.LoggableMap{"Keys": keys})
166-
167-
// NB: Optimization. Assumes that providers of key[0] are likely to
168-
// be able to provide for all keys. This currently holds true in most
169-
// every situation. Later, this assumption may not hold as true.
170-
child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
171-
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
172-
for p := range providers {
173-
go bs.network.ConnectTo(req.ctx, p)
174-
}
175-
cancel()
176-
177-
case <-parent.Done():
178-
return
179-
}
180-
}
181-
}
182-
183155
func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
184156
ctx, cancel := context.WithCancel(parent)
185157
defer cancel()
@@ -200,12 +172,48 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
200172
}
201173
case <-broadcastSignal.C: // resend unfulfilled wantlist keys
202174
log.Event(ctx, "Bitswap.Rebroadcast.active")
203-
entries := bs.wm.wl.Entries()
204-
if len(entries) > 0 {
205-
bs.connectToProviders(ctx, entries)
175+
for _, e := range bs.wm.wl.Entries() {
176+
bs.findKeys <- &e
206177
}
207178
case <-parent.Done():
208179
return
209180
}
210181
}
211182
}
183+
184+
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
185+
var activeLk sync.Mutex
186+
active := make(map[key.Key]*wantlist.Entry)
187+
188+
for {
189+
select {
190+
case e := <-bs.findKeys:
191+
activeLk.Lock()
192+
if _, ok := active[e.Key]; ok {
193+
continue
194+
}
195+
active[e.Key] = e
196+
activeLk.Unlock()
197+
198+
go func(e *wantlist.Entry) {
199+
child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
200+
defer cancel()
201+
providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
202+
for p := range providers {
203+
go func(p peer.ID) {
204+
err := bs.network.ConnectTo(child, p)
205+
if err != nil {
206+
log.Debug("failed to connect to provider %s: %s", p, err)
207+
}
208+
}(p)
209+
}
210+
activeLk.Lock()
211+
delete(active, e.Key)
212+
activeLk.Unlock()
213+
}(e)
214+
215+
case <-ctx.Done():
216+
return
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)
Please sign in to comment.