Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 61a9e34

Browse files
committedAug 3, 2015
WIP: implement mfs API
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent ec51450 commit 61a9e34

File tree

7 files changed

+845
-125
lines changed

7 files changed

+845
-125
lines changed
 

‎core/commands/mfs.go

+458
Large diffs are not rendered by default.

‎core/commands/root.go

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ var rootSubcommands = map[string]*cmds.Command{
9393
"id": IDCmd,
9494
"log": LogCmd,
9595
"ls": LsCmd,
96+
"mfs": MfsCmd,
9697
"mount": MountCmd,
9798
"name": NameCmd,
9899
"object": ObjectCmd,

‎core/core.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ type IpfsNode struct {
106106
Ping *ping.PingService
107107
Reprovider *rp.Reprovider // the value reprovider system
108108

109-
IpnsFs *ipnsfs.Filesystem
109+
Mfs *ipnsfs.Filesystem
110110

111111
proc goprocess.Process
112112
ctx context.Context
@@ -164,11 +164,11 @@ func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
164164

165165
// Setup the mutable ipns filesystem structure
166166
if node.OnlineMode() {
167-
fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Namesys, node.Pinning, node.PrivateKey)
167+
fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Pinning)
168168
if err != nil && err != kb.ErrLookupFailure {
169169
return nil, err
170170
}
171-
node.IpnsFs = fs
171+
node.Mfs = fs
172172
}
173173

174174
success = true
@@ -391,8 +391,8 @@ func (n *IpfsNode) teardown() error {
391391

392392
// Filesystem needs to be closed before network, dht, and blockservice
393393
// so it can use them as its shutting down
394-
if n.IpnsFs != nil {
395-
closers = append(closers, n.IpnsFs)
394+
if n.Mfs != nil {
395+
closers = append(closers, n.Mfs)
396396
}
397397

398398
if n.Blocks != nil {

‎fuse/ipns/ipns_unix.go

+10-18
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func CreateRoot(ipfs *core.IpfsNode, keys []ci.PrivKey, ipfspath, ipnspath strin
7777
return nil, err
7878
}
7979
name := key.Key(pkh).B58String()
80-
root, err := ipfs.IpnsFs.GetRoot(name)
80+
root, err := ipfs.Mfs.GetRoot(name)
8181
if err != nil {
8282
return nil, err
8383
}
@@ -95,7 +95,7 @@ func CreateRoot(ipfs *core.IpfsNode, keys []ci.PrivKey, ipfspath, ipnspath strin
9595
}
9696

9797
return &Root{
98-
fs: ipfs.IpnsFs,
98+
fs: ipfs.Mfs,
9999
Ipfs: ipfs,
100100
IpfsRoot: ipfspath,
101101
IpnsRoot: ipnspath,
@@ -159,13 +159,7 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
159159
}
160160

161161
func (r *Root) Close() error {
162-
for _, kr := range r.Roots {
163-
err := kr.Publish(r.Ipfs.Context())
164-
if err != nil {
165-
return err
166-
}
167-
}
168-
return nil
162+
return r.fs.Close()
169163
}
170164

171165
// Forget is called when the filesystem is unmounted. probably.
@@ -267,16 +261,14 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) {
267261
// ReadDirAll reads the link structure as directory entries
268262
func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
269263
var entries []fuse.Dirent
270-
for _, name := range dir.dir.List() {
271-
dirent := fuse.Dirent{Name: name}
272-
273-
// TODO: make dir.dir.List() return dirinfos
274-
child, err := dir.dir.Child(name)
275-
if err != nil {
276-
return nil, err
277-
}
264+
listing, err := dir.dir.List()
265+
if err != nil {
266+
return nil, err
267+
}
268+
for _, entry := range listing {
269+
dirent := fuse.Dirent{Name: entry.Name}
278270

279-
switch child.Type() {
271+
switch nsfs.NodeType(entry.Type) {
280272
case nsfs.TDir:
281273
dirent.Type = fuse.DT_Dir
282274
case nsfs.TFile:

‎ipnsfs/dir.go

+62-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"os"
7+
"strings"
78
"sync"
89
"time"
910

@@ -138,7 +139,7 @@ func (d *Directory) childDir(name string) (*Directory, error) {
138139
func (d *Directory) childFromDag(name string) (*dag.Node, error) {
139140
for _, lnk := range d.node.Links {
140141
if lnk.Name == name {
141-
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
142+
ctx, cancel := context.WithTimeout(d.fs.ctx, time.Minute)
142143
defer cancel()
143144

144145
return lnk.GetNode(ctx, d.fs.dserv)
@@ -170,15 +171,39 @@ func (d *Directory) childUnsync(name string) (FSNode, error) {
170171
return nil, os.ErrNotExist
171172
}
172173

173-
func (d *Directory) List() []string {
174+
type NodeListing struct {
175+
Name string
176+
Type int
177+
Size int64
178+
}
179+
180+
func (d *Directory) List() ([]NodeListing, error) {
174181
d.lock.Lock()
175182
defer d.lock.Unlock()
176183

177-
var out []string
178-
for _, lnk := range d.node.Links {
179-
out = append(out, lnk.Name)
184+
var out []NodeListing
185+
for _, l := range d.node.Links {
186+
child := NodeListing{}
187+
child.Name = l.Name
188+
189+
c, err := d.childUnsync(l.Name)
190+
if err != nil {
191+
return nil, err
192+
}
193+
194+
child.Type = int(c.Type())
195+
if c, ok := c.(*File); ok {
196+
size, err := c.Size()
197+
if err != nil {
198+
return nil, err
199+
}
200+
child.Size = size
201+
}
202+
203+
out = append(out, child)
180204
}
181-
return out
205+
206+
return out, nil
182207
}
183208

184209
func (d *Directory) Mkdir(name string) (*Directory, error) {
@@ -195,6 +220,12 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
195220
}
196221

197222
ndir := &dag.Node{Data: ft.FolderPBData()}
223+
224+
_, err = d.fs.dserv.Add(ndir)
225+
if err != nil {
226+
return nil, err
227+
}
228+
198229
err = d.node.AddNodeLinkClean(name, ndir)
199230
if err != nil {
200231
return nil, err
@@ -268,3 +299,28 @@ func (d *Directory) Lock() {
268299
func (d *Directory) Unlock() {
269300
d.lock.Unlock()
270301
}
302+
303+
func DirLookup(d *Directory, path string) (FSNode, error) {
304+
path = strings.Trim(path, "/")
305+
parts := strings.Split(path, "/")
306+
if len(parts) == 1 && parts[0] == "" {
307+
return d, nil
308+
}
309+
310+
var cur FSNode
311+
cur = d
312+
for i, p := range parts {
313+
chdir, ok := cur.(*Directory)
314+
if !ok {
315+
return nil, fmt.Errorf("cannot access %s: Not a directory", strings.Join(parts[:i+1], "/"))
316+
}
317+
318+
child, err := chdir.Child(p)
319+
if err != nil {
320+
return nil, err
321+
}
322+
323+
cur = child
324+
}
325+
return cur, nil
326+
}

‎ipnsfs/ipnsfs_test.go

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package ipnsfs
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
"os"
9+
"strings"
10+
"testing"
11+
12+
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
13+
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
14+
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
15+
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
16+
key "github.com/ipfs/go-ipfs/blocks/key"
17+
bserv "github.com/ipfs/go-ipfs/blockservice"
18+
offline "github.com/ipfs/go-ipfs/exchange/offline"
19+
importer "github.com/ipfs/go-ipfs/importer"
20+
chunk "github.com/ipfs/go-ipfs/importer/chunk"
21+
dag "github.com/ipfs/go-ipfs/merkledag"
22+
"github.com/ipfs/go-ipfs/pin"
23+
ft "github.com/ipfs/go-ipfs/unixfs"
24+
uio "github.com/ipfs/go-ipfs/unixfs/io"
25+
u "github.com/ipfs/go-ipfs/util"
26+
)
27+
28+
type dagservAndPinner struct {
29+
ds dag.DAGService
30+
mp pin.ManualPinner
31+
}
32+
33+
func getDagservAndPinner(t *testing.T) dagservAndPinner {
34+
db := dssync.MutexWrap(ds.NewMapDatastore())
35+
bs := bstore.NewBlockstore(db)
36+
blockserv, err := bserv.New(bs, offline.Exchange(bs))
37+
if err != nil {
38+
t.Fatal(err)
39+
}
40+
dserv := dag.NewDAGService(blockserv)
41+
mpin := pin.NewPinner(db, dserv).GetManual()
42+
return dagservAndPinner{
43+
ds: dserv,
44+
mp: mpin,
45+
}
46+
}
47+
48+
func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.Node {
49+
r := io.LimitReader(u.NewTimeSeededRand(), size)
50+
nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
51+
if err != nil {
52+
t.Fatal(err)
53+
}
54+
return nd
55+
}
56+
57+
func mkdirP(t *testing.T, root *Directory, path string) *Directory {
58+
dirs := strings.Split(path, "/")
59+
cur := root
60+
for _, d := range dirs {
61+
n, err := cur.Mkdir(d)
62+
if err != nil && err != os.ErrExist {
63+
t.Fatal(err)
64+
}
65+
if err == os.ErrExist {
66+
fsn, err := cur.Child(d)
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
switch fsn := fsn.(type) {
71+
case *Directory:
72+
n = fsn
73+
case *File:
74+
t.Fatal("tried to make a directory where a file already exists")
75+
}
76+
}
77+
78+
cur = n
79+
}
80+
return cur
81+
}
82+
83+
func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.Node, path string) error {
84+
parts := strings.Split(path, "/")
85+
cur := root
86+
for i, d := range parts[:len(parts)-1] {
87+
next, err := cur.Child(d)
88+
if err != nil {
89+
return fmt.Errorf("looking for %s failed: %s", path, err)
90+
}
91+
92+
nextDir, ok := next.(*Directory)
93+
if !ok {
94+
return fmt.Errorf("%s points to a non-directory", parts[:i+1])
95+
}
96+
97+
cur = nextDir
98+
}
99+
100+
last := parts[len(parts)-1]
101+
finaln, err := cur.Child(last)
102+
if err != nil {
103+
return err
104+
}
105+
106+
file, ok := finaln.(*File)
107+
if !ok {
108+
return fmt.Errorf("%s was not a file!", path)
109+
}
110+
111+
out, err := ioutil.ReadAll(file)
112+
if err != nil {
113+
return err
114+
}
115+
116+
expbytes, err := catNode(ds, exp)
117+
if err != nil {
118+
return err
119+
}
120+
121+
if !bytes.Equal(out, expbytes) {
122+
return fmt.Errorf("Incorrect data at path!")
123+
}
124+
return nil
125+
}
126+
127+
func catNode(ds dag.DAGService, nd *dag.Node) ([]byte, error) {
128+
r, err := uio.NewDagReader(context.TODO(), nd, ds)
129+
if err != nil {
130+
return nil, err
131+
}
132+
defer r.Close()
133+
134+
return ioutil.ReadAll(r)
135+
}
136+
137+
func TestBasic(t *testing.T) {
138+
dsp := getDagservAndPinner(t)
139+
140+
ctx := context.TODO()
141+
142+
fs, err := NewFilesystem(ctx, dsp.ds, dsp.mp)
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
147+
root := &dag.Node{Data: ft.FolderPBData()}
148+
rt, err := fs.NewRoot("test", root, func(k key.Key) error {
149+
fmt.Println("PUBLISHED: ", k)
150+
return nil
151+
})
152+
153+
if err != nil {
154+
t.Fatal(err)
155+
}
156+
157+
rootdir := rt.GetValue().(*Directory)
158+
159+
// test making a basic dir
160+
_, err = rootdir.Mkdir("a")
161+
if err != nil {
162+
t.Fatal(err)
163+
}
164+
165+
d := mkdirP(t, rootdir, "a/b/c/d/e/f/g")
166+
167+
fi := getRandFile(t, dsp.ds, 1000)
168+
169+
err = d.AddChild("afile", fi)
170+
if err != nil {
171+
t.Fatal(err)
172+
}
173+
174+
err = assertFileAtPath(dsp.ds, rootdir, fi, "a/b/c/d/e/f/g/afile")
175+
if err != nil {
176+
t.Fatal(err)
177+
}
178+
}

‎ipnsfs/system.go

+131-96
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,11 @@ package ipnsfs
1212

1313
import (
1414
"errors"
15-
"os"
1615
"sync"
1716
"time"
1817

1918
key "github.com/ipfs/go-ipfs/blocks/key"
2019
dag "github.com/ipfs/go-ipfs/merkledag"
21-
namesys "github.com/ipfs/go-ipfs/namesys"
22-
ci "github.com/ipfs/go-ipfs/p2p/crypto"
2320
path "github.com/ipfs/go-ipfs/path"
2421
pin "github.com/ipfs/go-ipfs/pin"
2522
ft "github.com/ipfs/go-ipfs/unixfs"
@@ -28,6 +25,8 @@ import (
2825
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
2926
)
3027

28+
var ErrNotExist = errors.New("no such rootfs")
29+
3130
var log = eventlog.Logger("ipnsfs")
3231

3332
var ErrIsDirectory = errors.New("error: is a directory")
@@ -38,40 +37,44 @@ type Filesystem struct {
3837

3938
dserv dag.DAGService
4039

41-
nsys namesys.NameSystem
42-
4340
resolver *path.Resolver
4441

4542
pins pin.Pinner
4643

4744
roots map[string]*KeyRoot
45+
46+
lk sync.Mutex
4847
}
4948

5049
// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys
51-
func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) {
50+
func NewFilesystem(ctx context.Context, ds dag.DAGService, pins pin.Pinner) (*Filesystem, error) {
5251
roots := make(map[string]*KeyRoot)
5352
fs := &Filesystem{
5453
ctx: ctx,
5554
roots: roots,
56-
nsys: nsys,
5755
dserv: ds,
5856
pins: pins,
5957
resolver: &path.Resolver{DAG: ds},
6058
}
61-
for _, k := range keys {
62-
pkh, err := k.GetPublic().Hash()
63-
if err != nil {
64-
return nil, err
65-
}
6659

67-
root, err := fs.newKeyRoot(ctx, k)
68-
if err != nil {
69-
return nil, err
70-
}
71-
roots[key.Key(pkh).Pretty()] = root
60+
return fs, nil
61+
}
62+
63+
func (fs *Filesystem) NewRoot(name string, root *dag.Node, pf PubFunc) (*KeyRoot, error) {
64+
fs.lk.Lock()
65+
defer fs.lk.Unlock()
66+
_, ok := fs.roots[name]
67+
if ok {
68+
return nil, errors.New("already exists")
7269
}
7370

74-
return fs, nil
71+
kr, err := fs.newKeyRoot(fs.ctx, root, pf)
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
fs.roots[name] = kr
77+
return kr, nil
7578
}
7679

7780
func (fs *Filesystem) Close() error {
@@ -80,11 +83,7 @@ func (fs *Filesystem) Close() error {
8083
wg.Add(1)
8184
go func(r *KeyRoot) {
8285
defer wg.Done()
83-
err := r.Publish(fs.ctx)
84-
if err != nil {
85-
log.Info(err)
86-
return
87-
}
86+
r.repub.pubNow()
8887
}(r)
8988
}
9089
wg.Wait()
@@ -93,11 +92,44 @@ func (fs *Filesystem) Close() error {
9392

9493
// GetRoot returns the KeyRoot of the given name
9594
func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) {
95+
fs.lk.Lock()
96+
defer fs.lk.Unlock()
9697
r, ok := fs.roots[name]
9798
if ok {
9899
return r, nil
99100
}
100-
return nil, os.ErrNotExist
101+
return nil, ErrNotExist
102+
}
103+
104+
type RootListing struct {
105+
Name string
106+
Hash key.Key
107+
}
108+
109+
func (fs *Filesystem) ListRoots() []RootListing {
110+
fs.lk.Lock()
111+
defer fs.lk.Unlock()
112+
var out []RootListing
113+
for name, r := range fs.roots {
114+
k := r.repub.getVal()
115+
out = append(out, RootListing{
116+
Name: name,
117+
Hash: k,
118+
})
119+
}
120+
return out
121+
}
122+
123+
func (fs *Filesystem) CloseRoot(name string) (key.Key, error) {
124+
fs.lk.Lock()
125+
defer fs.lk.Unlock()
126+
r, ok := fs.roots[name]
127+
if !ok {
128+
return "", ErrNotExist
129+
}
130+
131+
delete(fs.roots, name)
132+
return r.repub.getVal(), r.Close()
101133
}
102134

103135
type childCloser interface {
@@ -121,7 +153,6 @@ type FSNode interface {
121153

122154
// KeyRoot represents the root of a filesystem tree pointed to by a given keypair
123155
type KeyRoot struct {
124-
key ci.PrivKey
125156
name string
126157

127158
// node is the merkledag node pointed to by this keypair
@@ -136,59 +167,39 @@ type KeyRoot struct {
136167
repub *Republisher
137168
}
138169

170+
type PubFunc func(key.Key) error
171+
139172
// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine
140173
// for it
141-
func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) {
142-
hash, err := k.GetPublic().Hash()
174+
func (fs *Filesystem) newKeyRoot(parent context.Context, node *dag.Node, pf PubFunc) (*KeyRoot, error) {
175+
name := "NO NAME (WIP)"
176+
177+
ndk, err := node.Key()
143178
if err != nil {
144179
return nil, err
145180
}
146181

147-
name := "/ipns/" + key.Key(hash).String()
148-
149182
root := new(KeyRoot)
150-
root.key = k
151183
root.fs = fs
152184
root.name = name
153185

154-
ctx, cancel := context.WithCancel(parent)
155-
defer cancel()
186+
root.node = node
156187

157-
pointsTo, err := fs.nsys.Resolve(ctx, name)
158-
if err != nil {
159-
err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k)
160-
if err != nil {
161-
return nil, err
162-
}
188+
root.repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
189+
root.repub.setVal(ndk)
190+
go root.repub.Run()
163191

164-
pointsTo, err = fs.nsys.Resolve(ctx, name)
165-
if err != nil {
166-
return nil, err
167-
}
168-
}
169-
170-
mnode, err := fs.resolver.ResolvePath(ctx, pointsTo)
171-
if err != nil {
172-
log.Errorf("Failed to retrieve value '%s' for ipns entry: %s\n", pointsTo, err)
173-
return nil, err
174-
}
175-
176-
root.node = mnode
177-
178-
root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3)
179-
go root.repub.Run(parent)
180-
181-
pbn, err := ft.FromBytes(mnode.Data)
192+
pbn, err := ft.FromBytes(node.Data)
182193
if err != nil {
183194
log.Error("IPNS pointer was not unixfs node")
184195
return nil, err
185196
}
186197

187198
switch pbn.GetType() {
188199
case ft.TDirectory:
189-
root.val = NewDirectory(pointsTo.String(), mnode, root, fs)
200+
root.val = NewDirectory(ndk.String(), node, root, fs)
190201
case ft.TFile, ft.TMetadata, ft.TRaw:
191-
fi, err := NewFile(pointsTo.String(), mnode, root, fs)
202+
fi, err := NewFile(ndk.String(), node, root, fs)
192203
if err != nil {
193204
return nil, err
194205
}
@@ -206,74 +217,82 @@ func (kr *KeyRoot) GetValue() FSNode {
206217
// closeChild implements the childCloser interface, and signals to the publisher that
207218
// there are changes ready to be published
208219
func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error {
209-
kr.repub.Touch()
210-
return nil
211-
}
212-
213-
// Publish publishes the ipns entry associated with this key
214-
func (kr *KeyRoot) Publish(ctx context.Context) error {
215-
child, ok := kr.val.(FSNode)
216-
if !ok {
217-
return errors.New("child of key root not valid type")
218-
}
219-
220-
nd, err := child.GetNode()
221-
if err != nil {
222-
return err
223-
}
224-
225-
// Holding this lock so our child doesnt change out from under us
226-
child.Lock()
227220
k, err := kr.fs.dserv.Add(nd)
228221
if err != nil {
229-
child.Unlock()
230222
return err
231223
}
232-
child.Unlock()
233-
// Dont want to hold the lock while we publish
234-
// otherwise we are holding the lock through a costly
235-
// network operation
236-
237-
kp := path.FromKey(k)
238224

239-
ev := &eventlog.Metadata{"name": kr.name, "key": kp}
240-
defer log.EventBegin(ctx, "ipnsfsPublishing", ev).Done()
241-
log.Info("ipnsfs publishing %s -> %s", kr.name, kp)
225+
kr.repub.Update(k)
226+
return nil
227+
}
242228

243-
return kr.fs.nsys.Publish(ctx, kr.key, kp)
229+
func (kr *KeyRoot) Close() error {
230+
return kr.repub.publish()
244231
}
245232

246233
// Republisher manages when to publish the ipns entry associated with a given key
247234
type Republisher struct {
248235
TimeoutLong time.Duration
249236
TimeoutShort time.Duration
250237
Publish chan struct{}
251-
root *KeyRoot
238+
pubfunc PubFunc
239+
pubnowch chan struct{}
240+
241+
ctx context.Context
242+
cancel func()
243+
244+
lk sync.Mutex
245+
val key.Key
246+
lastpub key.Key
247+
}
248+
249+
func (rp *Republisher) getVal() key.Key {
250+
rp.lk.Lock()
251+
defer rp.lk.Unlock()
252+
return rp.val
252253
}
253254

254255
// NewRepublisher creates a new Republisher object to republish the given keyroot
255256
// using the given short and long time intervals
256-
func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher {
257+
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
258+
ctx, cancel := context.WithCancel(ctx)
257259
return &Republisher{
258260
TimeoutShort: tshort,
259261
TimeoutLong: tlong,
260262
Publish: make(chan struct{}, 1),
261-
root: root,
263+
pubfunc: pf,
264+
pubnowch: make(chan struct{}),
265+
ctx: ctx,
266+
cancel: cancel,
267+
}
268+
}
269+
270+
func (p *Republisher) setVal(k key.Key) {
271+
p.lk.Lock()
272+
defer p.lk.Unlock()
273+
p.val = k
274+
}
275+
276+
func (p *Republisher) pubNow() {
277+
select {
278+
case p.pubnowch <- struct{}{}:
279+
default:
262280
}
263281
}
264282

265283
// Touch signals that an update has occurred since the last publish.
266284
// Multiple consecutive touches may extend the time period before
267285
// the next Publish occurs in order to more efficiently batch updates
268-
func (np *Republisher) Touch() {
286+
func (np *Republisher) Update(k key.Key) {
287+
np.setVal(k)
269288
select {
270289
case np.Publish <- struct{}{}:
271290
default:
272291
}
273292
}
274293

275294
// Run is the main republisher loop
276-
func (np *Republisher) Run(ctx context.Context) {
295+
func (np *Republisher) Run() {
277296
for {
278297
select {
279298
case <-np.Publish:
@@ -282,23 +301,39 @@ func (np *Republisher) Run(ctx context.Context) {
282301

283302
wait:
284303
select {
285-
case <-ctx.Done():
304+
case <-np.ctx.Done():
286305
return
287306
case <-np.Publish:
288307
quick = time.After(np.TimeoutShort)
289308
goto wait
290309
case <-quick:
291310
case <-longer:
311+
case <-np.pubnowch:
292312
}
293313

294-
log.Info("Publishing Changes!")
295-
err := np.root.Publish(ctx)
314+
err := np.publish()
296315
if err != nil {
297316
log.Error("republishRoot error: %s", err)
298317
}
299318

300-
case <-ctx.Done():
319+
case <-np.ctx.Done():
301320
return
302321
}
303322
}
304323
}
324+
325+
func (np *Republisher) publish() error {
326+
np.lk.Lock()
327+
topub := np.val
328+
np.lk.Unlock()
329+
330+
log.Info("Publishing Changes!")
331+
err := np.pubfunc(topub)
332+
if err != nil {
333+
return err
334+
}
335+
np.lk.Lock()
336+
np.lastpub = topub
337+
np.lk.Unlock()
338+
return nil
339+
}

0 commit comments

Comments
 (0)
Please sign in to comment.