1
1
package bitswap
2
2
3
3
import (
4
- "os"
5
- "strconv"
6
4
"time"
7
5
8
6
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
7
+ ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
9
8
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
9
+ waitable "github.com/ipfs/go-ipfs/thirdparty/waitable"
10
10
11
11
key "github.com/ipfs/go-ipfs/blocks/key"
12
12
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
13
13
)
14
14
15
15
// TaskWorkerCount is a package-level count for bitswap task workers; it
// defaults to 8.
// NOTE(review): the code that reads it is outside this excerpt — presumably
// startWorkers launches this many taskWorker goroutines; confirm.
var TaskWorkerCount = 8
16
16
17
- func init () {
18
- twc := os .Getenv ("IPFS_BITSWAP_TASK_WORKERS" )
19
- if twc != "" {
20
- n , err := strconv .Atoi (twc )
21
- if err != nil {
22
- log .Error (err )
23
- return
24
- }
25
- if n > 0 {
26
- TaskWorkerCount = n
27
- } else {
28
- log .Errorf ("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS" , n )
29
- }
30
- }
31
- }
32
-
33
17
func (bs * Bitswap ) startWorkers (px process.Process , ctx context.Context ) {
34
18
// Start up a worker to handle block requests this node is making
35
19
px .Go (func (px process.Process ) {
@@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
57
41
// Spawn up multiple workers to handle incoming blocks
58
42
// consider increasing number if providing blocks bottlenecks
59
43
// file transfers
60
- for i := 0 ; i < provideWorkers ; i ++ {
61
- i := i
62
- px .Go (func (px process.Process ) {
63
- bs .provideWorker (ctx , i )
64
- })
65
- }
44
+ px .Go (bs .provideWorker )
66
45
}
67
46
68
47
func (bs * Bitswap ) taskWorker (ctx context.Context , id int ) {
@@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
77
56
if ! ok {
78
57
continue
79
58
}
80
- log .Event (ctx , "Bitswap.TaskWorker.Work" , eventlog.LoggableMap {"ID" : id , "Target" : envelope .Peer .Pretty (), "Block" : envelope .Block .Multihash .B58String ()})
59
+ log .Event (ctx , "Bitswap.TaskWorker.Work" , eventlog.LoggableMap {
60
+ "ID" : id ,
61
+ "Target" : envelope .Peer .Pretty (),
62
+ "Block" : envelope .Block .Multihash .B58String (),
63
+ })
81
64
82
65
bs .wm .SendBlock (ctx , envelope )
83
66
case <- ctx .Done ():
@@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
89
72
}
90
73
}
91
74
92
- func (bs * Bitswap ) provideWorker (ctx context.Context , id int ) {
93
- idmap := eventlog.LoggableMap {"ID" : id }
94
- for {
95
- log .Event (ctx , "Bitswap.ProvideWorker.Loop" , idmap )
96
- select {
97
- case k , ok := <- bs .provideKeys :
98
- log .Event (ctx , "Bitswap.ProvideWorker.Work" , idmap , & k )
99
- if ! ok {
100
- log .Debug ("provideKeys channel closed" )
101
- return
102
- }
103
- ctx , cancel := context .WithTimeout (ctx , provideTimeout )
104
- err := bs .network .Provide (ctx , k )
105
- if err != nil {
75
+ func (bs * Bitswap ) provideWorker (px process.Process ) {
76
+
77
+ limiter := ratelimit .NewRateLimiter (px , provideWorkerMax )
78
+
79
+ limitedGoProvide := func (k key.Key , wid int ) {
80
+ ev := eventlog.LoggableMap {"ID" : wid }
81
+ limiter .LimitedGo (func (px process.Process ) {
82
+
83
+ ctx := waitable .Context (px ) // derive ctx from px
84
+ defer log .EventBegin (ctx , "Bitswap.ProvideWorker.Work" , ev , & k ).Done ()
85
+
86
+ ctx , cancel := context .WithTimeout (ctx , provideTimeout ) // timeout ctx
87
+ defer cancel ()
88
+
89
+ if err := bs .network .Provide (ctx , k ); err != nil {
106
90
log .Error (err )
107
91
}
108
- cancel ()
109
- case <- ctx .Done ():
110
- return
111
- }
92
+ })
112
93
}
94
+
95
+ // worker spawner, reads from bs.provideKeys until it closes, spawning a
96
+ // _ratelimited_ number of workers to handle each key.
97
+ limiter .Go (func (px process.Process ) {
98
+ for wid := 2 ; ; wid ++ {
99
+ ev := eventlog.LoggableMap {"ID" : 1 }
100
+ log .Event (waitable .Context (px ), "Bitswap.ProvideWorker.Loop" , ev )
101
+
102
+ select {
103
+ case <- px .Closing ():
104
+ return
105
+ case k , ok := <- bs .provideKeys :
106
+ if ! ok {
107
+ log .Debug ("provideKeys channel closed" )
108
+ return
109
+ }
110
+ limitedGoProvide (k , wid )
111
+ }
112
+ }
113
+ })
113
114
}
114
115
115
116
func (bs * Bitswap ) provideCollector (ctx context.Context ) {
0 commit comments