Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d4df3ec

Browse files
committedJan 3, 2016
Merge pull request #2106 from ipfs/feat/mfs-locking-perf
Feat/mfs locking perf
2 parents 96698d3 + 8a8c6d1 commit d4df3ec

File tree

8 files changed

+393
-65
lines changed

8 files changed

+393
-65
lines changed
 

‎core/commands/files/files.go

+50-25
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ var FilesCmd = &cmds.Command{
2929
Files is an API for manipulating ipfs objects as if they were a unix filesystem.
3030
`,
3131
},
32+
Options: []cmds.Option{
33+
cmds.BoolOption("f", "flush", "flush target and ancestors after write (default: true)"),
34+
},
3235
Subcommands: map[string]*cmds.Command{
3336
"read": FilesReadCmd,
3437
"write": FilesWriteCmd,
@@ -460,7 +463,6 @@ Warning:
460463
cmds.BoolOption("e", "create", "create the file if it does not exist"),
461464
cmds.BoolOption("t", "truncate", "truncate the file before writing"),
462465
cmds.IntOption("n", "count", "maximum number of bytes to read"),
463-
cmds.BoolOption("f", "flush", "flush file and ancestors after write (default: true)"),
464466
},
465467
Run: func(req cmds.Request, res cmds.Response) {
466468
path, err := checkPath(req.Arguments()[0])
@@ -482,6 +484,16 @@ Warning:
482484
return
483485
}
484486

487+
offset, _, err := req.Option("offset").Int()
488+
if err != nil {
489+
res.SetError(err, cmds.ErrNormal)
490+
return
491+
}
492+
if offset < 0 {
493+
res.SetError(fmt.Errorf("cannot have negative write offset"), cmds.ErrNormal)
494+
return
495+
}
496+
485497
fi, err := getFileHandle(nd.FilesRoot, path, create)
486498
if err != nil {
487499
res.SetError(err, cmds.ErrNormal)
@@ -501,16 +513,6 @@ Warning:
501513
}
502514
}
503515

504-
offset, _, err := req.Option("offset").Int()
505-
if err != nil {
506-
res.SetError(err, cmds.ErrNormal)
507-
return
508-
}
509-
if offset < 0 {
510-
res.SetError(fmt.Errorf("cannot have negative write offset"), cmds.ErrNormal)
511-
return
512-
}
513-
514516
count, countfound, err := req.Option("count").Int()
515517
if err != nil {
516518
res.SetError(err, cmds.ErrNormal)
@@ -584,11 +586,17 @@ Examples:
584586
return
585587
}
586588

587-
err = mfs.Mkdir(n.FilesRoot, dirtomake, dashp)
589+
flush, found, _ := req.Option("flush").Bool()
590+
if !found {
591+
flush = true
592+
}
593+
594+
err = mfs.Mkdir(n.FilesRoot, dirtomake, dashp, flush)
588595
if err != nil {
589596
res.SetError(err, cmds.ErrNormal)
590597
return
591598
}
599+
592600
},
593601
}
594602

@@ -639,7 +647,7 @@ remove files or directories
639647
dir, name := gopath.Split(path)
640648
parent, err := mfs.Lookup(nd.FilesRoot, dir)
641649
if err != nil {
642-
res.SetError(err, cmds.ErrNormal)
650+
res.SetError(fmt.Errorf("parent lookup: %s", err), cmds.ErrNormal)
643651
return
644652
}
645653

@@ -649,32 +657,49 @@ remove files or directories
649657
return
650658
}
651659

652-
childi, err := pdir.Child(name)
653-
if err != nil {
654-
res.SetError(err, cmds.ErrNormal)
655-
return
656-
}
657-
658660
dashr, _, _ := req.Option("r").Bool()
659661

660-
switch childi.(type) {
661-
case *mfs.Directory:
662-
if dashr {
663-
err := pdir.Unlink(name)
662+
var success bool
663+
defer func() {
664+
if success {
665+
err := pdir.Flush()
664666
if err != nil {
665667
res.SetError(err, cmds.ErrNormal)
666668
return
667669
}
668-
} else {
669-
res.SetError(fmt.Errorf("%s is a directory, use -r to remove directories", path), cmds.ErrNormal)
670+
}
671+
}()
672+
673+
// if '-r' specified, don't check file type (in bad scenarios, the block may not exist)
674+
if dashr {
675+
err := pdir.Unlink(name)
676+
if err != nil {
677+
res.SetError(err, cmds.ErrNormal)
670678
return
671679
}
680+
681+
success = true
682+
return
683+
}
684+
685+
childi, err := pdir.Child(name)
686+
if err != nil {
687+
res.SetError(err, cmds.ErrNormal)
688+
return
689+
}
690+
691+
switch childi.(type) {
692+
case *mfs.Directory:
693+
res.SetError(fmt.Errorf("%s is a directory, use -r to remove directories", path), cmds.ErrNormal)
694+
return
672695
default:
673696
err := pdir.Unlink(name)
674697
if err != nil {
675698
res.SetError(err, cmds.ErrNormal)
676699
return
677700
}
701+
702+
success = true
678703
}
679704
},
680705
}

‎core/coreunix/add.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {
330330

331331
dir := gopath.Dir(path)
332332
if dir != "." {
333-
if err := mfs.Mkdir(adder.mr, dir, true); err != nil {
333+
if err := mfs.Mkdir(adder.mr, dir, true, false); err != nil {
334334
return err
335335
}
336336
}
@@ -403,7 +403,7 @@ func (adder *Adder) addFile(file files.File) error {
403403
func (adder *Adder) addDir(dir files.File) error {
404404
log.Infof("adding directory: %s", dir.FileName())
405405

406-
err := mfs.Mkdir(adder.mr, dir.FileName(), true)
406+
err := mfs.Mkdir(adder.mr, dir.FileName(), true, false)
407407
if err != nil {
408408
return err
409409
}

‎mfs/dir.go

+53-19
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"os"
7+
"path"
78
"sync"
89
"time"
910

@@ -48,21 +49,36 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child
4849
}
4950

5051
// closeChild updates the child by the given name to the dag node 'nd'
51-
// and changes its own dag node, then propogates the changes upward
52+
// and changes its own dag node
5253
func (d *Directory) closeChild(name string, nd *dag.Node) error {
53-
_, err := d.dserv.Add(nd)
54+
mynd, err := d.closeChildUpdate(name, nd)
5455
if err != nil {
5556
return err
5657
}
5758

59+
return d.parent.closeChild(d.name, mynd)
60+
}
61+
62+
// closeChildUpdate is the portion of closeChild that needs to be locked around
63+
func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, error) {
5864
d.lock.Lock()
5965
defer d.lock.Unlock()
60-
err = d.updateChild(name, nd)
66+
67+
err := d.updateChild(name, nd)
6168
if err != nil {
62-
return err
69+
return nil, err
6370
}
6471

65-
return d.parent.closeChild(d.name, d.node)
72+
return d.flushCurrentNode()
73+
}
74+
75+
func (d *Directory) flushCurrentNode() (*dag.Node, error) {
76+
_, err := d.dserv.Add(d.node)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
return d.node.Copy(), nil
6682
}
6783

6884
func (d *Directory) updateChild(name string, nd *dag.Node) error {
@@ -242,9 +258,9 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
242258
d.lock.Lock()
243259
defer d.lock.Unlock()
244260

245-
_, err := d.childDir(name)
261+
child, err := d.childDir(name)
246262
if err == nil {
247-
return nil, os.ErrExist
263+
return child, os.ErrExist
248264
}
249265
_, err = d.childFile(name)
250266
if err == nil {
@@ -263,11 +279,6 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
263279
return nil, err
264280
}
265281

266-
err = d.parent.closeChild(d.name, d.node)
267-
if err != nil {
268-
return nil, err
269-
}
270-
271282
dirobj := NewDirectory(d.ctx, name, ndir, d, d.dserv)
272283
d.childDirs[name] = dirobj
273284
return dirobj, nil
@@ -285,13 +296,27 @@ func (d *Directory) Unlink(name string) error {
285296
return err
286297
}
287298

288-
return d.parent.closeChild(d.name, d.node)
299+
_, err = d.dserv.Add(d.node)
300+
if err != nil {
301+
return err
302+
}
303+
304+
return nil
305+
}
306+
307+
func (d *Directory) Flush() error {
308+
nd, err := d.flushCurrentNode()
309+
if err != nil {
310+
return err
311+
}
312+
313+
return d.parent.closeChild(d.name, nd)
289314
}
290315

291316
// AddChild adds the node 'nd' under this directory giving it the name 'name'
292317
func (d *Directory) AddChild(name string, nd *dag.Node) error {
293-
d.Lock()
294-
defer d.Unlock()
318+
d.lock.Lock()
319+
defer d.lock.Unlock()
295320

296321
_, err := d.childUnsync(name)
297322
if err == nil {
@@ -310,7 +335,6 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error {
310335

311336
d.modTime = time.Now()
312337

313-
//return d.parent.closeChild(d.name, d.node)
314338
return nil
315339
}
316340

@@ -352,16 +376,26 @@ func (d *Directory) sync() error {
352376
return nil
353377
}
354378

379+
func (d *Directory) Path() string {
380+
cur := d
381+
var out string
382+
for cur != nil {
383+
out = path.Join(cur.name, out)
384+
cur = cur.parent.(*Directory)
385+
}
386+
return out
387+
}
388+
355389
func (d *Directory) GetNode() (*dag.Node, error) {
356-
d.Lock()
357-
defer d.Unlock()
390+
d.lock.Lock()
391+
defer d.lock.Unlock()
358392

359393
err := d.sync()
360394
if err != nil {
361395
return nil, err
362396
}
363397

364-
return d.node, nil
398+
return d.node.Copy(), nil
365399
}
366400

367401
func (d *Directory) Lock() {

‎mfs/file.go

+32-13
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ type File struct {
1616
name string
1717
hasChanges bool
1818

19-
mod *mod.DagModifier
20-
lock sync.Mutex
19+
dserv dag.DAGService
20+
mod *mod.DagModifier
21+
lock sync.Mutex
2122
}
2223

2324
// NewFile returns a NewFile object with the given parameters
@@ -28,6 +29,7 @@ func NewFile(name string, node *dag.Node, parent childCloser, dserv dag.DAGServi
2829
}
2930

3031
return &File{
32+
dserv: dserv,
3133
parent: parent,
3234
name: name,
3335
mod: dmod,
@@ -60,29 +62,46 @@ func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) {
6062
// and signals a republish to occur
6163
func (fi *File) Close() error {
6264
fi.Lock()
63-
defer fi.Unlock()
6465
if fi.hasChanges {
6566
err := fi.mod.Sync()
6667
if err != nil {
68+
fi.Unlock()
6769
return err
6870
}
6971

70-
nd, err := fi.mod.GetNode()
71-
if err != nil {
72-
return err
73-
}
72+
fi.hasChanges = false
73+
74+
// explicitly stay locked for flushUp call,
75+
// it will manage the lock for us
76+
return fi.flushUp()
77+
}
78+
fi.Unlock()
79+
80+
return nil
81+
}
7482

83+
// flushUp syncs the file and adds it to the dagservice
84+
// it *must* be called with the File's lock taken
85+
func (fi *File) flushUp() error {
86+
nd, err := fi.mod.GetNode()
87+
if err != nil {
7588
fi.Unlock()
76-
err = fi.parent.closeChild(fi.name, nd)
77-
fi.Lock()
78-
if err != nil {
79-
return err
80-
}
89+
return err
90+
}
8191

82-
fi.hasChanges = false
92+
_, err = fi.dserv.Add(nd)
93+
if err != nil {
94+
fi.Unlock()
95+
return err
8396
}
8497

98+
//name := fi.name
99+
//parent := fi.parent
100+
101+
// explicit unlock *only* before closeChild call
102+
fi.Unlock()
85103
return nil
104+
//return parent.closeChild(name, nd)
86105
}
87106

88107
// Sync flushes the changes in the file to disk

‎mfs/mfs_test.go

+201
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"fmt"
77
"io"
88
"io/ioutil"
9+
"math/rand"
910
"os"
1011
"sort"
1112
"testing"
1213

14+
randbo "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/dustin/randbo"
1315
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
1416
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/sync"
1517
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@@ -474,3 +476,202 @@ func TestMfsFile(t *testing.T) {
474476
t.Fatal(err)
475477
}
476478
}
479+
480+
func randomWalk(d *Directory, n int) (*Directory, error) {
481+
for i := 0; i < n; i++ {
482+
dirents, err := d.List()
483+
if err != nil {
484+
return nil, err
485+
}
486+
487+
var childdirs []NodeListing
488+
for _, child := range dirents {
489+
if child.Type == int(TDir) {
490+
childdirs = append(childdirs, child)
491+
}
492+
}
493+
if len(childdirs) == 0 {
494+
return d, nil
495+
}
496+
497+
next := childdirs[rand.Intn(len(childdirs))].Name
498+
499+
nextD, err := d.Child(next)
500+
if err != nil {
501+
return nil, err
502+
}
503+
504+
d = nextD.(*Directory)
505+
}
506+
return d, nil
507+
}
508+
509+
func randomName() string {
510+
set := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_"
511+
length := rand.Intn(10) + 2
512+
var out string
513+
for i := 0; i < length; i++ {
514+
j := rand.Intn(len(set))
515+
out += set[j : j+1]
516+
}
517+
return out
518+
}
519+
520+
func actorMakeFile(d *Directory) error {
521+
d, err := randomWalk(d, rand.Intn(7))
522+
if err != nil {
523+
return err
524+
}
525+
526+
name := randomName()
527+
f, err := NewFile(name, &dag.Node{Data: ft.FilePBData(nil, 0)}, d, d.dserv)
528+
if err != nil {
529+
return err
530+
}
531+
532+
r := io.LimitReader(randbo.New(), int64(77*rand.Intn(123)))
533+
_, err = io.Copy(f, r)
534+
if err != nil {
535+
return err
536+
}
537+
538+
err = f.Close()
539+
if err != nil {
540+
return err
541+
}
542+
543+
return nil
544+
}
545+
func actorMkdir(d *Directory) error {
546+
d, err := randomWalk(d, rand.Intn(7))
547+
if err != nil {
548+
return err
549+
}
550+
551+
_, err = d.Mkdir(randomName())
552+
if err != nil {
553+
return err
554+
}
555+
556+
return nil
557+
}
558+
559+
func actorRemoveFile(d *Directory) error {
560+
d, err := randomWalk(d, rand.Intn(7))
561+
if err != nil {
562+
return err
563+
}
564+
565+
ents, err := d.List()
566+
if err != nil {
567+
return err
568+
}
569+
570+
if len(ents) == 0 {
571+
return nil
572+
}
573+
574+
re := ents[rand.Intn(len(ents))]
575+
576+
return d.Unlink(re.Name)
577+
}
578+
579+
func actorReadFile(d *Directory) error {
580+
d, err := randomWalk(d, rand.Intn(6))
581+
if err != nil {
582+
return err
583+
}
584+
585+
ents, err := d.List()
586+
if err != nil {
587+
return err
588+
}
589+
590+
var files []string
591+
for _, e := range ents {
592+
if e.Type == int(TFile) {
593+
files = append(files, e.Name)
594+
}
595+
}
596+
597+
if len(files) == 0 {
598+
return nil
599+
}
600+
601+
fname := files[rand.Intn(len(files))]
602+
fsn, err := d.Child(fname)
603+
if err != nil {
604+
return err
605+
}
606+
607+
fi, ok := fsn.(*File)
608+
if !ok {
609+
return errors.New("file wasnt a file, race?")
610+
}
611+
612+
_, err = fi.Size()
613+
if err != nil {
614+
return err
615+
}
616+
617+
_, err = ioutil.ReadAll(fi)
618+
if err != nil {
619+
return err
620+
}
621+
622+
return fi.Close()
623+
}
624+
625+
func testActor(rt *Root, iterations int, errs chan error) {
626+
d := rt.GetValue().(*Directory)
627+
for i := 0; i < iterations; i++ {
628+
switch rand.Intn(5) {
629+
case 0:
630+
if err := actorMkdir(d); err != nil {
631+
errs <- err
632+
return
633+
}
634+
case 1, 2:
635+
if err := actorMakeFile(d); err != nil {
636+
errs <- err
637+
return
638+
}
639+
case 3:
640+
continue
641+
// randomly deleting things
642+
// doesnt really give us any sort of useful test results.
643+
// you will never have this in a real environment where
644+
// you expect anything productive to happen...
645+
if err := actorRemoveFile(d); err != nil {
646+
errs <- err
647+
return
648+
}
649+
case 4:
650+
if err := actorReadFile(d); err != nil {
651+
errs <- err
652+
return
653+
}
654+
}
655+
}
656+
errs <- nil
657+
}
658+
659+
func TestMfsStress(t *testing.T) {
660+
ctx, cancel := context.WithCancel(context.Background())
661+
defer cancel()
662+
_, rt := setupRoot(ctx, t)
663+
664+
numroutines := 10
665+
666+
errs := make(chan error)
667+
for i := 0; i < numroutines; i++ {
668+
go testActor(rt, 50, errs)
669+
}
670+
671+
for i := 0; i < numroutines; i++ {
672+
err := <-errs
673+
if err != nil {
674+
t.Fatal(err)
675+
}
676+
}
677+
}

‎mfs/ops.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func PutNode(r *Root, path string, nd *dag.Node) error {
9999
}
100100

101101
// Mkdir creates a directory at 'path' under the directory 'd', creating
102-
// intermediary directories as needed if 'parents' is set to true
103-
func Mkdir(r *Root, pth string, parents bool) error {
102+
// intermediary directories as needed if 'mkparents' is set to true
103+
func Mkdir(r *Root, pth string, mkparents bool, flush bool) error {
104104
if pth == "" {
105105
return nil
106106
}
@@ -116,7 +116,7 @@ func Mkdir(r *Root, pth string, parents bool) error {
116116

117117
if len(parts) == 0 {
118118
// this will only happen on 'mkdir /'
119-
if parents {
119+
if mkparents {
120120
return nil
121121
}
122122
return fmt.Errorf("cannot create directory '/': Already exists")
@@ -125,7 +125,7 @@ func Mkdir(r *Root, pth string, parents bool) error {
125125
cur := r.GetValue().(*Directory)
126126
for i, d := range parts[:len(parts)-1] {
127127
fsn, err := cur.Child(d)
128-
if err == os.ErrNotExist && parents {
128+
if err == os.ErrNotExist && mkparents {
129129
mkd, err := cur.Mkdir(d)
130130
if err != nil {
131131
return err
@@ -142,9 +142,16 @@ func Mkdir(r *Root, pth string, parents bool) error {
142142
cur = next
143143
}
144144

145-
_, err := cur.Mkdir(parts[len(parts)-1])
145+
final, err := cur.Mkdir(parts[len(parts)-1])
146146
if err != nil {
147-
if !parents || err != os.ErrExist {
147+
if !mkparents || err != os.ErrExist || final == nil {
148+
return err
149+
}
150+
}
151+
152+
if flush {
153+
err := final.Flush()
154+
if err != nil {
148155
return err
149156
}
150157
}

‎mfs/system.go

+17
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ func (kr *Root) GetValue() FSNode {
109109
return kr.val
110110
}
111111

112+
func (kr *Root) Flush() error {
113+
nd, err := kr.GetValue().GetNode()
114+
if err != nil {
115+
return err
116+
}
117+
118+
k, err := kr.dserv.Add(nd)
119+
if err != nil {
120+
return err
121+
}
122+
123+
if kr.repub != nil {
124+
kr.repub.Update(k)
125+
}
126+
return nil
127+
}
128+
112129
// closeChild implements the childCloser interface, and signals to the publisher that
113130
// there are changes ready to be published
114131
func (kr *Root) closeChild(name string, nd *dag.Node) error {

‎test/sharness/t0250-files-api.sh

+25
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,31 @@ test_files_api() {
352352
test_expect_success "cleanup looks good" '
353353
verify_dir_contents /
354354
'
355+
356+
# test flush flags
357+
test_expect_success "mkdir --flush works" '
358+
ipfs files mkdir --flush --parents /flushed/deep
359+
'
360+
361+
test_expect_success "mkdir --flush works a second time" '
362+
ipfs files mkdir --flush --parents /flushed/deep
363+
'
364+
365+
test_expect_success "dir looks right" '
366+
verify_dir_contents / flushed
367+
'
368+
369+
test_expect_success "child dir looks right" '
370+
verify_dir_contents /flushed deep
371+
'
372+
373+
test_expect_success "cleanup" '
374+
ipfs files rm -r /flushed
375+
'
376+
377+
test_expect_success "child dir looks right" '
378+
verify_dir_contents /
379+
'
355380
}
356381

357382
# test offline and online

0 commit comments

Comments
 (0)
Please sign in to comment.