Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4f019d2

Browse files
committed Jul 8, 2015
address concerns from PR
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 18688a2 commit 4f019d2

File tree

2 files changed

+81
-74
lines changed

2 files changed

+81
-74
lines changed
 

‎merkledag/merkledag.go

+68 -58
Original file line number | Diff line number | Diff line change
@@ -111,56 +111,29 @@ func (n *dagService) Remove(nd *Node) error {
111111

112112
// FetchGraph fetches all nodes that are children of the given node
113113
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
114-
toprocess := make(chan []key.Key, 8)
115-
nodes := make(chan *Node, 8)
116-
errs := make(chan error, 1)
117-
118-
ctx, cancel := context.WithCancel(ctx)
119-
defer cancel()
120-
defer close(toprocess)
121-
122-
go fetchNodes(ctx, serv, toprocess, nodes, errs)
123-
124-
nodes <- root
125-
live := 1
126-
127-
for {
128-
select {
129-
case nd, ok := <-nodes:
130-
if !ok {
131-
return nil
132-
}
133-
134-
var keys []key.Key
135-
for _, lnk := range nd.Links {
136-
keys = append(keys, key.Key(lnk.Hash))
137-
}
138-
keys = dedupeKeys(keys)
114+
return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
115+
}
139116

140-
// keep track of open request, when zero, we're done
141-
live += len(keys) - 1
117+
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
118+
defer close(out)
142119

143-
if live == 0 {
144-
return nil
120+
get := func(g NodeGetter) {
121+
nd, err := g.Get(ctx)
122+
if err != nil {
123+
select {
124+
case errs <- err:
125+
case <-ctx.Done():
145126
}
127+
return
128+
}
146129

147-
if len(keys) > 0 {
148-
select {
149-
case toprocess <- keys:
150-
case <-ctx.Done():
151-
return ctx.Err()
152-
}
153-
}
154-
case err := <-errs:
155-
return err
130+
select {
131+
case out <- nd:
156132
case <-ctx.Done():
157-
return ctx.Err()
133+
return
158134
}
159135
}
160-
}
161136

162-
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
163-
defer close(out)
164137
for {
165138
select {
166139
case ks, ok := <-in:
@@ -170,22 +143,7 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out cha
170143

171144
ng := ds.GetNodes(ctx, ks)
172145
for _, g := range ng {
173-
go func(g NodeGetter) {
174-
nd, err := g.Get(ctx)
175-
if err != nil {
176-
select {
177-
case errs <- err:
178-
case <-ctx.Done():
179-
}
180-
return
181-
}
182-
183-
select {
184-
case out <- nd:
185-
case <-ctx.Done():
186-
return
187-
}
188-
}(g)
146+
go get(g)
189147
}
190148
}
191149
}
@@ -334,3 +292,55 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
334292
}
335293
return nil
336294
}
295+
296+
// EnumerateChildrenAsync walks the links reachable from root. Link keys that
// are not yet in set are added to set and handed, in batches, to a background
// fetchNodes goroutine; the nodes it sends back are processed the same way.
//
// It returns nil once no requested node is still outstanding (or the nodes
// channel has been closed), the first error received on the errs channel, or
// ctx.Err() when the context is done.
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
297+
toprocess := make(chan []key.Key, 8)
298+
nodes := make(chan *Node, 8)
299+
errs := make(chan error, 1)
300+
301+
// The derived context is the one passed to fetchNodes below; it is
// cancelled on every return path of this function.
ctx, cancel := context.WithCancel(ctx)
302+
defer cancel()
303+
defer close(toprocess)
304+
305+
go fetchNodes(ctx, ds, toprocess, nodes, errs)
306+
307+
// Seed the loop with root. live counts nodes that have been queued or
// requested but not yet received by the loop below.
nodes <- root
308+
live := 1
309+
310+
for {
311+
select {
312+
case nd, ok := <-nodes:
313+
if !ok {
314+
return nil
315+
}
316+
// a node has been received, so one fewer is outstanding
317+
live--
318+
319+
// Collect only the link keys not already recorded in set; each one
// becomes a new outstanding request.
var keys []key.Key
320+
for _, lnk := range nd.Links {
321+
k := key.Key(lnk.Hash)
322+
if !set.Has(k) {
323+
set.Add(k)
324+
live++
325+
keys = append(keys, k)
326+
}
327+
}
328+
329+
// Nothing outstanding and nothing new to request: the walk is done.
if live == 0 {
330+
return nil
331+
}
332+
333+
if len(keys) > 0 {
334+
select {
335+
case toprocess <- keys:
336+
case <-ctx.Done():
337+
return ctx.Err()
338+
}
339+
}
340+
case err := <-errs:
341+
return err
342+
case <-ctx.Done():
343+
return ctx.Err()
344+
}
345+
}
346+
}

‎merkledag/merkledag_test.go

+13 -16
Original file line number | Diff line number | Diff line change
@@ -216,38 +216,35 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
216216
}
217217

218218
func TestFetchGraph(t *testing.T) {
219-
bsi := bstest.Mocks(t, 1)[0]
220-
ds := NewDAGService(bsi)
219+
var dservs []DAGService
220+
bsis := bstest.Mocks(t, 2)
221+
for _, bsi := range bsis {
222+
dservs = append(dservs, NewDAGService(bsi))
223+
}
221224

222225
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
223226
spl := &chunk.SizeSplitter{512}
224227

225-
root, err := imp.BuildDagFromReader(read, ds, spl, nil)
228+
root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
226229
if err != nil {
227230
t.Fatal(err)
228231
}
229232

230-
err = FetchGraph(context.TODO(), root, ds)
233+
err = FetchGraph(context.TODO(), root, dservs[1])
231234
if err != nil {
232235
t.Fatal(err)
233236
}
234-
}
235-
236-
func TestFetchGraphOther(t *testing.T) {
237-
var dservs []DAGService
238-
for _, bsi := range bstest.Mocks(t, 2) {
239-
dservs = append(dservs, NewDAGService(bsi))
240-
}
241-
242-
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
243-
spl := &chunk.SizeSplitter{512}
244237

245-
root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
238+
// create an offline dagstore and ensure all blocks were fetched
239+
bs, err := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore))
246240
if err != nil {
247241
t.Fatal(err)
248242
}
249243

250-
err = FetchGraph(context.TODO(), root, dservs[1])
244+
offline_ds := NewDAGService(bs)
245+
ks := key.NewKeySet()
246+
247+
err = EnumerateChildren(context.Background(), offline_ds, root, ks)
251248
if err != nil {
252249
t.Fatal(err)
253250
}

0 commit comments

Comments
 (0)
Please sign in to comment.