Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b799a16

Browse files
committed Aug 7, 2015
work chunking interface changes up into importer
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 3c0fbd7 commit b799a16

14 files changed

+185
-173
lines changed
 

‎core/commands/add.go

+63-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package commands
22

33
import (
4+
"errors"
45
"fmt"
56
"io"
67
"path"
8+
"strconv"
9+
"strings"
710

811
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
912

@@ -31,6 +34,7 @@ const (
3134
wrapOptionName = "wrap-with-directory"
3235
hiddenOptionName = "hidden"
3336
onlyHashOptionName = "only-hash"
37+
chunkerOptionName = "chunker"
3438
)
3539

3640
type AddedObject struct {
@@ -61,6 +65,7 @@ remains to be implemented.
6165
cmds.BoolOption(onlyHashOptionName, "n", "Only chunk and hash - do not write to disk"),
6266
cmds.BoolOption(wrapOptionName, "w", "Wrap files with a directory object"),
6367
cmds.BoolOption(hiddenOptionName, "Include files that are hidden"),
68+
cmds.StringOption(chunkerOptionName, "c", "chunking algorithm to use"),
6469
},
6570
PreRun: func(req cmds.Request) error {
6671
if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet {
@@ -97,6 +102,7 @@ remains to be implemented.
97102
wrap, _, _ := req.Option(wrapOptionName).Bool()
98103
hash, _, _ := req.Option(onlyHashOptionName).Bool()
99104
hidden, _, _ := req.Option(hiddenOptionName).Bool()
105+
chunker, _, _ := req.Option(chunkerOptionName).String()
100106

101107
if hash {
102108
nilnode, err := core.NewNodeBuilder().NilRepo().Build(n.Context())
@@ -118,6 +124,7 @@ remains to be implemented.
118124
progress: progress,
119125
hidden: hidden,
120126
trickle: trickle,
127+
chunker: chunker,
121128
}
122129

123130
rootnd, err := addParams.addFile(file)
@@ -265,24 +272,73 @@ type adder struct {
265272
progress bool
266273
hidden bool
267274
trickle bool
275+
chunker string
276+
}
277+
278+
func getChunker(r io.Reader, chunker string) (chunk.Splitter, error) {
279+
switch {
280+
case chunker == "":
281+
return chunk.NewSizeSplitter(r, chunk.DefaultBlockSize), nil
282+
case strings.HasPrefix(chunker, "size-"):
283+
sizeStr := strings.Split(chunker, "-")[1]
284+
size, err := strconv.Atoi(sizeStr)
285+
if err != nil {
286+
return nil, err
287+
}
288+
289+
return chunk.NewSizeSplitter(r, int64(size)), nil
290+
case strings.HasPrefix(chunker, "rabin"):
291+
parts := strings.Split(chunker, "-")
292+
switch len(parts) {
293+
case 1:
294+
return chunk.NewRabin(r, uint64(chunk.DefaultBlockSize)), nil
295+
case 2:
296+
size, err := strconv.Atoi(parts[1])
297+
if err != nil {
298+
return nil, err
299+
}
300+
return chunk.NewRabin(r, uint64(size)), nil
301+
case 4:
302+
min, err := strconv.Atoi(parts[1])
303+
if err != nil {
304+
return nil, err
305+
}
306+
avg, err := strconv.Atoi(parts[2])
307+
if err != nil {
308+
return nil, err
309+
}
310+
max, err := strconv.Atoi(parts[3])
311+
if err != nil {
312+
return nil, err
313+
}
314+
315+
return chunk.NewRabinMinMax(r, uint64(min), uint64(avg), uint64(max)), nil
316+
default:
317+
return nil, errors.New("incorrect format (expected 'rabin' 'rabin-[avg]' or 'rabin-[min]-[avg]-[max]'")
318+
}
319+
default:
320+
return nil, fmt.Errorf("unrecognized chunker option: %s", chunker)
321+
}
268322
}
269323

270324
// Perform the actual add & pin locally, outputting results to reader
271-
func add(n *core.IpfsNode, reader io.Reader, useTrickle bool) (*dag.Node, error) {
325+
func add(n *core.IpfsNode, reader io.Reader, useTrickle bool, chunker string) (*dag.Node, error) {
326+
chnk, err := getChunker(reader, chunker)
327+
if err != nil {
328+
return nil, err
329+
}
330+
272331
var node *dag.Node
273-
var err error
274332
if useTrickle {
275333
node, err = importer.BuildTrickleDagFromReader(
276-
reader,
277334
n.DAG,
278-
chunk.DefaultSplitter,
335+
chnk,
279336
importer.PinIndirectCB(n.Pinning.GetManual()),
280337
)
281338
} else {
282339
node, err = importer.BuildDagFromReader(
283-
reader,
284340
n.DAG,
285-
chunk.DefaultSplitter,
341+
chnk,
286342
importer.PinIndirectCB(n.Pinning.GetManual()),
287343
)
288344
}
@@ -314,7 +370,7 @@ func (params *adder) addFile(file files.File) (*dag.Node, error) {
314370
reader = &progressReader{file: file, out: params.out}
315371
}
316372

317-
dagnode, err := add(params.node, reader, params.trickle)
373+
dagnode, err := add(params.node, reader, params.trickle, params.chunker)
318374
if err != nil {
319375
return nil, err
320376
}

‎core/corehttp/gateway_handler.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) {
4646
// TODO(cryptix): change and remove this helper once PR1136 is merged
4747
// return ufs.AddFromReader(i.node, r.Body)
4848
return importer.BuildDagFromReader(
49-
r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual()))
49+
i.node.DAG,
50+
chunk.DefaultSplitter(r),
51+
importer.BasicPinnerCB(i.node.Pinning.GetManual()))
5052
}
5153

5254
// TODO(btc): break this apart into separate handlers using a more expressive muxer

‎core/coreunix/add.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ var log = eventlog.Logger("coreunix")
2525
// datastore. Returns a key representing the root node.
2626
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
2727
// TODO more attractive function signature importer.BuildDagFromReader
28+
2829
dagNode, err := importer.BuildDagFromReader(
29-
r,
3030
n.DAG,
31-
chunk.DefaultSplitter,
31+
chunk.NewSizeSplitter(r, chunk.DefaultBlockSize),
3232
importer.BasicPinnerCB(n.Pinning.GetManual()),
3333
)
3434
if err != nil {
@@ -96,9 +96,8 @@ func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) {
9696
mp := n.Pinning.GetManual()
9797

9898
node, err := importer.BuildDagFromReader(
99-
reader,
10099
n.DAG,
101-
chunk.DefaultSplitter,
100+
chunk.DefaultSplitter(reader),
102101
importer.PinIndirectCB(mp),
103102
)
104103
if err != nil {

‎core/coreunix/metadata_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestMetadata(t *testing.T) {
3838
data := make([]byte, 1000)
3939
u.NewTimeSeededRand().Read(data)
4040
r := bytes.NewReader(data)
41-
nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
41+
nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r), nil)
4242
if err != nil {
4343
t.Fatal(err)
4444
}

‎importer/balanced/balanced_test.go

+34-85
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,18 @@ import (
1212
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1313
chunk "github.com/ipfs/go-ipfs/importer/chunk"
1414
h "github.com/ipfs/go-ipfs/importer/helpers"
15-
merkledag "github.com/ipfs/go-ipfs/merkledag"
15+
dag "github.com/ipfs/go-ipfs/merkledag"
1616
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
1717
pin "github.com/ipfs/go-ipfs/pin"
1818
uio "github.com/ipfs/go-ipfs/unixfs/io"
1919
u "github.com/ipfs/go-ipfs/util"
2020
)
2121

22-
func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) {
22+
// TODO: extract these tests and more as a generic layout test suite
23+
24+
func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
2325
// Start the splitter
24-
blkch, errs := spl.Split(r)
26+
blkch, errs := chunk.Chan(spl)
2527

2628
dbp := h.DagBuilderParams{
2729
Dagserv: ds,
@@ -31,14 +33,28 @@ func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter)
3133
return BalancedLayout(dbp.New(blkch, errs))
3234
}
3335

36+
func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.Node, []byte) {
37+
data := make([]byte, size)
38+
u.NewTimeSeededRand().Read(data)
39+
r := bytes.NewReader(data)
40+
41+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(r, blksize))
42+
if err != nil {
43+
t.Fatal(err)
44+
}
45+
46+
return nd, data
47+
}
48+
3449
//Test where calls to read are smaller than the chunk size
3550
func TestSizeBasedSplit(t *testing.T) {
3651
if testing.Short() {
3752
t.SkipNow()
3853
}
39-
bs := &chunk.SizeSplitter{Size: 512}
54+
55+
bs := chunk.SizeSplitterGen(512)
4056
testFileConsistency(t, bs, 32*512)
41-
bs = &chunk.SizeSplitter{Size: 4096}
57+
bs = chunk.SizeSplitterGen(4096)
4258
testFileConsistency(t, bs, 32*4096)
4359

4460
// Uneven offset
@@ -51,13 +67,13 @@ func dup(b []byte) []byte {
5167
return o
5268
}
5369

54-
func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
70+
func testFileConsistency(t *testing.T, bs chunk.SplitterGen, nbytes int) {
5571
should := make([]byte, nbytes)
5672
u.NewTimeSeededRand().Read(should)
5773

5874
read := bytes.NewReader(should)
5975
ds := mdtest.Mock(t)
60-
nd, err := buildTestDag(read, ds, bs)
76+
nd, err := buildTestDag(ds, bs(read))
6177
if err != nil {
6278
t.Fatal(err)
6379
}
@@ -79,15 +95,9 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
7995
}
8096

8197
func TestBuilderConsistency(t *testing.T) {
82-
nbytes := 100000
83-
buf := new(bytes.Buffer)
84-
io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
85-
should := dup(buf.Bytes())
8698
dagserv := mdtest.Mock(t)
87-
nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter)
88-
if err != nil {
89-
t.Fatal(err)
90-
}
99+
nd, should := getTestDag(t, dagserv, 100000, chunk.DefaultBlockSize)
100+
91101
r, err := uio.NewDagReader(context.Background(), nd, dagserv)
92102
if err != nil {
93103
t.Fatal(err)
@@ -117,23 +127,13 @@ func arrComp(a, b []byte) error {
117127
}
118128

119129
type dagservAndPinner struct {
120-
ds merkledag.DAGService
130+
ds dag.DAGService
121131
mp pin.ManualPinner
122132
}
123133

124134
func TestIndirectBlocks(t *testing.T) {
125-
splitter := &chunk.SizeSplitter{512}
126-
nbytes := 1024 * 1024
127-
buf := make([]byte, nbytes)
128-
u.NewTimeSeededRand().Read(buf)
129-
130-
read := bytes.NewReader(buf)
131-
132135
ds := mdtest.Mock(t)
133-
dag, err := buildTestDag(read, ds, splitter)
134-
if err != nil {
135-
t.Fatal(err)
136-
}
136+
dag, buf := getTestDag(t, ds, 1024*1024, 512)
137137

138138
reader, err := uio.NewDagReader(context.Background(), dag, ds)
139139
if err != nil {
@@ -152,15 +152,8 @@ func TestIndirectBlocks(t *testing.T) {
152152

153153
func TestSeekingBasic(t *testing.T) {
154154
nbytes := int64(10 * 1024)
155-
should := make([]byte, nbytes)
156-
u.NewTimeSeededRand().Read(should)
157-
158-
read := bytes.NewReader(should)
159155
ds := mdtest.Mock(t)
160-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
161-
if err != nil {
162-
t.Fatal(err)
163-
}
156+
nd, should := getTestDag(t, ds, nbytes, 500)
164157

165158
rs, err := uio.NewDagReader(context.Background(), nd, ds)
166159
if err != nil {
@@ -188,16 +181,8 @@ func TestSeekingBasic(t *testing.T) {
188181
}
189182

190183
func TestSeekToBegin(t *testing.T) {
191-
nbytes := int64(10 * 1024)
192-
should := make([]byte, nbytes)
193-
u.NewTimeSeededRand().Read(should)
194-
195-
read := bytes.NewReader(should)
196184
ds := mdtest.Mock(t)
197-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
198-
if err != nil {
199-
t.Fatal(err)
200-
}
185+
nd, should := getTestDag(t, ds, 10*1024, 500)
201186

202187
rs, err := uio.NewDagReader(context.Background(), nd, ds)
203188
if err != nil {
@@ -232,16 +217,8 @@ func TestSeekToBegin(t *testing.T) {
232217
}
233218

234219
func TestSeekToAlmostBegin(t *testing.T) {
235-
nbytes := int64(10 * 1024)
236-
should := make([]byte, nbytes)
237-
u.NewTimeSeededRand().Read(should)
238-
239-
read := bytes.NewReader(should)
240220
ds := mdtest.Mock(t)
241-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
242-
if err != nil {
243-
t.Fatal(err)
244-
}
221+
nd, should := getTestDag(t, ds, 10*1024, 500)
245222

246223
rs, err := uio.NewDagReader(context.Background(), nd, ds)
247224
if err != nil {
@@ -277,15 +254,8 @@ func TestSeekToAlmostBegin(t *testing.T) {
277254

278255
func TestSeekEnd(t *testing.T) {
279256
nbytes := int64(50 * 1024)
280-
should := make([]byte, nbytes)
281-
u.NewTimeSeededRand().Read(should)
282-
283-
read := bytes.NewReader(should)
284257
ds := mdtest.Mock(t)
285-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
286-
if err != nil {
287-
t.Fatal(err)
288-
}
258+
nd, _ := getTestDag(t, ds, nbytes, 500)
289259

290260
rs, err := uio.NewDagReader(context.Background(), nd, ds)
291261
if err != nil {
@@ -303,15 +273,8 @@ func TestSeekEnd(t *testing.T) {
303273

304274
func TestSeekEndSingleBlockFile(t *testing.T) {
305275
nbytes := int64(100)
306-
should := make([]byte, nbytes)
307-
u.NewTimeSeededRand().Read(should)
308-
309-
read := bytes.NewReader(should)
310276
ds := mdtest.Mock(t)
311-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{5000})
312-
if err != nil {
313-
t.Fatal(err)
314-
}
277+
nd, _ := getTestDag(t, ds, nbytes, 5000)
315278

316279
rs, err := uio.NewDagReader(context.Background(), nd, ds)
317280
if err != nil {
@@ -329,15 +292,8 @@ func TestSeekEndSingleBlockFile(t *testing.T) {
329292

330293
func TestSeekingStress(t *testing.T) {
331294
nbytes := int64(1024 * 1024)
332-
should := make([]byte, nbytes)
333-
u.NewTimeSeededRand().Read(should)
334-
335-
read := bytes.NewReader(should)
336295
ds := mdtest.Mock(t)
337-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000})
338-
if err != nil {
339-
t.Fatal(err)
340-
}
296+
nd, should := getTestDag(t, ds, nbytes, 1000)
341297

342298
rs, err := uio.NewDagReader(context.Background(), nd, ds)
343299
if err != nil {
@@ -374,15 +330,8 @@ func TestSeekingStress(t *testing.T) {
374330

375331
func TestSeekingConsistency(t *testing.T) {
376332
nbytes := int64(128 * 1024)
377-
should := make([]byte, nbytes)
378-
u.NewTimeSeededRand().Read(should)
379-
380-
read := bytes.NewReader(should)
381333
ds := mdtest.Mock(t)
382-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
383-
if err != nil {
384-
t.Fatal(err)
385-
}
334+
nd, should := getTestDag(t, ds, nbytes, 500)
386335

387336
rs, err := uio.NewDagReader(context.Background(), nd, ds)
388337
if err != nil {

‎importer/chunk/rabin.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,15 @@ type Rabin struct {
1414
}
1515

1616
func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
17+
min := avgBlkSize / 3
18+
max := avgBlkSize + (avgBlkSize / 2)
19+
20+
return NewRabinMinMax(r, avgBlkSize, min, max)
21+
}
22+
23+
func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
1724
h := fnv.New32a()
18-
ch := chunker.New(r, IpfsRabinPoly, h, avgBlkSize, avgBlkSize/3, avgBlkSize+(avgBlkSize/2))
19-
ch.MinSize = avgBlkSize / 3 //tweaking to get a better average size
25+
ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)
2026

2127
return &Rabin{
2228
r: ch,

‎importer/chunk/splitting.go

+8-14
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,22 @@ import (
99

1010
var log = util.Logger("chunk")
1111

12-
var DefaultBlockSize = 1024 * 256
13-
14-
// DEPRECATED!
15-
var DefaultSplitter = &SizeSplitter{Size: DefaultBlockSize}
12+
var DefaultBlockSize int64 = 1024 * 256
1613

1714
// Splitter produces successive chunks of some underlying input. Each call to
// NextBytes returns the next chunk, or an error.
// NOTE(review): end of input is presumably reported as io.EOF; confirm
// against Chan and the concrete splitters.
type Splitter interface {
	NextBytes() ([]byte, error)
}
2017

21-
// BlockSplitter is being deprecated!
22-
type BlockSplitter interface {
23-
Split(r io.Reader) (<-chan []byte, <-chan error)
24-
}
18+
// SplitterGen constructs a fresh Splitter over a given reader. It lets callers
// choose a chunking strategy once and apply it to many inputs.
type SplitterGen func(r io.Reader) Splitter
2519

26-
// DEPRECATED!
27-
type SizeSplitter struct {
28-
Size int
20+
func DefaultSplitter(r io.Reader) Splitter {
21+
return NewSizeSplitter(r, DefaultBlockSize)
2922
}
3023

31-
// DEPRECATED!
32-
func (ss *SizeSplitter) Split(r io.Reader) (<-chan []byte, <-chan error) {
33-
return Chan(NewSizeSplitter(r, int64(ss.Size)))
24+
func SizeSplitterGen(size int64) SplitterGen {
25+
return func(r io.Reader) Splitter {
26+
return NewSizeSplitter(r, size)
27+
}
3428
}
3529

3630
func Chan(s Splitter) (<-chan []byte, <-chan error) {

‎importer/chunk/splitting_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func TestSizeSplitterIsDeterministic(t *testing.T) {
3232
bufA := copyBuf(bufR)
3333
bufB := copyBuf(bufR)
3434

35-
chunksA, _ := DefaultSplitter.Split(bytes.NewReader(bufA))
36-
chunksB, _ := DefaultSplitter.Split(bytes.NewReader(bufB))
35+
chunksA, _ := Chan(DefaultSplitter(bytes.NewReader(bufA)))
36+
chunksB, _ := Chan(DefaultSplitter(bytes.NewReader(bufB)))
3737

3838
for n := 0; ; n++ {
3939
a, moreA := <-chunksA
@@ -65,8 +65,8 @@ func TestSizeSplitterFillsChunks(t *testing.T) {
6565
max := 10000000
6666
b := randBuf(t, max)
6767
r := &clipReader{r: bytes.NewReader(b), size: 4000}
68-
s := SizeSplitter{Size: 1024 * 256}
69-
c, _ := s.Split(r)
68+
chunksize := int64(1024 * 256)
69+
c, _ := Chan(NewSizeSplitter(r, chunksize))
7070

7171
sofar := 0
7272
whole := make([]byte, max)
@@ -80,7 +80,7 @@ func TestSizeSplitterFillsChunks(t *testing.T) {
8080
copy(whole[sofar:], chunk)
8181

8282
sofar += len(chunk)
83-
if sofar != max && len(chunk) < s.Size {
83+
if sofar != max && len(chunk) < int(chunksize) {
8484
t.Fatal("sizesplitter split at a smaller size")
8585
}
8686
}

‎importer/importer.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package importer
44

55
import (
66
"fmt"
7-
"io"
87
"os"
98

109
bal "github.com/ipfs/go-ipfs/importer/balanced"
@@ -36,12 +35,12 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
3635
}
3736
defer f.Close()
3837

39-
return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp))
38+
return BuildDagFromReader(ds, chunk.NewSizeSplitter(f, chunk.DefaultBlockSize), BasicPinnerCB(mp))
4039
}
4140

42-
func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) {
41+
func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter, ncb h.NodeCB) (*dag.Node, error) {
4342
// Start the splitter
44-
blkch, errch := spl.Split(r)
43+
blkch, errch := chunk.Chan(spl)
4544

4645
dbp := h.DagBuilderParams{
4746
Dagserv: ds,
@@ -52,9 +51,9 @@ func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter,
5251
return bal.BalancedLayout(dbp.New(blkch, errch))
5352
}
5453

55-
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) {
54+
func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter, ncb h.NodeCB) (*dag.Node, error) {
5655
// Start the splitter
57-
blkch, errch := spl.Split(r)
56+
blkch, errch := chunk.Chan(spl)
5857

5958
dbp := h.DagBuilderParams{
6059
Dagserv: ds,

‎importer/importer_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,20 @@ import (
1414
u "github.com/ipfs/go-ipfs/util"
1515
)
1616

17-
func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
17+
func getBalancedDag(t testing.TB, size int64, blksize int64) (*dag.Node, dag.DAGService) {
1818
ds := mdtest.Mock(t)
1919
r := io.LimitReader(u.NewTimeSeededRand(), size)
20-
nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
20+
nd, err := BuildDagFromReader(ds, chunk.NewSizeSplitter(r, blksize), nil)
2121
if err != nil {
2222
t.Fatal(err)
2323
}
2424
return nd, ds
2525
}
2626

27-
func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
27+
func getTrickleDag(t testing.TB, size int64, blksize int64) (*dag.Node, dag.DAGService) {
2828
ds := mdtest.Mock(t)
2929
r := io.LimitReader(u.NewTimeSeededRand(), size)
30-
nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
30+
nd, err := BuildTrickleDagFromReader(ds, chunk.NewSizeSplitter(r, blksize), nil)
3131
if err != nil {
3232
t.Fatal(err)
3333
}
@@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) {
4040
u.NewTimeSeededRand().Read(buf)
4141
r := bytes.NewReader(buf)
4242

43-
nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
43+
nd, err := BuildDagFromReader(ds, chunk.DefaultSplitter(r), nil)
4444
if err != nil {
4545
t.Fatal(err)
4646
}

‎importer/trickle/trickle_test.go

+26-25
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import (
2020
u "github.com/ipfs/go-ipfs/util"
2121
)
2222

23-
func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) {
23+
func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.Node, error) {
2424
// Start the splitter
25-
blkch, errs := spl.Split(r)
25+
blkch, errs := chunk.Chan(spl)
2626

2727
dbp := h.DagBuilderParams{
2828
Dagserv: ds,
@@ -42,9 +42,10 @@ func TestSizeBasedSplit(t *testing.T) {
4242
if testing.Short() {
4343
t.SkipNow()
4444
}
45-
bs := &chunk.SizeSplitter{Size: 512}
45+
bs := chunk.SizeSplitterGen(512)
4646
testFileConsistency(t, bs, 32*512)
47-
bs = &chunk.SizeSplitter{Size: 4096}
47+
48+
bs = chunk.SizeSplitterGen(4096)
4849
testFileConsistency(t, bs, 32*4096)
4950

5051
// Uneven offset
@@ -57,13 +58,13 @@ func dup(b []byte) []byte {
5758
return o
5859
}
5960

60-
func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
61+
func testFileConsistency(t *testing.T, bs chunk.SplitterGen, nbytes int) {
6162
should := make([]byte, nbytes)
6263
u.NewTimeSeededRand().Read(should)
6364

6465
read := bytes.NewReader(should)
6566
ds := mdtest.Mock(t)
66-
nd, err := buildTestDag(read, ds, bs)
67+
nd, err := buildTestDag(ds, bs(read))
6768
if err != nil {
6869
t.Fatal(err)
6970
}
@@ -90,7 +91,7 @@ func TestBuilderConsistency(t *testing.T) {
9091
io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes))
9192
should := dup(buf.Bytes())
9293
dagserv := mdtest.Mock(t)
93-
nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter)
94+
nd, err := buildTestDag(dagserv, chunk.DefaultSplitter(buf))
9495
if err != nil {
9596
t.Fatal(err)
9697
}
@@ -128,15 +129,15 @@ type dagservAndPinner struct {
128129
}
129130

130131
func TestIndirectBlocks(t *testing.T) {
131-
splitter := &chunk.SizeSplitter{512}
132+
splitter := chunk.SizeSplitterGen(512)
132133
nbytes := 1024 * 1024
133134
buf := make([]byte, nbytes)
134135
u.NewTimeSeededRand().Read(buf)
135136

136137
read := bytes.NewReader(buf)
137138

138139
ds := mdtest.Mock(t)
139-
dag, err := buildTestDag(read, ds, splitter)
140+
dag, err := buildTestDag(ds, splitter(read))
140141
if err != nil {
141142
t.Fatal(err)
142143
}
@@ -163,7 +164,7 @@ func TestSeekingBasic(t *testing.T) {
163164

164165
read := bytes.NewReader(should)
165166
ds := mdtest.Mock(t)
166-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
167+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 512))
167168
if err != nil {
168169
t.Fatal(err)
169170
}
@@ -200,7 +201,7 @@ func TestSeekToBegin(t *testing.T) {
200201

201202
read := bytes.NewReader(should)
202203
ds := mdtest.Mock(t)
203-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
204+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500))
204205
if err != nil {
205206
t.Fatal(err)
206207
}
@@ -244,7 +245,7 @@ func TestSeekToAlmostBegin(t *testing.T) {
244245

245246
read := bytes.NewReader(should)
246247
ds := mdtest.Mock(t)
247-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
248+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500))
248249
if err != nil {
249250
t.Fatal(err)
250251
}
@@ -288,7 +289,7 @@ func TestSeekEnd(t *testing.T) {
288289

289290
read := bytes.NewReader(should)
290291
ds := mdtest.Mock(t)
291-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
292+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500))
292293
if err != nil {
293294
t.Fatal(err)
294295
}
@@ -314,7 +315,7 @@ func TestSeekEndSingleBlockFile(t *testing.T) {
314315

315316
read := bytes.NewReader(should)
316317
ds := mdtest.Mock(t)
317-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{5000})
318+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 5000))
318319
if err != nil {
319320
t.Fatal(err)
320321
}
@@ -340,7 +341,7 @@ func TestSeekingStress(t *testing.T) {
340341

341342
read := bytes.NewReader(should)
342343
ds := mdtest.Mock(t)
343-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000})
344+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 1000))
344345
if err != nil {
345346
t.Fatal(err)
346347
}
@@ -385,7 +386,7 @@ func TestSeekingConsistency(t *testing.T) {
385386

386387
read := bytes.NewReader(should)
387388
ds := mdtest.Mock(t)
388-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
389+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500))
389390
if err != nil {
390391
t.Fatal(err)
391392
}
@@ -429,7 +430,7 @@ func TestAppend(t *testing.T) {
429430
// Reader for half the bytes
430431
read := bytes.NewReader(should[:nbytes/2])
431432
ds := mdtest.Mock(t)
432-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
433+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500))
433434
if err != nil {
434435
t.Fatal(err)
435436
}
@@ -439,8 +440,8 @@ func TestAppend(t *testing.T) {
439440
Maxlinks: h.DefaultLinksPerBlock,
440441
}
441442

442-
spl := &chunk.SizeSplitter{500}
443-
blks, errs := spl.Split(bytes.NewReader(should[nbytes/2:]))
443+
r := bytes.NewReader(should[nbytes/2:])
444+
blks, errs := chunk.Chan(chunk.NewSizeSplitter(r, 500))
444445

445446
nnode, err := TrickleAppend(nd, dbp.New(blks, errs))
446447
if err != nil {
@@ -478,7 +479,7 @@ func TestMultipleAppends(t *testing.T) {
478479
u.NewTimeSeededRand().Read(should)
479480

480481
read := bytes.NewReader(nil)
481-
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
482+
nd, err := buildTestDag(ds, chunk.NewSizeSplitter(read, 500))
482483
if err != nil {
483484
t.Fatal(err)
484485
}
@@ -488,10 +489,10 @@ func TestMultipleAppends(t *testing.T) {
488489
Maxlinks: 4,
489490
}
490491

491-
spl := &chunk.SizeSplitter{500}
492+
spl := chunk.SizeSplitterGen(500)
492493

493494
for i := 0; i < len(should); i++ {
494-
blks, errs := spl.Split(bytes.NewReader(should[i : i+1]))
495+
blks, errs := chunk.Chan(spl(bytes.NewReader(should[i : i+1])))
495496

496497
nnode, err := TrickleAppend(nd, dbp.New(blks, errs))
497498
if err != nil {
@@ -533,16 +534,16 @@ func TestAppendSingleBytesToEmpty(t *testing.T) {
533534
Maxlinks: 4,
534535
}
535536

536-
spl := &chunk.SizeSplitter{500}
537+
spl := chunk.SizeSplitterGen(500)
537538

538-
blks, errs := spl.Split(bytes.NewReader(data[:1]))
539+
blks, errs := chunk.Chan(spl(bytes.NewReader(data[:1])))
539540

540541
nnode, err := TrickleAppend(nd, dbp.New(blks, errs))
541542
if err != nil {
542543
t.Fatal(err)
543544
}
544545

545-
blks, errs = spl.Split(bytes.NewReader(data[1:]))
546+
blks, errs = chunk.Chan(spl(bytes.NewReader(data[1:])))
546547

547548
nnode, err = TrickleAppend(nnode, dbp.New(blks, errs))
548549
if err != nil {

‎merkledag/merkledag_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
163163
dagservs = append(dagservs, NewDAGService(bsi))
164164
}
165165

166-
spl := &chunk.SizeSplitter{512}
166+
spl := chunk.NewSizeSplitter(read, 512)
167167

168-
root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil)
168+
root, err := imp.BuildDagFromReader(dagservs[0], spl, nil)
169169
if err != nil {
170170
t.Fatal(err)
171171
}

‎unixfs/mod/dagmodifier.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type DagModifier struct {
4040
curNode *mdag.Node
4141
mp pin.ManualPinner
4242

43-
splitter chunk.BlockSplitter
43+
splitter chunk.SplitterGen
4444
ctx context.Context
4545
readCancel func()
4646

@@ -51,7 +51,7 @@ type DagModifier struct {
5151
read *uio.DagReader
5252
}
5353

54-
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) {
54+
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.SplitterGen) (*DagModifier, error) {
5555
return &DagModifier{
5656
curNode: from.Copy(),
5757
dagserv: serv,
@@ -106,9 +106,9 @@ func (zr zeroReader) Read(b []byte) (int, error) {
106106
// expandSparse grows the file with zero blocks of 4096
107107
// A small blocksize is chosen to aid in deduplication
108108
func (dm *DagModifier) expandSparse(size int64) error {
109-
spl := chunk.SizeSplitter{4096}
110109
r := io.LimitReader(zeroReader{}, size)
111-
blks, errs := spl.Split(r)
110+
spl := chunk.NewSizeSplitter(r, 4096)
111+
blks, errs := chunk.Chan(spl)
112112
nnode, err := dm.appendData(dm.curNode, blks, errs)
113113
if err != nil {
114114
return err
@@ -196,7 +196,7 @@ func (dm *DagModifier) Sync() error {
196196

197197
// need to write past end of current dag
198198
if !done {
199-
blks, errs := dm.splitter.Split(dm.wrBuf)
199+
blks, errs := chunk.Chan(dm.splitter(dm.wrBuf))
200200
nd, err = dm.appendData(dm.curNode, blks, errs)
201201
if err != nil {
202202
return err

‎unixfs/mod/dagmodifier_test.go

+17-11
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto
5353

5454
func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) {
5555
in := io.LimitReader(u.NewTimeSeededRand(), size)
56-
node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner))
56+
node, err := imp.BuildTrickleDagFromReader(dserv, sizeSplitterGen(500)(in), imp.BasicPinnerCB(pinner))
5757
if err != nil {
5858
t.Fatal(err)
5959
}
@@ -117,13 +117,19 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
117117
return orig
118118
}
119119

120+
func sizeSplitterGen(size int64) chunk.SplitterGen {
121+
return func(r io.Reader) chunk.Splitter {
122+
return chunk.NewSizeSplitter(r, size)
123+
}
124+
}
125+
120126
func TestDagModifierBasic(t *testing.T) {
121127
dserv, pin := getMockDagServ(t)
122128
b, n := getNode(t, dserv, 50000, pin)
123129
ctx, cancel := context.WithCancel(context.Background())
124130
defer cancel()
125131

126-
dagmod, err := NewDagModifier(ctx, n, dserv, pin, &chunk.SizeSplitter{Size: 512})
132+
dagmod, err := NewDagModifier(ctx, n, dserv, pin, sizeSplitterGen(512))
127133
if err != nil {
128134
t.Fatal(err)
129135
}
@@ -178,7 +184,7 @@ func TestMultiWrite(t *testing.T) {
178184
ctx, cancel := context.WithCancel(context.Background())
179185
defer cancel()
180186

181-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
187+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
182188
if err != nil {
183189
t.Fatal(err)
184190
}
@@ -231,7 +237,7 @@ func TestMultiWriteAndFlush(t *testing.T) {
231237
ctx, cancel := context.WithCancel(context.Background())
232238
defer cancel()
233239

234-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
240+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
235241
if err != nil {
236242
t.Fatal(err)
237243
}
@@ -279,7 +285,7 @@ func TestWriteNewFile(t *testing.T) {
279285
ctx, cancel := context.WithCancel(context.Background())
280286
defer cancel()
281287

282-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
288+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
283289
if err != nil {
284290
t.Fatal(err)
285291
}
@@ -322,7 +328,7 @@ func TestMultiWriteCoal(t *testing.T) {
322328
ctx, cancel := context.WithCancel(context.Background())
323329
defer cancel()
324330

325-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
331+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
326332
if err != nil {
327333
t.Fatal(err)
328334
}
@@ -368,7 +374,7 @@ func TestLargeWriteChunks(t *testing.T) {
368374
ctx, cancel := context.WithCancel(context.Background())
369375
defer cancel()
370376

371-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
377+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
372378
if err != nil {
373379
t.Fatal(err)
374380
}
@@ -406,7 +412,7 @@ func TestDagTruncate(t *testing.T) {
406412
ctx, cancel := context.WithCancel(context.Background())
407413
defer cancel()
408414

409-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
415+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
410416
if err != nil {
411417
t.Fatal(err)
412418
}
@@ -437,7 +443,7 @@ func TestSparseWrite(t *testing.T) {
437443
ctx, cancel := context.WithCancel(context.Background())
438444
defer cancel()
439445

440-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
446+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
441447
if err != nil {
442448
t.Fatal(err)
443449
}
@@ -491,7 +497,7 @@ func TestCorrectPinning(t *testing.T) {
491497
ctx, cancel := context.WithCancel(context.Background())
492498
defer cancel()
493499

494-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
500+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
495501
if err != nil {
496502
t.Fatal(err)
497503
}
@@ -598,7 +604,7 @@ func BenchmarkDagmodWrite(b *testing.B) {
598604

599605
wrsize := 4096
600606

601-
dagmod, err := NewDagModifier(ctx, n, dserv, pins, &chunk.SizeSplitter{Size: 512})
607+
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
602608
if err != nil {
603609
b.Fatal(err)
604610
}

0 commit comments

Comments
 (0)
Please sign in to comment.