Skip to content
Snippets Groups Projects
Commit 3b9ad300 authored by Jeromy Johnson's avatar Jeromy Johnson
Browse files

Addressing comments from CR


License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent e5512b41
Branches
No related tags found
1 merge request!1New
package namesys
import (
"fmt"
"math/rand"
"testing"
"time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
pb "github.com/ipfs/go-ipfs/namesys/pb"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
path "github.com/ipfs/go-ipfs/path"
u "github.com/ipfs/go-ipfs/util"
)
func shuffle(a []*pb.IpnsEntry) {
for n := 0; n < 5; n++ {
for i, _ := range a {
j := rand.Intn(len(a))
a[i], a[j] = a[j], a[i]
}
}
}
func AssertSelected(r *pb.IpnsEntry, from ...*pb.IpnsEntry) error {
shuffle(from)
var vals [][]byte
for _, r := range from {
data, err := proto.Marshal(r)
if err != nil {
return err
}
vals = append(vals, data)
}
i, err := selectRecord(from, vals)
if err != nil {
return err
}
if from[i] != r {
return fmt.Errorf("selected incorrect record %d", i)
}
return nil
}
func TestOrdering(t *testing.T) {
// select timestamp so selection is deterministic
ts := time.Unix(1000000, 0)
// generate a key for signing the records
r := u.NewSeededRand(15) // generate deterministic keypair
priv, _, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, r)
if err != nil {
t.Fatal(err)
}
e1, err := CreateRoutingEntryData(priv, path.Path("foo"), 1, ts.Add(time.Hour))
if err != nil {
t.Fatal(err)
}
e2, err := CreateRoutingEntryData(priv, path.Path("bar"), 2, ts.Add(time.Hour))
if err != nil {
t.Fatal(err)
}
e3, err := CreateRoutingEntryData(priv, path.Path("baz"), 3, ts.Add(time.Hour))
if err != nil {
t.Fatal(err)
}
e4, err := CreateRoutingEntryData(priv, path.Path("cat"), 3, ts.Add(time.Hour*2))
if err != nil {
t.Fatal(err)
}
e5, err := CreateRoutingEntryData(priv, path.Path("dog"), 4, ts.Add(time.Hour*3))
if err != nil {
t.Fatal(err)
}
e6, err := CreateRoutingEntryData(priv, path.Path("fish"), 4, ts.Add(time.Hour*3))
if err != nil {
t.Fatal(err)
}
// e1 is the only record, i hope it gets this right
err = AssertSelected(e1, e1)
if err != nil {
t.Fatal(err)
}
// e2 has the highest sequence number
err = AssertSelected(e2, e1, e2)
if err != nil {
t.Fatal(err)
}
// e3 has the highest sequence number
err = AssertSelected(e3, e1, e2, e3)
if err != nil {
t.Fatal(err)
}
// e4 has a higher timeout
err = AssertSelected(e4, e1, e2, e3, e4)
if err != nil {
t.Fatal(err)
}
// e5 has the highest sequence number
err = AssertSelected(e5, e1, e2, e3, e4, e5)
if err != nil {
t.Fatal(err)
}
// e6 should be selected as its signauture will win in the comparison
err = AssertSelected(e6, e1, e2, e3, e4, e5, e6)
if err != nil {
t.Fatal(err)
}
_ = []interface{}{e1, e2, e3, e4, e5, e6}
}
......@@ -60,7 +60,7 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value
return err
}
namekey, ipnskey := IpnsKeysForID(id)
_, ipnskey := IpnsKeysForID(id)
// get previous records sequence number, and add one to it
var seqnum uint64
......@@ -77,17 +77,22 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value
return err
}
return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id)
}
func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error {
namekey, ipnskey := IpnsKeysForID(id)
entry, err := CreateRoutingEntryData(k, value, seqnum, eol)
if err != nil {
return err
}
err = PublishEntry(ctx, p.routing, ipnskey, entry)
err = PublishEntry(ctx, r, ipnskey, entry)
if err != nil {
return err
}
err = PublishPublicKey(ctx, p.routing, namekey, k.GetPublic())
err = PublishPublicKey(ctx, r, namekey, k.GetPublic())
if err != nil {
return err
}
......@@ -174,13 +179,18 @@ func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) {
}
}
return selectRecord(recs, vals)
}
func selectRecord(recs []*pb.IpnsEntry, vals [][]byte) (int, error) {
var best_seq uint64
best_i := -1
for i, r := range recs {
if r == nil {
if r == nil || r.GetSequence() < best_seq {
continue
}
if best_i == -1 || r.GetSequence() > best_seq {
best_seq = r.GetSequence()
best_i = i
......@@ -196,8 +206,11 @@ func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) {
}
if rt.After(bestt) {
best_seq = r.GetSequence()
best_i = i
} else if rt == bestt {
if bytes.Compare(vals[i], vals[best_i]) > 0 {
best_i = i
}
}
}
}
......
......@@ -69,7 +69,7 @@ func (rp *Republisher) Run(proc goprocess.Process) {
case <-tick.C:
err := rp.republishEntries(proc)
if err != nil {
log.Error(err)
log.Error("Republisher failed to republish: ", err)
}
case <-proc.Closing():
return
......@@ -86,7 +86,7 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error {
priv := rp.ps.PrivKey(id)
// Look for it locally only
namekey, ipnskey := namesys.IpnsKeysForID(id)
_, ipnskey := namesys.IpnsKeysForID(id)
p, seq, err := rp.getLastVal(ipnskey)
if err != nil {
if err == errNoEntry {
......@@ -97,19 +97,7 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error {
// update record with same sequence number
eol := time.Now().Add(rp.RecordLifetime)
entry, err := namesys.CreateRoutingEntryData(priv, p, seq, eol)
if err != nil {
return err
}
// republish public key
err = namesys.PublishPublicKey(ctx, rp.r, namekey, priv.GetPublic())
if err != nil {
return err
}
// republish ipns entry
err = namesys.PublishEntry(ctx, rp.r, ipnskey, entry)
err = namesys.PutRecordToRouting(ctx, priv, p, seq, eol, rp.r, id)
if err != nil {
return err
}
......
......@@ -3,6 +3,7 @@ package dht
import (
"bytes"
"sync"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
......@@ -60,6 +61,8 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err
for p := range pchan {
wg.Add(1)
go func(p peer.ID) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer wg.Done()
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Value,
......@@ -78,7 +81,10 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err
// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
vals, err := dht.GetValues(ctx, key, 3)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
vals, err := dht.GetValues(ctx, key, 16)
if err != nil {
return nil, err
}
......@@ -111,6 +117,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
// if someone sent us a different 'less-valid' record, lets correct them
if !bytes.Equal(v.Val, best) {
go func(v routing.RecvdVal) {
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
if err != nil {
log.Error("Error correcting DHT entry: ", err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment