|
5 | 5 |
|
6 | 6 | process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
7 | 7 | procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
8 |
| - ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" |
9 | 8 | context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
10 | 9 |
|
11 | 10 | key "github.com/ipfs/go-ipfs/blocks/key"
|
@@ -74,43 +73,48 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
|
74 | 73 |
|
75 | 74 | func (bs *Bitswap) provideWorker(px process.Process) {
|
76 | 75 |
|
77 |
| - limiter := ratelimit.NewRateLimiter(px, provideWorkerMax) |
| 76 | + limit := make(chan struct{}, provideWorkerMax) |
78 | 77 |
|
79 | 78 | limitedGoProvide := func(k key.Key, wid int) {
|
| 79 | + defer func() { |
| 80 | + // replace token when done |
| 81 | + <-limit |
| 82 | + }() |
80 | 83 | ev := logging.LoggableMap{"ID": wid}
|
81 |
| - limiter.LimitedGo(func(px process.Process) { |
82 | 84 |
|
83 |
| - ctx := procctx.OnClosingContext(px) // derive ctx from px |
84 |
| - defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() |
| 85 | + ctx := procctx.OnClosingContext(px) // derive ctx from px |
| 86 | + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() |
85 | 87 |
|
86 |
| - ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx |
87 |
| - defer cancel() |
| 88 | + ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx |
| 89 | + defer cancel() |
88 | 90 |
|
89 |
| - if err := bs.network.Provide(ctx, k); err != nil { |
90 |
| - log.Error(err) |
91 |
| - } |
92 |
| - }) |
| 91 | + if err := bs.network.Provide(ctx, k); err != nil { |
| 92 | + log.Error(err) |
| 93 | + } |
93 | 94 | }
|
94 | 95 |
|
95 | 96 | // worker spawner, reads from bs.provideKeys until it closes, spawning a
|
96 | 97 | // _ratelimited_ number of workers to handle each key.
|
97 |
| - limiter.Go(func(px process.Process) { |
98 |
| - for wid := 2; ; wid++ { |
99 |
| - ev := logging.LoggableMap{"ID": 1} |
100 |
| - log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) |
| 98 | + for wid := 2; ; wid++ { |
| 99 | + ev := logging.LoggableMap{"ID": 1} |
| 100 | + log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) |
101 | 101 |
|
| 102 | + select { |
| 103 | + case <-px.Closing(): |
| 104 | + return |
| 105 | + case k, ok := <-bs.provideKeys: |
| 106 | + if !ok { |
| 107 | + log.Debug("provideKeys channel closed") |
| 108 | + return |
| 109 | + } |
102 | 110 | select {
|
103 | 111 | case <-px.Closing():
|
104 | 112 | return
|
105 |
| - case k, ok := <-bs.provideKeys: |
106 |
| - if !ok { |
107 |
| - log.Debug("provideKeys channel closed") |
108 |
| - return |
109 |
| - } |
110 |
| - limitedGoProvide(k, wid) |
| 113 | + case limit <- struct{}{}: |
| 114 | + go limitedGoProvide(k, wid) |
111 | 115 | }
|
112 | 116 | }
|
113 |
| - }) |
| 117 | + } |
114 | 118 | }
|
115 | 119 |
|
116 | 120 | func (bs *Bitswap) provideCollector(ctx context.Context) {
|
|
0 commit comments