Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add locking interface to blockstore
Browse files Browse the repository at this point in the history
The addition of a locking interface to the blockstore allows us to
perform atomic operations on the underlying datastore without having to
worry about different operations happening in the background, such as
garbage collection.

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
whyrusleeping committed Jul 7, 2015
1 parent 16ea653 commit f008ce5
Showing 3 changed files with 50 additions and 29 deletions.
22 changes: 21 additions & 1 deletion blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ package blockstore

import (
"errors"
"sync"

ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsns "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
@@ -34,7 +35,14 @@ type Blockstore interface {
AllKeysChan(ctx context.Context) (<-chan key.Key, error)
}

func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
type GCBlockstore interface {
Blockstore

Lock() func()
RLock() func()
}

func NewBlockstore(d ds.ThreadSafeDatastore) *blockstore {
dd := dsns.Wrap(d, BlockPrefix)
return &blockstore{
datastore: dd,
@@ -45,6 +53,8 @@ type blockstore struct {
datastore ds.Datastore
// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
// we do check it on `NewBlockstore` though.

lk sync.RWMutex
}

func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
@@ -151,3 +161,13 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {

return output, nil
}

func (bs *blockstore) Lock() func() {
bs.lk.Lock()
return bs.lk.Unlock
}

func (bs *blockstore) RLock() func() {
bs.lk.RLock()
return bs.lk.RUnlock
}
10 changes: 9 additions & 1 deletion blocks/blockstore/write_cache.go
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ import (
)

// WriteCached returns a blockstore that caches up to |size| unique writes (bs.Put).
func WriteCached(bs Blockstore, size int) (Blockstore, error) {
func WriteCached(bs Blockstore, size int) (*writecache, error) {
c, err := lru.New(size)
if err != nil {
return nil, err
@@ -48,3 +48,11 @@ func (w *writecache) Put(b *blocks.Block) error {
func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return w.blockstore.AllKeysChan(ctx)
}

func (w *writecache) Lock() func() {
return w.blockstore.(GCBlockstore).Lock()
}

func (w *writecache) RLock() func() {
return w.blockstore.(GCBlockstore).RLock()
}
47 changes: 20 additions & 27 deletions blocks/key/key_set.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,39 @@
package key

import (
"sync"
)

type KeySet interface {
Add(Key)
Has(Key) bool
Remove(Key)
Keys() []Key
}

type ks struct {
lock sync.RWMutex
data map[Key]struct{}
type keySet struct {
keys map[Key]struct{}
}

func NewKeySet() KeySet {
return &ks{
data: make(map[Key]struct{}),
}
return &keySet{make(map[Key]struct{})}
}

func (wl *ks) Add(k Key) {
wl.lock.Lock()
defer wl.lock.Unlock()

wl.data[k] = struct{}{}
func (gcs *keySet) Add(k Key) {
gcs.keys[k] = struct{}{}
}

func (wl *ks) Remove(k Key) {
wl.lock.Lock()
defer wl.lock.Unlock()

delete(wl.data, k)
func (gcs *keySet) Has(k Key) bool {
_, has := gcs.keys[k]
return has
}

func (wl *ks) Keys() []Key {
wl.lock.RLock()
defer wl.lock.RUnlock()
keys := make([]Key, 0)
for k := range wl.data {
keys = append(keys, k)
func (ks *keySet) Keys() []Key {
var out []Key
for k, _ := range ks.keys {
out = append(out, k)
}
return keys
return out
}

func (ks *keySet) Remove(k Key) {
delete(ks.keys, k)
}

// TODO: implement disk-backed keyset for working with massive DAGs

0 comments on commit f008ce5

Please sign in to comment.