Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4105a6f

Browse files
committedOct 2, 2015
record time each record was received
Use received time to make sure that nodes don't store records that nobody else is putting effort into keeping around. License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
1 parent 989a5ed commit 4105a6f

File tree

5 files changed

+106
-31
lines changed

5 files changed

+106
-31
lines changed
 

‎namesys/publisher.go

+24-10
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value
6868
_, ipnskey := IpnsKeysForID(id)
6969

7070
// get previous records sequence number
71-
seqnum, err := getPreviousSeqNo(p.ds, ipnskey)
71+
seqnum, err := p.getPreviousSeqNo(ctx, ipnskey)
7272
if err != nil {
7373
return err
7474
}
@@ -79,8 +79,13 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value
7979
return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id)
8080
}
8181

82-
func getPreviousSeqNo(d ds.Datastore, ipnskey key.Key) (uint64, error) {
83-
prevrec, err := d.Get(ipnskey.DsKey())
82+
func (p *ipnsPublisher) getPreviousSeqNo(ctx context.Context, ipnskey key.Key) (uint64, error) {
83+
prevrec, err := p.ds.Get(ipnskey.DsKey())
84+
if err != nil && err != ds.ErrNotFound {
85+
// unexpected datastore error (anything other than not-found): give up
86+
return 0, err
87+
}
88+
var val []byte
8489
if err == nil {
8590
prbytes, ok := prevrec.([]byte)
8691
if !ok {
@@ -92,19 +97,28 @@ func getPreviousSeqNo(d ds.Datastore, ipnskey key.Key) (uint64, error) {
9297
return 0, err
9398
}
9499

95-
e := new(pb.IpnsEntry)
96-
err = proto.Unmarshal(dhtrec.GetValue(), e)
100+
val = dhtrec.GetValue()
101+
} else {
102+
// try and check the dht for a record
103+
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
104+
defer cancel()
105+
106+
rv, err := p.routing.GetValue(ctx, ipnskey)
97107
if err != nil {
98-
return 0, err
108+
// no such record found, start at zero!
109+
return 0, nil
99110
}
100111

101-
return e.GetSequence(), nil
102-
} else if err != ds.ErrNotFound {
112+
val = rv
113+
}
114+
115+
e := new(pb.IpnsEntry)
116+
err = proto.Unmarshal(val, e)
117+
if err != nil {
103118
return 0, err
104119
}
105120

106-
// None found, lets start at zero!
107-
return 0, nil
121+
return e.GetSequence(), nil
108122
}
109123

110124
func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error {

‎routing/dht/handlers.go

+58-18
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package dht
33
import (
44
"errors"
55
"fmt"
6+
"time"
67

78
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
89
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
910
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1011
key "github.com/ipfs/go-ipfs/blocks/key"
1112
peer "github.com/ipfs/go-ipfs/p2p/peer"
1213
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
14+
u "github.com/ipfs/go-ipfs/util"
1315
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
1416
)
1517

@@ -46,15 +48,41 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
4648
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
4749

4850
// first, is there even a key?
49-
k := pmes.GetKey()
51+
k := key.Key(pmes.GetKey())
5052
if k == "" {
5153
return nil, errors.New("handleGetValue but no key was provided")
5254
// TODO: send back an error response? could be bad, but the other node's hanging.
5355
}
5456

5557
// let's first check if we have the value locally.
58+
rec, err := dht.checkLocalDatastore(k)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
// Find closest peer on given cluster to desired key and reply with that info
64+
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
65+
if len(closer) > 0 {
66+
closerinfos := peer.PeerInfos(dht.peerstore, closer)
67+
for _, pi := range closerinfos {
68+
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
69+
if len(pi.Addrs) < 1 {
70+
log.Errorf(`no addresses on peer being sent!
71+
[local:%s]
72+
[sending:%s]
73+
[remote:%s]`, dht.self, pi.ID, p)
74+
}
75+
}
76+
77+
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
78+
}
79+
80+
return resp, nil
81+
}
82+
83+
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
5684
log.Debugf("%s handleGetValue looking into ds", dht.self)
57-
dskey := key.Key(k).DsKey()
85+
dskey := k.DsKey()
5886
iVal, err := dht.datastore.Get(dskey)
5987
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
6088

@@ -79,27 +107,35 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
79107
return nil, err
80108
}
81109

82-
resp.Record = rec
83-
}
110+
var recordIsBad bool
111+
recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
112+
if err != nil {
113+
log.Info("either no receive time set on record, or it was invalid: ", err)
114+
recordIsBad = true
115+
}
84116

85-
// Find closest peer on given cluster to desired key and reply with that info
86-
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
87-
if len(closer) > 0 {
88-
closerinfos := peer.PeerInfos(dht.peerstore, closer)
89-
for _, pi := range closerinfos {
90-
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
91-
if len(pi.Addrs) < 1 {
92-
log.Errorf(`no addresses on peer being sent!
93-
[local:%s]
94-
[sending:%s]
95-
[remote:%s]`, dht.self, pi.ID, p)
117+
if time.Now().Sub(recvtime) > MaxRecordAge {
118+
log.Debug("old record found, tossing.")
119+
recordIsBad = true
120+
}
121+
122+
// NOTE: we do not verify the record here beyond checking these timestamps.
123+
// we put the burden of checking the records on the requester as checking a record
124+
// may be computationally expensive
125+
126+
if recordIsBad {
127+
err := dht.datastore.Delete(dskey)
128+
if err != nil {
129+
log.Error("Failed to delete bad record from datastore: ", err)
96130
}
131+
132+
return nil, nil // can treat this as not having the record at all
97133
}
98134

99-
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
135+
return rec, nil
100136
}
101137

102-
return resp, nil
138+
return nil, nil
103139
}
104140

105141
// Store a value in this peer local storage
@@ -112,7 +148,11 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
112148
return nil, err
113149
}
114150

115-
data, err := proto.Marshal(pmes.GetRecord())
151+
rec := pmes.GetRecord()
152+
// note the time we receive each record
153+
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
154+
155+
data, err := proto.Marshal(rec)
116156
if err != nil {
117157
return nil, err
118158
}

‎routing/dht/pb/dht.pb.go

+12-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎routing/dht/pb/dht.proto

+3
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,7 @@ message Record {
7575

7676
// A PKI signature for the key+value+author
7777
optional bytes signature = 4;
78+
79+
// Time the record was received, set by receiver
80+
optional string timeReceived = 5;
7881
}

‎routing/dht/records.go

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dht
22

33
import (
44
"fmt"
5+
"time"
56

67
ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
78
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@@ -12,6 +13,14 @@ import (
1213
record "github.com/ipfs/go-ipfs/routing/record"
1314
)
1415

16+
// MaxRecordAge specifies the maximum time that any node will hold onto a record
17+
// from the time it's received. This does not apply to any other forms of validity that
18+
// the record may contain.
19+
// For example, a record may contain an ipns entry with an EOL saying it's valid
20+
// until the year 2020 (a great time in the future). For that record to stick around
21+
// it must be rebroadcast more frequently than once every 'MaxRecordAge'.
22+
const MaxRecordAge = time.Hour * 36
23+
1524
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
1625
log.Debugf("getPublicKey for: %s", p)
1726

0 commit comments

Comments
 (0)
Please sign in to comment.