Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: ipfs/kubo
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 5b2d2ebdd69c
Choose a base ref
...
head repository: ipfs/kubo
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 7e6914693384
Choose a head ref
  • 2 commits
  • 12 files changed
  • 2 contributors

Commits on Oct 2, 2015

  1. fix publish fail on prexisting bad record

    dont error out if prexisting record is bad, just grab its sequence number
    and continue on with the publish.
    
    License: MIT
    Signed-off-by: Jeromy <jeromyj@gmail.com>
    whyrusleeping committed Oct 2, 2015

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    b34d41e View commit details
  2. Merge pull request #1775 from ipfs/fix/ipns-old-record

    fix publish fail on prexisting bad record
    jbenet committed Oct 2, 2015
    Copy the full SHA
    7e69146 View commit details
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
@@ -227,7 +227,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)

// setup name system
n.Namesys = namesys.NewNameSystem(n.Routing)
n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore())

// setup ipns republishing
err = n.setupIpnsRepublisher()
@@ -456,7 +456,7 @@ func (n *IpfsNode) SetupOfflineRouting() error {

n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey)

n.Namesys = namesys.NewNameSystem(n.Routing)
n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore())

return nil
}
2 changes: 1 addition & 1 deletion fuse/ipns/common.go
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error {
return err
}

pub := nsys.NewRoutingPublisher(n.Routing)
pub := nsys.NewRoutingPublisher(n.Routing, n.Repo.Datastore())
if err := pub.Publish(ctx, key, path.FromKey(nodek)); err != nil {
return err
}
5 changes: 3 additions & 2 deletions namesys/namesys.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"strings"
"time"

ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
path "github.com/ipfs/go-ipfs/path"
@@ -25,15 +26,15 @@ type mpns struct {
}

// NewNameSystem will construct the IPFS naming system based on Routing
func NewNameSystem(r routing.IpfsRouting) NameSystem {
func NewNameSystem(r routing.IpfsRouting, ds ds.Datastore) NameSystem {
return &mpns{
resolvers: map[string]resolver{
"dns": newDNSResolver(),
"proquint": new(ProquintResolver),
"dht": newRoutingResolver(r),
},
publishers: map[string]Publisher{
"/ipns/": NewRoutingPublisher(r),
"/ipns/": NewRoutingPublisher(r, ds),
},
}
}
65 changes: 53 additions & 12 deletions namesys/publisher.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
routing "github.com/ipfs/go-ipfs/routing"
dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
record "github.com/ipfs/go-ipfs/routing/record"
ft "github.com/ipfs/go-ipfs/unixfs"
u "github.com/ipfs/go-ipfs/util"
@@ -37,11 +38,15 @@ var PublishPutValTimeout = time.Minute
// routing system.
type ipnsPublisher struct {
routing routing.IpfsRouting
ds ds.Datastore
}

// NewRoutingPublisher constructs a publisher for the IPFS Routing name system.
func NewRoutingPublisher(route routing.IpfsRouting) *ipnsPublisher {
return &ipnsPublisher{routing: route}
func NewRoutingPublisher(route routing.IpfsRouting, ds ds.Datastore) *ipnsPublisher {
if ds == nil {
panic("nil datastore")
}
return &ipnsPublisher{routing: route, ds: ds}
}

// Publish implements Publisher. Accepts a keypair and a value,
@@ -62,22 +67,58 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value

_, ipnskey := IpnsKeysForID(id)

// get previous records sequence number, and add one to it
var seqnum uint64
prevrec, err := p.routing.GetValues(ctx, ipnskey, 0)
// get previous records sequence number
seqnum, err := p.getPreviousSeqNo(ctx, ipnskey)
if err != nil {
return err
}

// increment it
seqnum++

return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id)
}

func (p *ipnsPublisher) getPreviousSeqNo(ctx context.Context, ipnskey key.Key) (uint64, error) {
prevrec, err := p.ds.Get(ipnskey.DsKey())
if err != nil && err != ds.ErrNotFound {
// None found, lets start at zero!
return 0, err
}
var val []byte
if err == nil {
e := new(pb.IpnsEntry)
err := proto.Unmarshal(prevrec[0].Val, e)
prbytes, ok := prevrec.([]byte)
if !ok {
return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec)
}
dhtrec := new(dhtpb.Record)
err := proto.Unmarshal(prbytes, dhtrec)
if err != nil {
return err
return 0, err
}

seqnum = e.GetSequence() + 1
} else if err != ds.ErrNotFound {
return err
val = dhtrec.GetValue()
} else {
// try and check the dht for a record
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

rv, err := p.routing.GetValue(ctx, ipnskey)
if err != nil {
// no such record found, start at zero!
return 0, nil
}

val = rv
}

return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id)
e := new(pb.IpnsEntry)
err = proto.Unmarshal(val, e)
if err != nil {
return 0, err
}

return e.GetSequence(), nil
}

func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error {
2 changes: 1 addition & 1 deletion namesys/republisher/repub_test.go
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ func TestRepublish(t *testing.T) {
// have one node publish a record that is valid for 1 second
publisher := nodes[3]
p := path.FromString("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") // does not need to be valid
rp := namesys.NewRoutingPublisher(publisher.Routing)
rp := namesys.NewRoutingPublisher(publisher.Routing, publisher.Repo.Datastore())
err := rp.PublishWithEOL(ctx, publisher.PrivateKey, p, time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
94 changes: 93 additions & 1 deletion namesys/resolve_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package namesys

import (
"errors"
"testing"
"time"

ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer"
path "github.com/ipfs/go-ipfs/path"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
u "github.com/ipfs/go-ipfs/util"
@@ -13,9 +17,10 @@ import (

func TestRoutingResolve(t *testing.T) {
d := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t))
dstore := ds.NewMapDatastore()

resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d)
publisher := NewRoutingPublisher(d, dstore)

privk, pubk, err := testutil.RandTestKeyPair(512)
if err != nil {
@@ -43,3 +48,90 @@ func TestRoutingResolve(t *testing.T) {
t.Fatal("Got back incorrect value.")
}
}

// TestPrexistingExpiredRecord puts a record whose EOL lies one hour in the
// past into routing, then checks that a subsequent Publish succeeds and that
// the published path can be resolved.
func TestPrexistingExpiredRecord(t *testing.T) {
	store := ds.NewMapDatastore()
	router := mockrouting.NewServer().ClientWithDatastore(context.Background(), testutil.RandIdentityOrFatal(t), store)

	res := NewRoutingResolver(router)
	pub := NewRoutingPublisher(router, store)

	sk, pk, err := testutil.RandTestKeyPair(512)
	if err != nil {
		t.Fatal(err)
	}

	pid, err := peer.IDFromPublicKey(pk)
	if err != nil {
		t.Fatal(err)
	}

	// Seed routing with a record that has already expired.
	target := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN")
	expired := time.Now().Add(-time.Hour)
	if err := PutRecordToRouting(context.Background(), sk, target, 0, expired, router, pid); err != nil {
		t.Fatal(err)
	}

	// Publishing over the stale record must not fail.
	if err := pub.Publish(context.Background(), sk, target); err != nil {
		t.Fatal(err)
	}

	if err := verifyCanResolve(res, pid.Pretty(), target); err != nil {
		t.Fatal(err)
	}
}

// TestPrexistingRecord puts a record whose EOL lies one hour in the future
// into routing, then checks that a subsequent Publish succeeds and that the
// published path can be resolved.
func TestPrexistingRecord(t *testing.T) {
	store := ds.NewMapDatastore()
	router := mockrouting.NewServer().ClientWithDatastore(context.Background(), testutil.RandIdentityOrFatal(t), store)

	res := NewRoutingResolver(router)
	pub := NewRoutingPublisher(router, store)

	sk, pk, err := testutil.RandTestKeyPair(512)
	if err != nil {
		t.Fatal(err)
	}

	pid, err := peer.IDFromPublicKey(pk)
	if err != nil {
		t.Fatal(err)
	}

	// Seed routing with a record that is still within its EOL.
	target := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN")
	stillValid := time.Now().Add(time.Hour)
	if err := PutRecordToRouting(context.Background(), sk, target, 0, stillValid, router, pid); err != nil {
		t.Fatal(err)
	}

	// Publishing over the existing record must not fail.
	if err := pub.Publish(context.Background(), sk, target); err != nil {
		t.Fatal(err)
	}

	if err := verifyCanResolve(res, pid.Pretty(), target); err != nil {
		t.Fatal(err)
	}
}

// verifyCanResolve resolves name through r and reports an error if the
// resolution fails or if the resolved path differs from exp.
func verifyCanResolve(r Resolver, name string, exp path.Path) error {
	got, err := r.Resolve(context.Background(), name)
	switch {
	case err != nil:
		return err
	case got != exp:
		return errors.New("got back wrong record!")
	default:
		return nil
	}
}
104 changes: 75 additions & 29 deletions routing/dht/handlers.go
Original file line number Diff line number Diff line change
@@ -3,13 +3,15 @@ package dht
import (
"errors"
"fmt"
"time"

proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer"
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
u "github.com/ipfs/go-ipfs/util"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
)

@@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

// first, is there even a key?
k := pmes.GetKey()
k := key.Key(pmes.GetKey())
if k == "" {
return nil, errors.New("handleGetValue but no key was provided")
// TODO: send back an error response? could be bad, but the other node's hanging.
}

// let's first check if we have the value locally.
log.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := key.Key(k).DsKey()
iVal, err := dht.datastore.Get(dskey)
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

// if we got an unexpected error, bail.
if err != nil && err != ds.ErrNotFound {
rec, err := dht.checkLocalDatastore(k)
if err != nil {
return nil, err
}

// if we have the value, send it back
if err == nil {
log.Debugf("%s handleGetValue success!", dht.self)

byts, ok := iVal.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}

rec := new(pb.Record)
err := proto.Unmarshal(byts, rec)
if err != nil {
log.Debug("Failed to unmarshal dht record from datastore")
return nil, err
}

resp.Record = rec
}
resp.Record = rec

// Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
@@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil
}

// checkLocalDatastore looks up k in the node's datastore and returns the DHT
// record stored there.
//
// It returns (nil, nil) when the key is not present, and also when a stored
// record is discarded because its receive time is missing/unparseable or is
// older than MaxRecordAge; discarded records are deleted from the datastore.
// Records authored by this node are returned without any age check.
// A non-nil error is returned for unexpected datastore failures, for stored
// values that are not byte slices, and for records that fail to unmarshal.
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
	log.Debugf("%s handleGetValue looking into ds", dht.self)
	dskey := k.DsKey()
	iVal, err := dht.datastore.Get(dskey)
	log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)

	if err == ds.ErrNotFound {
		return nil, nil
	}

	// if we got an unexpected error, bail.
	if err != nil {
		return nil, err
	}

	// if we have the value, send it back
	log.Debugf("%s handleGetValue success!", dht.self)

	byts, ok := iVal.([]byte)
	if !ok {
		return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
	}

	rec := new(pb.Record)
	if err := proto.Unmarshal(byts, rec); err != nil {
		log.Debug("Failed to unmarshal dht record from datastore")
		return nil, err
	}

	// if its our record, dont bother checking the times on it
	if peer.ID(rec.GetAuthor()) == dht.self {
		return rec, nil
	}

	// A record is bad if its receive time cannot be parsed or if it is too
	// old. The age is only evaluated when the timestamp parsed successfully,
	// so an unparseable timestamp is not additionally reported as "old".
	var recordIsBad bool
	recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
	switch {
	case err != nil:
		log.Info("either no receive time set on record, or it was invalid: ", err)
		recordIsBad = true
	case time.Since(recvtime) > MaxRecordAge:
		log.Debug("old record found, tossing.")
		recordIsBad = true
	}

	// NOTE: we do not verify the record here beyond checking these timestamps.
	// we put the burden of checking the records on the requester as checking a record
	// may be computationally expensive

	if recordIsBad {
		// Deletion is best-effort: a failure is logged and the record is
		// still treated as absent.
		if err := dht.datastore.Delete(dskey); err != nil {
			log.Error("Failed to delete bad record from datastore: ", err)
		}

		return nil, nil // can treat this as not having the record at all
	}

	return rec, nil
}

// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handlePutValue", p).Done()
@@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, err
}

data, err := proto.Marshal(pmes.GetRecord())
rec := pmes.GetRecord()

// record the time we receive every record
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))

data, err := proto.Marshal(rec)
if err != nil {
return nil, err
}
15 changes: 12 additions & 3 deletions routing/dht/pb/dht.pb.go
3 changes: 3 additions & 0 deletions routing/dht/pb/dht.proto
Original file line number Diff line number Diff line change
@@ -75,4 +75,7 @@ message Record {

// A PKI signature for the key+value+author
optional bytes signature = 4;

// Time the record was received, set by receiver
optional string timeReceived = 5;
}
9 changes: 9 additions & 0 deletions routing/dht/records.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package dht

import (
"fmt"
"time"

ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@@ -12,6 +13,14 @@ import (
record "github.com/ipfs/go-ipfs/routing/record"
)

// MaxRecordAge specifies the maximum time that any node will hold onto a record
// from the time it is received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying it is valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcast more frequently than once every 'MaxRecordAge'.
const MaxRecordAge = time.Hour * 36

func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
log.Debugf("getPublicKey for: %s", p)

31 changes: 22 additions & 9 deletions routing/mock/centralized_client.go
Original file line number Diff line number Diff line change
@@ -4,12 +4,15 @@ import (
"errors"
"time"

proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing"
dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
u "github.com/ipfs/go-ipfs/util"
"github.com/ipfs/go-ipfs/util/testutil"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
)
@@ -25,7 +28,16 @@ type client struct {
// FIXME(brian): is this method meant to simulate putting a value into the network?
func (c *client) PutValue(ctx context.Context, key key.Key, val []byte) error {
log.Debugf("PutValue: %s", key)
return c.datastore.Put(key.DsKey(), val)
rec := new(dhtpb.Record)
rec.Value = val
rec.Key = proto.String(string(key))
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
data, err := proto.Marshal(rec)
if err != nil {
return err
}

return c.datastore.Put(key.DsKey(), data)
}

// FIXME(brian): is this method meant to simulate getting a value from the network?
@@ -41,21 +53,22 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
return nil, errors.New("could not cast value from datastore")
}

return data, nil
rec := new(dhtpb.Record)
err = proto.Unmarshal(data, rec)
if err != nil {
return nil, err
}

return rec.GetValue(), nil
}

func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) {
log.Debugf("GetValue: %s", key)
v, err := c.datastore.Get(key.DsKey())
log.Debugf("GetValues: %s", key)
data, err := c.GetValue(ctx, key)
if err != nil {
return nil, err
}

data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}

return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil
}

2 changes: 1 addition & 1 deletion routing/mock/centralized_server.go
Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ func (rs *s) Client(p testutil.Identity) Client {
func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Identity, datastore ds.Datastore) Client {
return &client{
peer: p,
datastore: ds.NewMapDatastore(),
datastore: datastore,
server: rs,
}
}