@@ -111,56 +111,29 @@ func (n *dagService) Remove(nd *Node) error {
111
111
112
112
// FetchGraph fetches all nodes that are children of the given node
113
113
func FetchGraph (ctx context.Context , root * Node , serv DAGService ) error {
114
- toprocess := make (chan []key.Key , 8 )
115
- nodes := make (chan * Node , 8 )
116
- errs := make (chan error , 1 )
117
-
118
- ctx , cancel := context .WithCancel (ctx )
119
- defer cancel ()
120
- defer close (toprocess )
121
-
122
- go fetchNodes (ctx , serv , toprocess , nodes , errs )
123
-
124
- nodes <- root
125
- live := 1
126
-
127
- for {
128
- select {
129
- case nd , ok := <- nodes :
130
- if ! ok {
131
- return nil
132
- }
133
-
134
- var keys []key.Key
135
- for _ , lnk := range nd .Links {
136
- keys = append (keys , key .Key (lnk .Hash ))
137
- }
138
- keys = dedupeKeys (keys )
114
+ return EnumerateChildrenAsync (ctx , serv , root , key .NewKeySet ())
115
+ }
139
116
140
- // keep track of open request, when zero, we're done
141
- live += len ( keys ) - 1
117
+ func fetchNodes ( ctx context. Context , ds DAGService , in <- chan []key. Key , out chan <- * Node , errs chan <- error ) {
118
+ defer close ( out )
142
119
143
- if live == 0 {
144
- return nil
120
+ get := func (g NodeGetter ) {
121
+ nd , err := g .Get (ctx )
122
+ if err != nil {
123
+ select {
124
+ case errs <- err :
125
+ case <- ctx .Done ():
145
126
}
127
+ return
128
+ }
146
129
147
- if len (keys ) > 0 {
148
- select {
149
- case toprocess <- keys :
150
- case <- ctx .Done ():
151
- return ctx .Err ()
152
- }
153
- }
154
- case err := <- errs :
155
- return err
130
+ select {
131
+ case out <- nd :
156
132
case <- ctx .Done ():
157
- return ctx . Err ()
133
+ return
158
134
}
159
135
}
160
- }
161
136
162
- func fetchNodes (ctx context.Context , ds DAGService , in <- chan []key.Key , out chan <- * Node , errs chan <- error ) {
163
- defer close (out )
164
137
for {
165
138
select {
166
139
case ks , ok := <- in :
@@ -170,22 +143,7 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out cha
170
143
171
144
ng := ds .GetNodes (ctx , ks )
172
145
for _ , g := range ng {
173
- go func (g NodeGetter ) {
174
- nd , err := g .Get (ctx )
175
- if err != nil {
176
- select {
177
- case errs <- err :
178
- case <- ctx .Done ():
179
- }
180
- return
181
- }
182
-
183
- select {
184
- case out <- nd :
185
- case <- ctx .Done ():
186
- return
187
- }
188
- }(g )
146
+ go get (g )
189
147
}
190
148
}
191
149
}
@@ -334,3 +292,55 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
334
292
}
335
293
return nil
336
294
}
295
+
296
// EnumerateChildrenAsync walks the links reachable from root, handing batches
// of link keys to fetchNodes (started below in its own goroutine) and
// recording every requested key in set so that it is requested only once.
// It returns nil when no fetches remain outstanding or the nodes channel is
// closed, the first error received on errs, or ctx.Err() once the derived
// context is done.
// NOTE(review): root's own key is never added to set in this function, so a
// link pointing back at root would be requested once; confirm that is intended.
+ func EnumerateChildrenAsync (ctx context.Context , ds DAGService , root * Node , set key.KeySet ) error {
297
+ toprocess := make (chan []key.Key , 8 )
298
+ nodes := make (chan * Node , 8 )
299
+ errs := make (chan error , 1 )
// toprocess carries key batches to fetchNodes, nodes carries fetched nodes
// back, and errs carries a fetch error back.
300
+
301
+ ctx , cancel := context .WithCancel (ctx )
302
+ defer cancel ()
303
+ defer close (toprocess )
// Deferred calls run in reverse order: toprocess is closed first, then the
// derived context is cancelled.
304
+
305
+ go fetchNodes (ctx , ds , toprocess , nodes , errs )
306
+
307
+ nodes <- root
308
+ live := 1
// root is fed through the nodes channel like a fetched node; live counts nodes
// that have been requested (or seeded) but not yet received from nodes.
309
+
310
+ for {
311
+ select {
312
+ case nd , ok := <- nodes :
313
+ if ! ok {
314
+ return nil
315
+ }
316
+ // a node has been fetched
317
+ live --
318
+
319
+ var keys []key.Key
320
+ for _ , lnk := range nd .Links {
321
+ k := key .Key (lnk .Hash )
322
+ if ! set .Has (k ) {
323
+ set .Add (k )
324
+ live ++
325
+ keys = append (keys , k )
326
+ }
327
+ }
// Only keys not already in set were collected above; each one adds an
// outstanding fetch to live.
328
+
329
+ if live == 0 {
330
+ return nil
331
+ }
// live == 0 means every requested node has been received and none of them
// introduced a new key, so the walk is finished.
332
+
333
+ if len (keys ) > 0 {
334
+ select {
335
+ case toprocess <- keys :
336
+ case <- ctx .Done ():
337
+ return ctx .Err ()
338
+ }
339
+ }
340
+ case err := <- errs :
341
+ return err
342
+ case <- ctx .Done ():
343
+ return ctx .Err ()
344
+ }
345
+ }
346
+ }
0 commit comments