Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5c05b27

Browse files
committed Nov 28, 2015
make bitswap use provide many
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 6c42ba3 commit 5c05b27

File tree

6 files changed

+57
-25
lines changed

6 files changed

+57
-25
lines changed
 

‎exchange/bitswap/bitswap.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
7878
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
7979
process: px,
8080
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
81-
provideKeys: make(chan key.Key, provideKeysBufferSize),
81+
provideKeys: make(chan []key.Key, provideKeysBufferSize),
8282
wm: NewWantManager(ctx, network),
8383
}
8484
go bs.wm.Run()
@@ -126,7 +126,7 @@ type Bitswap struct {
126126

127127
newBlocks chan *blocks.Block
128128

129-
provideKeys chan key.Key
129+
provideKeys chan []key.Key
130130

131131
counterLk sync.Mutex
132132
blocksRecvd int

‎exchange/bitswap/network/interface.go

+3
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,7 @@ type Routing interface {
4848

4949
// Provide provides the key to the network
5050
Provide(context.Context, key.Key) error
51+
52+
// ProvideMany provides multiple keys to the network at the same time, sharing RPCs
53+
ProvideMany(context.Context, []key.Key) error
5154
}

‎exchange/bitswap/network/ipfs_impl.go

+5
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ func (bsnet *impl) Provide(ctx context.Context, k key.Key) error {
142142
return bsnet.routing.Provide(ctx, k)
143143
}
144144

145+
// ProvideMany provides the given keys to the network
146+
func (bsnet *impl) ProvideMany(ctx context.Context, ks []key.Key) error {
147+
return bsnet.routing.ProvideMany(ctx, ks)
148+
}
149+
145150
// handleNewStream receives a new stream from the network.
146151
func (bsnet *impl) handleNewStream(s inet.Stream) {
147152
defer s.Close()

‎exchange/bitswap/testnet/virtual.go

+5
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
117117
return nc.routing.Provide(ctx, k)
118118
}
119119

120+
// ProvideMany provides the given keys to the network
121+
func (nc *networkClient) ProvideMany(ctx context.Context, ks []key.Key) error {
122+
return nc.routing.ProvideMany(ctx, ks)
123+
}
124+
120125
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
121126
nc.Receiver = r
122127
}

‎exchange/bitswap/workers.go

+33-21
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,20 @@ func (bs *Bitswap) provideWorker(px process.Process) {
7575

7676
limit := make(chan struct{}, provideWorkerMax)
7777

78-
limitedGoProvide := func(k key.Key, wid int) {
78+
limitedGoProvide := func(ks []key.Key, wid int) {
7979
defer func() {
8080
// replace token when done
8181
<-limit
8282
}()
8383
ev := logging.LoggableMap{"ID": wid}
8484

8585
ctx := procctx.OnClosingContext(px) // derive ctx from px
86-
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
86+
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev).Done()
8787

8888
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
8989
defer cancel()
9090

91-
if err := bs.network.Provide(ctx, k); err != nil {
91+
if err := bs.network.ProvideMany(ctx, ks); err != nil {
9292
log.Error(err)
9393
}
9494
}
@@ -102,7 +102,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
102102
select {
103103
case <-px.Closing():
104104
return
105-
case k, ok := <-bs.provideKeys:
105+
case ks, ok := <-bs.provideKeys:
106106
if !ok {
107107
log.Debug("provideKeys channel closed")
108108
return
@@ -111,38 +111,50 @@ func (bs *Bitswap) provideWorker(px process.Process) {
111111
case <-px.Closing():
112112
return
113113
case limit <- struct{}{}:
114-
go limitedGoProvide(k, wid)
114+
go limitedGoProvide(ks, wid)
115115
}
116116
}
117117
}
118118
}
119119

120+
var batchProvideSize = 50
121+
var batchTimeout = time.Second
122+
120123
func (bs *Bitswap) provideCollector(ctx context.Context) {
121-
defer close(bs.provideKeys)
122-
var toProvide []key.Key
123-
var nextKey key.Key
124-
var keysOut chan key.Key
124+
var keys []key.Key
125+
var keysOut chan []key.Key
126+
var timer *time.Timer
127+
var timerChan <-chan time.Time
125128

126129
for {
127130
select {
128-
case blk, ok := <-bs.newBlocks:
131+
case blk, ok := <-bs.newBlocks: // TODO: only send keys down this channel
129132
if !ok {
130-
log.Debug("newBlocks channel closed")
131133
return
132134
}
133-
if keysOut == nil {
134-
nextKey = blk.Key()
135+
keys = append(keys, blk.Key())
136+
137+
if len(keys) >= batchProvideSize {
135138
keysOut = bs.provideKeys
136-
} else {
137-
toProvide = append(toProvide, blk.Key())
138139
}
139-
case keysOut <- nextKey:
140-
if len(toProvide) > 0 {
141-
nextKey = toProvide[0]
142-
toProvide = toProvide[1:]
143-
} else {
144-
keysOut = nil
140+
141+
if timer != nil {
142+
timer.Stop()
143+
timer = time.NewTimer(batchTimeout)
144+
timerChan = timer.C
145145
}
146+
147+
case <-timerChan:
148+
if len(keys) == 0 {
149+
timer.Stop()
150+
continue
151+
}
152+
keysOut = bs.provideKeys
153+
154+
case keysOut <- keys:
155+
keysOut = nil
156+
keys = nil
157+
146158
case <-ctx.Done():
147159
return
148160
}

‎routing/dht/provide_many_test.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dht
33
import (
44
"fmt"
55
"testing"
6+
"time"
67

78
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
89

@@ -31,12 +32,15 @@ func TestProvideMany(t *testing.T) {
3132
t.Fatal(err)
3233
}
3334

35+
// sleep a small amount to make sure messages all arrive
36+
time.Sleep(time.Millisecond * 100)
37+
3438
// in this setup (len(dhts) == 10), every node should know every provider
35-
for _, d := range dhts {
39+
for i, d := range dhts {
3640
for _, k := range keys {
3741
pids := d.providers.GetProviders(ctx, k)
3842
if len(pids) != 1 {
39-
t.Fatalf("expected 1 provider for %s, got %d", k, len(pids))
43+
t.Fatalf("[%d] expected 1 provider for %s, got %d", i, k, len(pids))
4044
}
4145
}
4246
}
@@ -68,6 +72,9 @@ func TestProvideManyOld(t *testing.T) {
6872
}
6973
}
7074

75+
// sleep a small amount to make sure messages all arrive
76+
time.Sleep(time.Millisecond * 100)
77+
7178
// in this setup (len(dhts) == 10), every node should know every provider
7279
for i, d := range dhts {
7380
for _, k := range keys {

0 commit comments

Comments
 (0)
Please sign in to comment.