1
1
package bitswap
2
2
3
3
import (
4
+ "sync"
4
5
"time"
5
6
6
7
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
7
8
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
8
9
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
9
10
10
11
key "github.com/ipfs/go-ipfs/blocks/key"
12
+ wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
13
+ peer "github.com/ipfs/go-ipfs/p2p/peer"
11
14
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
12
15
)
13
16
@@ -16,7 +19,7 @@ var TaskWorkerCount = 8
16
19
func (bs * Bitswap ) startWorkers (px process.Process , ctx context.Context ) {
17
20
// Start up a worker to handle block requests this node is making
18
21
px .Go (func (px process.Process ) {
19
- bs .providerConnector (ctx )
22
+ bs .providerQueryManager (ctx )
20
23
})
21
24
22
25
// Start up workers to handle requests from other nodes for the data on this node
@@ -149,37 +152,6 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
149
152
}
150
153
}
151
154
152
- // connects to providers for the given keys
153
- func (bs * Bitswap ) providerConnector (parent context.Context ) {
154
- defer log .Info ("bitswap client worker shutting down..." )
155
-
156
- for {
157
- log .Event (parent , "Bitswap.ProviderConnector.Loop" )
158
- select {
159
- case req := <- bs .findKeys :
160
- keys := req .keys
161
- if len (keys ) == 0 {
162
- log .Warning ("Received batch request for zero blocks" )
163
- continue
164
- }
165
- log .Event (parent , "Bitswap.ProviderConnector.Work" , logging.LoggableMap {"Keys" : keys })
166
-
167
- // NB: Optimization. Assumes that providers of key[0] are likely to
168
- // be able to provide for all keys. This currently holds true in most
169
- // every situation. Later, this assumption may not hold as true.
170
- child , cancel := context .WithTimeout (req .ctx , providerRequestTimeout )
171
- providers := bs .network .FindProvidersAsync (child , keys [0 ], maxProvidersPerRequest )
172
- for p := range providers {
173
- go bs .network .ConnectTo (req .ctx , p )
174
- }
175
- cancel ()
176
-
177
- case <- parent .Done ():
178
- return
179
- }
180
- }
181
- }
182
-
183
155
func (bs * Bitswap ) rebroadcastWorker (parent context.Context ) {
184
156
ctx , cancel := context .WithCancel (parent )
185
157
defer cancel ()
@@ -200,12 +172,48 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
200
172
}
201
173
case <- broadcastSignal .C : // resend unfulfilled wantlist keys
202
174
log .Event (ctx , "Bitswap.Rebroadcast.active" )
203
- entries := bs .wm .wl .Entries ()
204
- if len (entries ) > 0 {
205
- bs .connectToProviders (ctx , entries )
175
+ for _ , e := range bs .wm .wl .Entries () {
176
+ bs .findKeys <- & e
206
177
}
207
178
case <- parent .Done ():
208
179
return
209
180
}
210
181
}
211
182
}
183
+
184
+ func (bs * Bitswap ) providerQueryManager (ctx context.Context ) {
185
+ var activeLk sync.Mutex
186
+ active := make (map [key.Key ]* wantlist.Entry )
187
+
188
+ for {
189
+ select {
190
+ case e := <- bs .findKeys :
191
+ activeLk .Lock ()
192
+ if _ , ok := active [e .Key ]; ok {
193
+ continue
194
+ }
195
+ active [e .Key ] = e
196
+ activeLk .Unlock ()
197
+
198
+ go func (e * wantlist.Entry ) {
199
+ child , cancel := context .WithTimeout (e .Ctx , providerRequestTimeout )
200
+ defer cancel ()
201
+ providers := bs .network .FindProvidersAsync (child , e .Key , maxProvidersPerRequest )
202
+ for p := range providers {
203
+ go func (p peer.ID ) {
204
+ err := bs .network .ConnectTo (child , p )
205
+ if err != nil {
206
+ log .Debug ("failed to connect to provider %s: %s" , p , err )
207
+ }
208
+ }(p )
209
+ }
210
+ activeLk .Lock ()
211
+ delete (active , e .Key )
212
+ activeLk .Unlock ()
213
+ }(e )
214
+
215
+ case <- ctx .Done ():
216
+ return
217
+ }
218
+ }
219
+ }
0 commit comments