Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
cleanup, use a workgroup over channels
  • Loading branch information
whyrusleeping committed Dec 1, 2014
1 parent a65a9eb commit 618a4c7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
11 changes: 6 additions & 5 deletions exchange/bitswap/bitswap.go
Expand Up @@ -3,6 +3,7 @@
package bitswap

import (
"sync"
"time"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Expand Down Expand Up @@ -180,21 +181,20 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
}

func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
done := make(chan struct{})
wg := sync.WaitGroup{}
for _, k := range ks {
wg.Add(1)
go func(k u.Key) {
providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)

err := bs.sendWantListTo(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
done <- struct{}{}
wg.Done()
}(k)
}
for _ = range ks {
<-done
}
wg.Wait()
}

// TODO ensure only one active request per key
Expand Down Expand Up @@ -255,6 +255,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.routing.Provide(ctx, blk.Key())
}

// receiveBlock handles storing the block in the blockstore and calling HasBlock
func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
// TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions merkledag/merkledag.go
Expand Up @@ -28,7 +28,10 @@ type DAGService interface {
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
GetKeysAsync(context.Context, *Node) <-chan *Node

// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG(context.Context, *Node) <-chan *Node
}

func NewDAGService(bs *bserv.BlockService) DAGService {
Expand Down Expand Up @@ -298,10 +301,10 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
return -1, u.ErrNotFound
}

// GetKeysAsync will fill out all of the links of the given Node.
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetKeysAsync(ctx context.Context, root *Node) <-chan *Node {
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node)
go func() {
var keys []u.Key
Expand Down
3 changes: 2 additions & 1 deletion unixfs/io/dagreader.go
Expand Up @@ -40,7 +40,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
case ftpb.Data_File:
var fetchChan <-chan *mdag.Node
if serv != nil {
fetchChan = serv.GetKeysAsync(context.TODO(), n)
fetchChan = serv.GetDAG(context.TODO(), n)
}
return &DagReader{
node: n,
Expand All @@ -62,6 +62,7 @@ func (dr *DagReader) precalcNextBuf() error {
var nxt *mdag.Node
var ok bool

// TODO: require non-nil dagservice, use offline bitswap exchange
if dr.serv == nil {
// Only used when fetchChan is nil,
// which only happens when passed in a nil dagservice
Expand Down

0 comments on commit 618a4c7

Please sign in to comment.