@@ -75,20 +75,20 @@ func (bs *Bitswap) provideWorker(px process.Process) {
75
75
76
76
limit := make (chan struct {}, provideWorkerMax )
77
77
78
- limitedGoProvide := func (k key.Key , wid int ) {
78
+ limitedGoProvide := func (ks [] key.Key , wid int ) {
79
79
defer func () {
80
80
// replace token when done
81
81
<- limit
82
82
}()
83
83
ev := logging.LoggableMap {"ID" : wid }
84
84
85
85
ctx := procctx .OnClosingContext (px ) // derive ctx from px
86
- defer log .EventBegin (ctx , "Bitswap.ProvideWorker.Work" , ev , & k ).Done ()
86
+ defer log .EventBegin (ctx , "Bitswap.ProvideWorker.Work" , ev ).Done ()
87
87
88
88
ctx , cancel := context .WithTimeout (ctx , provideTimeout ) // timeout ctx
89
89
defer cancel ()
90
90
91
- if err := bs .network .Provide (ctx , k ); err != nil {
91
+ if err := bs .network .ProvideMany (ctx , ks ); err != nil {
92
92
log .Error (err )
93
93
}
94
94
}
@@ -102,7 +102,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
102
102
select {
103
103
case <- px .Closing ():
104
104
return
105
- case k , ok := <- bs .provideKeys :
105
+ case ks , ok := <- bs .provideKeys :
106
106
if ! ok {
107
107
log .Debug ("provideKeys channel closed" )
108
108
return
@@ -111,38 +111,50 @@ func (bs *Bitswap) provideWorker(px process.Process) {
111
111
case <- px .Closing ():
112
112
return
113
113
case limit <- struct {}{}:
114
- go limitedGoProvide (k , wid )
114
+ go limitedGoProvide (ks , wid )
115
115
}
116
116
}
117
117
}
118
118
}
119
119
120
+ var batchProvideSize = 50
121
+ var batchTimeout = time .Second
122
+
120
123
func (bs * Bitswap ) provideCollector (ctx context.Context ) {
121
- defer close ( bs . provideKeys )
122
- var toProvide []key.Key
123
- var nextKey key. Key
124
- var keysOut chan key. Key
124
+ var keys []key. Key
125
+ var keysOut chan []key.Key
126
+ var timer * time. Timer
127
+ var timerChan <- chan time. Time
125
128
126
129
for {
127
130
select {
128
- case blk , ok := <- bs .newBlocks :
131
+ case blk , ok := <- bs .newBlocks : // TODO: only send keys down this channel
129
132
if ! ok {
130
- log .Debug ("newBlocks channel closed" )
131
133
return
132
134
}
133
- if keysOut == nil {
134
- nextKey = blk .Key ()
135
+ keys = append (keys , blk .Key ())
136
+
137
+ if len (keys ) >= batchProvideSize {
135
138
keysOut = bs .provideKeys
136
- } else {
137
- toProvide = append (toProvide , blk .Key ())
138
139
}
139
- case keysOut <- nextKey :
140
- if len (toProvide ) > 0 {
141
- nextKey = toProvide [0 ]
142
- toProvide = toProvide [1 :]
143
- } else {
144
- keysOut = nil
140
+
141
+ if timer != nil {
142
+ timer .Stop ()
143
+ timer = time .NewTimer (batchTimeout )
144
+ timerChan = timer .C
145
145
}
146
+
147
+ case <- timerChan :
148
+ if len (keys ) == 0 {
149
+ timer .Stop ()
150
+ continue
151
+ }
152
+ keysOut = bs .provideKeys
153
+
154
+ case keysOut <- keys :
155
+ keysOut = nil
156
+ keys = nil
157
+
146
158
case <- ctx .Done ():
147
159
return
148
160
}
0 commit comments