Skip to content

Commit b910d8a

Browse files
committed Oct 19, 2015
Merge pull request #1858 from ipfs/fix/bitswap-limiter
fix panic in bitswap working limit spawning
2 parents 9ca0be3 + 22f0b87 commit b910d8a

File tree

1 file changed

+26
-22
lines changed

1 file changed

+26
-22
lines changed
 

‎exchange/bitswap/workers.go

+26 -22
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@ import (
55

66
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
77
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
8-
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
98
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
109

1110
key "github.com/ipfs/go-ipfs/blocks/key"
@@ -74,43 +73,48 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
7473

7574
func (bs *Bitswap) provideWorker(px process.Process) {
7675

77-
limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)
76+
limit := make(chan struct{}, provideWorkerMax)
7877

7978
limitedGoProvide := func(k key.Key, wid int) {
79+
defer func() {
80+
// replace token when done
81+
<-limit
82+
}()
8083
ev := logging.LoggableMap{"ID": wid}
81-
limiter.LimitedGo(func(px process.Process) {
8284

83-
ctx := procctx.OnClosingContext(px) // derive ctx from px
84-
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
85+
ctx := procctx.OnClosingContext(px) // derive ctx from px
86+
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
8587

86-
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
87-
defer cancel()
88+
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
89+
defer cancel()
8890

89-
if err := bs.network.Provide(ctx, k); err != nil {
90-
log.Error(err)
91-
}
92-
})
91+
if err := bs.network.Provide(ctx, k); err != nil {
92+
log.Error(err)
93+
}
9394
}
9495

9596
// worker spawner, reads from bs.provideKeys until it closes, spawning a
9697
// _ratelimited_ number of workers to handle each key.
97-
limiter.Go(func(px process.Process) {
98-
for wid := 2; ; wid++ {
99-
ev := logging.LoggableMap{"ID": 1}
100-
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
98+
for wid := 2; ; wid++ {
99+
ev := logging.LoggableMap{"ID": 1}
100+
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
101101

102+
select {
103+
case <-px.Closing():
104+
return
105+
case k, ok := <-bs.provideKeys:
106+
if !ok {
107+
log.Debug("provideKeys channel closed")
108+
return
109+
}
102110
select {
103111
case <-px.Closing():
104112
return
105-
case k, ok := <-bs.provideKeys:
106-
if !ok {
107-
log.Debug("provideKeys channel closed")
108-
return
109-
}
110-
limitedGoProvide(k, wid)
113+
case limit <- struct{}{}:
114+
go limitedGoProvide(k, wid)
111115
}
112116
}
113-
})
117+
}
114118
}
115119

116120
func (bs *Bitswap) provideCollector(ctx context.Context) {

0 commit comments

Comments
 (0)
Please sign in to comment.