Skip to content

Commit 6f943d7

Browse files
committedAug 5, 2015
Merge pull request #1553 from ipfs/fix-providing-speed
bitswap/provide: improved rate limiting
2 parents b30d9d4 + 6e705e1 commit 6f943d7

File tree

2 files changed

+48
-46
lines changed

2 files changed

+48
-46
lines changed
 

‎exchange/bitswap/bitswap.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ const (
3939
// kMaxPriority is the max priority as defined by the bitswap protocol
4040
kMaxPriority = math.MaxInt32
4141

42-
HasBlockBufferSize = 256
43-
provideWorkers = 4
42+
HasBlockBufferSize = 256
43+
provideKeysBufferSize = 2048
44+
provideWorkerMax = 512
4445
)
4546

4647
var rebroadcastDelay = delay.Fixed(time.Second * 10)
@@ -85,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
8586
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
8687
process: px,
8788
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
88-
provideKeys: make(chan key.Key),
89+
provideKeys: make(chan key.Key, provideKeysBufferSize),
8990
wm: NewWantManager(ctx, network),
9091
}
9192
go bs.wm.Run()

‎exchange/bitswap/workers.go

+44-43
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,19 @@
11
package bitswap
22

33
import (
4-
"os"
5-
"strconv"
64
"time"
75

86
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
7+
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
98
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
9+
waitable "github.com/ipfs/go-ipfs/thirdparty/waitable"
1010

1111
key "github.com/ipfs/go-ipfs/blocks/key"
1212
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
1313
)
1414

1515
var TaskWorkerCount = 8
1616

17-
func init() {
18-
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
19-
if twc != "" {
20-
n, err := strconv.Atoi(twc)
21-
if err != nil {
22-
log.Error(err)
23-
return
24-
}
25-
if n > 0 {
26-
TaskWorkerCount = n
27-
} else {
28-
log.Errorf("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS", n)
29-
}
30-
}
31-
}
32-
3317
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
3418
// Start up a worker to handle block requests this node is making
3519
px.Go(func(px process.Process) {
@@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
5741
// Spawn up multiple workers to handle incoming blocks
5842
// consider increasing number if providing blocks bottlenecks
5943
// file transfers
60-
for i := 0; i < provideWorkers; i++ {
61-
i := i
62-
px.Go(func(px process.Process) {
63-
bs.provideWorker(ctx, i)
64-
})
65-
}
44+
px.Go(bs.provideWorker)
6645
}
6746

6847
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
@@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
7756
if !ok {
7857
continue
7958
}
80-
log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()})
59+
log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{
60+
"ID": id,
61+
"Target": envelope.Peer.Pretty(),
62+
"Block": envelope.Block.Multihash.B58String(),
63+
})
8164

8265
bs.wm.SendBlock(ctx, envelope)
8366
case <-ctx.Done():
@@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
8972
}
9073
}
9174

92-
func (bs *Bitswap) provideWorker(ctx context.Context, id int) {
93-
idmap := eventlog.LoggableMap{"ID": id}
94-
for {
95-
log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap)
96-
select {
97-
case k, ok := <-bs.provideKeys:
98-
log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k)
99-
if !ok {
100-
log.Debug("provideKeys channel closed")
101-
return
102-
}
103-
ctx, cancel := context.WithTimeout(ctx, provideTimeout)
104-
err := bs.network.Provide(ctx, k)
105-
if err != nil {
75+
func (bs *Bitswap) provideWorker(px process.Process) {
76+
77+
limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)
78+
79+
limitedGoProvide := func(k key.Key, wid int) {
80+
ev := eventlog.LoggableMap{"ID": wid}
81+
limiter.LimitedGo(func(px process.Process) {
82+
83+
ctx := waitable.Context(px) // derive ctx from px
84+
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
85+
86+
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
87+
defer cancel()
88+
89+
if err := bs.network.Provide(ctx, k); err != nil {
10690
log.Error(err)
10791
}
108-
cancel()
109-
case <-ctx.Done():
110-
return
111-
}
92+
})
11293
}
94+
95+
// worker spawner, reads from bs.provideKeys until it closes, spawning a
96+
// _ratelimited_ number of workers to handle each key.
97+
limiter.Go(func(px process.Process) {
98+
for wid := 2; ; wid++ {
99+
ev := eventlog.LoggableMap{"ID": 1}
100+
log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev)
101+
102+
select {
103+
case <-px.Closing():
104+
return
105+
case k, ok := <-bs.provideKeys:
106+
if !ok {
107+
log.Debug("provideKeys channel closed")
108+
return
109+
}
110+
limitedGoProvide(k, wid)
111+
}
112+
}
113+
})
113114
}
114115

115116
func (bs *Bitswap) provideCollector(ctx context.Context) {

0 commit comments

Comments
 (0)
Please sign in to comment.