@@ -3,13 +3,15 @@ package dht
3
3
import (
4
4
"errors"
5
5
"fmt"
6
+ "time"
6
7
7
8
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
8
9
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
9
10
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
10
11
key "github.com/ipfs/go-ipfs/blocks/key"
11
12
peer "github.com/ipfs/go-ipfs/p2p/peer"
12
13
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
14
+ u "github.com/ipfs/go-ipfs/util"
13
15
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
14
16
)
15
17
@@ -46,15 +48,41 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
46
48
resp := pb .NewMessage (pmes .GetType (), pmes .GetKey (), pmes .GetClusterLevel ())
47
49
48
50
// first, is there even a key?
49
- k := pmes .GetKey ()
51
+ k := key . Key ( pmes .GetKey () )
50
52
if k == "" {
51
53
return nil , errors .New ("handleGetValue but no key was provided" )
52
54
// TODO: send back an error response? could be bad, but the other node's hanging.
53
55
}
54
56
55
57
// let's first check if we have the value locally.
58
+ rec , err := dht .checkLocalDatastore (k )
59
+ if err != nil {
60
+ return nil , err
61
+ }
62
+
63
+ // Find closest peer on given cluster to desired key and reply with that info
64
+ closer := dht .betterPeersToQuery (pmes , p , CloserPeerCount )
65
+ if len (closer ) > 0 {
66
+ closerinfos := peer .PeerInfos (dht .peerstore , closer )
67
+ for _ , pi := range closerinfos {
68
+ log .Debugf ("handleGetValue returning closer peer: '%s'" , pi .ID )
69
+ if len (pi .Addrs ) < 1 {
70
+ log .Errorf (`no addresses on peer being sent!
71
+ [local:%s]
72
+ [sending:%s]
73
+ [remote:%s]` , dht .self , pi .ID , p )
74
+ }
75
+ }
76
+
77
+ resp .CloserPeers = pb .PeerInfosToPBPeers (dht .host .Network (), closerinfos )
78
+ }
79
+
80
+ return resp , nil
81
+ }
82
+
83
+ func (dht * IpfsDHT ) checkLocalDatastore (k key.Key ) (* pb.Record , error ) {
56
84
log .Debugf ("%s handleGetValue looking into ds" , dht .self )
57
- dskey := key . Key ( k ) .DsKey ()
85
+ dskey := k .DsKey ()
58
86
iVal , err := dht .datastore .Get (dskey )
59
87
log .Debugf ("%s handleGetValue looking into ds GOT %v" , dht .self , iVal )
60
88
@@ -79,27 +107,35 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
79
107
return nil , err
80
108
}
81
109
82
- resp .Record = rec
83
- }
110
+ var recordIsBad bool
111
+ recvtime , err := u .ParseRFC3339 (rec .GetTimeReceived ())
112
+ if err != nil {
113
+ log .Info ("either no receive time set on record, or it was invalid: " , err )
114
+ recordIsBad = true
115
+ }
84
116
85
- // Find closest peer on given cluster to desired key and reply with that info
86
- closer := dht .betterPeersToQuery (pmes , p , CloserPeerCount )
87
- if len (closer ) > 0 {
88
- closerinfos := peer .PeerInfos (dht .peerstore , closer )
89
- for _ , pi := range closerinfos {
90
- log .Debugf ("handleGetValue returning closer peer: '%s'" , pi .ID )
91
- if len (pi .Addrs ) < 1 {
92
- log .Errorf (`no addresses on peer being sent!
93
- [local:%s]
94
- [sending:%s]
95
- [remote:%s]` , dht .self , pi .ID , p )
117
+ if time .Now ().Sub (recvtime ) > MaxRecordAge {
118
+ log .Debug ("old record found, tossing." )
119
+ recordIsBad = true
120
+ }
121
+
122
+ // NOTE: we do not verify the record here beyond checking these timestamps.
123
+ // we put the burden of checking the records on the requester as checking a record
124
+ // may be computationally expensive
125
+
126
+ if recordIsBad {
127
+ err := dht .datastore .Delete (dskey )
128
+ if err != nil {
129
+ log .Error ("Failed to delete bad record from datastore: " , err )
96
130
}
131
+
132
+ return nil , nil // can treat this as not having the record at all
97
133
}
98
134
99
- resp . CloserPeers = pb . PeerInfosToPBPeers ( dht . host . Network (), closerinfos )
135
+ return rec , nil
100
136
}
101
137
102
- return resp , nil
138
+ return nil , nil
103
139
}
104
140
105
141
// Store a value in this peer local storage
@@ -112,7 +148,11 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
112
148
return nil , err
113
149
}
114
150
115
- data , err := proto .Marshal (pmes .GetRecord ())
151
+ rec := pmes .GetRecord ()
152
+ // note the time we receive each record
153
+ rec .TimeReceived = proto .String (u .FormatRFC3339 (time .Now ()))
154
+
155
+ data , err := proto .Marshal (rec )
116
156
if err != nil {
117
157
return nil , err
118
158
}
0 commit comments