addrbook cleanup

This commit is contained in:
Jae Kwon
2014-07-10 02:19:50 -07:00
parent 1b59caf950
commit 442cae1f3f
7 changed files with 384 additions and 235 deletions

54
common/cmap.go Normal file
View File

@ -0,0 +1,54 @@
package common
import "sync"
/*
CMap is a threadsafe map
*/
type CMap struct {
m map[string]interface{}
l sync.Mutex
}
func NewCMap() *CMap {
return &CMap{
m: make(map[string]interface{}, 0),
}
}
func (cm *CMap) Set(key string, value interface{}) {
cm.l.Lock()
defer cm.l.Unlock()
cm.m[key] = value
}
func (cm *CMap) Get(key string) interface{} {
cm.l.Lock()
defer cm.l.Unlock()
return cm.m[key]
}
func (cm *CMap) Has(key string) bool {
cm.l.Lock()
defer cm.l.Unlock()
_, ok := cm.m[key]
return ok
}
func (cm *CMap) Delete(key string) {
cm.l.Lock()
defer cm.l.Unlock()
delete(cm.m, key)
}
func (cm *CMap) Size() int {
cm.l.Lock()
defer cm.l.Unlock()
return len(cm.m)
}
func (cm *CMap) Clear() {
cm.l.Lock()
defer cm.l.Unlock()
cm.m = make(map[string]interface{}, 0)
}

20
main.go
View File

@ -17,9 +17,10 @@ const (
) )
type Node struct { type Node struct {
sw *p2p.Switch sw *p2p.Switch
book *p2p.AddressBook book *p2p.AddressBook
quit chan struct{} quit chan struct{}
dialing *CMap
} }
func NewNode() *Node { func NewNode() *Node {
@ -50,8 +51,10 @@ func NewNode() *Node {
book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") book := p2p.NewAddrBook(config.AppDir + "/addrbook.json")
return &New{ return &New{
sw: sw, sw: sw,
book: book, book: book,
quit: make(chan struct{}, 0),
dialing: NewCMap(),
} }
} }
@ -90,7 +93,12 @@ func (n *Node) AddListener(l Listener) {
// Ensures that sufficient peers are connected. // Ensures that sufficient peers are connected.
func (n *Node) ensurePeers() { func (n *Node) ensurePeers() {
numPeers := len(n.sw.Peers()) numPeers := len(n.sw.Peers())
if numPeers < minNumPeers { numDialing := n.dialing.Size()
numToDial = minNumPeers - (numPeers + numDialing)
if numToDial <= 0 {
return
}
for i := 0; i < numToDial; i++ {
// XXX // XXX
} }
} }

View File

@ -17,28 +17,31 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
. "github.com/tendermint/tendermint/binary"
) )
/* AddrBook - concurrency safe peer address manager */ /* AddrBook - concurrency safe peer address manager */
type AddrBook struct { type AddrBook struct {
filePath string filePath string
mtx sync.Mutex mtx sync.Mutex
rand *rand.Rand rand *rand.Rand
key [32]byte key [32]byte
addrIndex map[string]*knownAddress // new & old addrLookup map[string]*knownAddress // new & old
addrNew [newBucketCount]map[string]*knownAddress addrNew []map[string]*knownAddress
addrOld [oldBucketCount][]*knownAddress addrOld []map[string]*knownAddress
started int32 started uint32
shutdown int32 stopped uint32
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
nOld int nOld int
nNew int nNew int
} }
const (
bucketTypeNew = 0x01
bucketTypeOld = 0x02
)
const ( const (
// addresses under which the address manager will claim to need more addresses. // addresses under which the address manager will claim to need more addresses.
needAddressThreshold = 1000 needAddressThreshold = 1000
@ -65,7 +68,7 @@ const (
newBucketsPerGroup = 32 newBucketsPerGroup = 32
// buckets a frequently seen new address may end up in. // buckets a frequently seen new address may end up in.
newBucketsPerAddress = 4 maxNewBucketsPerAddress = 4
// days before which we assume an address has vanished // days before which we assume an address has vanished
// if we have not seen it announced in that long. // if we have not seen it announced in that long.
@ -103,33 +106,36 @@ func NewAddrBook(filePath string) *AddrBook {
// When modifying this, don't forget to update loadFromFile() // When modifying this, don't forget to update loadFromFile()
func (a *AddrBook) init() { func (a *AddrBook) init() {
a.addrIndex = make(map[string]*knownAddress)
io.ReadFull(crand.Reader, a.key[:]) io.ReadFull(crand.Reader, a.key[:])
// addr -> ka index
a.addrLookup = make(map[string]*knownAddress)
// New addr buckets
a.addrNew = make([]map[string]*knownAddress, newBucketCount)
for i := range a.addrNew { for i := range a.addrNew {
a.addrNew[i] = make(map[string]*knownAddress) a.addrNew[i] = make(map[string]*knownAddress)
} }
// Old addr buckets
a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
for i := range a.addrOld { for i := range a.addrOld {
a.addrOld[i] = make([]*knownAddress, 0, oldBucketSize) a.addrOld[i] = make(map[string]*knownAddress)
} }
} }
func (a *AddrBook) Start() { func (a *AddrBook) Start() {
if atomic.AddInt32(&a.started, 1) != 1 { if atomic.CompareAndSwapUint32(&a.started, 0, 1) {
return log.Trace("Starting address manager")
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveHandler()
} }
log.Trace("Starting address manager")
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveHandler()
} }
func (a *AddrBook) Stop() { func (a *AddrBook) Stop() {
if atomic.AddInt32(&a.shutdown, 1) != 1 { if atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
return log.Infof("Stopping address manager")
close(a.quit)
a.wg.Wait()
} }
log.Infof("Address manager shutting down")
close(a.quit)
a.wg.Wait()
} }
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
@ -153,11 +159,11 @@ func (a *AddrBook) size() int {
} }
// Pick an address to connect to with new/old bias. // Pick an address to connect to with new/old bias.
func (a *AddrBook) PickAddress(newBias int) *knownAddress { func (a *AddrBook) PickAddress(newBias int) *NetAddress {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
if a.nOld == 0 && a.nNew == 0 { if a.size() == 0 {
return nil return nil
} }
if newBias > 100 { if newBias > 100 {
@ -173,12 +179,19 @@ func (a *AddrBook) PickAddress(newBias int) *knownAddress {
if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation { if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
// pick random Old bucket. // pick random Old bucket.
var bucket []*knownAddress = nil var bucket map[string]*knownAddress = nil
for len(bucket) == 0 { for len(bucket) == 0 {
bucket = a.addrOld[a.rand.Intn(len(a.addrOld))] bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
} }
// pick a random ka from bucket. // pick a random ka from bucket.
return bucket[a.rand.Intn(len(bucket))] randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
randIndex--
if randIndex == 0 {
return ka.Addr
}
}
panic("Should not happen")
} else { } else {
// pick random New bucket. // pick random New bucket.
var bucket map[string]*knownAddress = nil var bucket map[string]*knownAddress = nil
@ -190,7 +203,7 @@ func (a *AddrBook) PickAddress(newBias int) *knownAddress {
for _, ka := range bucket { for _, ka := range bucket {
randIndex-- randIndex--
if randIndex == 0 { if randIndex == 0 {
return ka return ka.Addr
} }
} }
panic("Should not happen") panic("Should not happen")
@ -201,12 +214,12 @@ func (a *AddrBook) PickAddress(newBias int) *knownAddress {
func (a *AddrBook) MarkGood(addr *NetAddress) { func (a *AddrBook) MarkGood(addr *NetAddress) {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
ka := a.addrIndex[addr.String()] ka := a.addrLookup[addr.String()]
if ka == nil { if ka == nil {
return return
} }
ka.MarkGood() ka.markGood()
if ka.OldBucket == -1 { if ka.isNew() {
a.moveToOld(ka) a.moveToOld(ka)
} }
} }
@ -214,11 +227,23 @@ func (a *AddrBook) MarkGood(addr *NetAddress) {
func (a *AddrBook) MarkAttempt(addr *NetAddress) { func (a *AddrBook) MarkAttempt(addr *NetAddress) {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
ka := a.addrIndex[addr.String()] ka := a.addrLookup[addr.String()]
if ka == nil { if ka == nil {
return return
} }
ka.MarkAttempt() ka.markAttempt()
}
func (a *AddrBook) MarkBad(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrLookup[addr.String()]
if ka == nil {
return
}
// We currently just eject the address.
// In the future, consider blacklisting.
a.removeFromAllBuckets(ka)
} }
/* Peer exchange */ /* Peer exchange */
@ -227,13 +252,14 @@ func (a *AddrBook) MarkAttempt(addr *NetAddress) {
func (a *AddrBook) GetSelection() []*NetAddress { func (a *AddrBook) GetSelection() []*NetAddress {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
if a.size() == 0 { if a.size() == 0 {
return nil return nil
} }
allAddr := make([]*NetAddress, a.size()) allAddr := make([]*NetAddress, a.size())
i := 0 i := 0
for _, v := range a.addrIndex { for _, v := range a.addrLookup {
allAddr[i] = v.Addr allAddr[i] = v.Addr
i++ i++
} }
@ -258,30 +284,20 @@ func (a *AddrBook) GetSelection() []*NetAddress {
/* Loading & Saving */ /* Loading & Saving */
type addrBookJSON struct { type addrBookJSON struct {
Key [32]byte Key [32]byte
AddrNew [newBucketCount][]*knownAddress Addrs []*knownAddress
AddrOld [oldBucketCount][]*knownAddress
NumOld int
NumNew int
} }
func (a *AddrBook) saveToFile(filePath string) { func (a *AddrBook) saveToFile(filePath string) {
// turn a.addrNew into an array like a.addrOld // Compile Addrs
__addrNew := [newBucketCount][]*knownAddress{} addrs := []*knownAddress{}
for i, newBucket := range a.addrNew { for _, ka := range a.addrLookup {
var array []*knownAddress = make([]*knownAddress, 0) addrs = append(addrs, ka)
for _, ka := range newBucket {
array = append(array, ka)
}
__addrNew[i] = array
} }
aJSON := &addrBookJSON{ aJSON := &addrBookJSON{
Key: a.key, Key: a.key,
AddrNew: __addrNew, Addrs: addrs,
AddrOld: a.addrOld,
NumOld: a.nOld,
NumNew: a.nNew,
} }
w, err := os.Create(filePath) w, err := os.Create(filePath)
@ -305,13 +321,11 @@ func (a *AddrBook) loadFromFile(filePath string) {
} }
// Load addrBookJSON{} // Load addrBookJSON{}
r, err := os.Open(filePath) r, err := os.Open(filePath)
if err != nil { if err != nil {
panic(fmt.Errorf("%s error opening file: %v", filePath, err)) panic(fmt.Errorf("%s error opening file: %v", filePath, err))
} }
defer r.Close() defer r.Close()
aJSON := &addrBookJSON{} aJSON := &addrBookJSON{}
dec := json.NewDecoder(r) dec := json.NewDecoder(r)
err = dec.Decode(aJSON) err = dec.Decode(aJSON)
@ -319,27 +333,22 @@ func (a *AddrBook) loadFromFile(filePath string) {
panic(fmt.Errorf("error reading %s: %v", filePath, err)) panic(fmt.Errorf("error reading %s: %v", filePath, err))
} }
// Now we need to restore the fields // Restore all the fields...
// Restore the key // Restore the key
copy(a.key[:], aJSON.Key[:]) copy(a.key[:], aJSON.Key[:])
// Restore .addrNew // Restore .addrNew & .addrOld
for i, newBucket := range aJSON.AddrNew { for _, ka := range aJSON.Addrs {
for _, ka := range newBucket { for _, bucketIndex := range ka.Buckets {
a.addrNew[i][ka.Addr.String()] = ka bucket := a.getBucket(ka.BucketType, bucketIndex)
a.addrIndex[ka.Addr.String()] = ka bucket[ka.Addr.String()] = ka
}
a.addrLookup[ka.Addr.String()] = ka
if ka.BucketType == bucketTypeNew {
a.nNew++
} else {
a.nOld++
} }
} }
// Restore .addrOld
for i, oldBucket := range aJSON.AddrOld {
copy(a.addrOld[i], oldBucket)
for _, ka := range oldBucket {
a.addrIndex[ka.Addr.String()] = ka
}
}
// Restore simple fields
a.nNew = aJSON.NumNew
a.nOld = aJSON.NumOld
} }
/* Private methods */ /* Private methods */
@ -361,160 +370,220 @@ out:
log.Trace("Address handler done") log.Trace("Address handler done")
} }
func (a *AddrBook) addAddress(addr, src *NetAddress) { func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
if !addr.Routable() { switch bucketType {
return case bucketTypeNew:
return a.addrNew[bucketIdx]
case bucketTypeOld:
return a.addrOld[bucketIdx]
default:
panic("Should not happen")
}
}
// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
// NOTE: currently it always returns true.
func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
// Sanity check
if ka.isOld() {
panic("Cannot add address already in old bucket to a new bucket")
} }
key := addr.String() key := ka.Addr.String()
ka := a.addrIndex[key] bucket := a.getBucket(bucketTypeNew, bucketIdx)
// Already exists?
if _, ok := bucket[key]; ok {
return true
}
// Enforce max addresses.
if len(bucket) > newBucketSize {
log.Tracef("new bucket is full, expiring old ")
a.expireNew(bucketIdx)
}
// Add to bucket.
bucket[key] = ka
if ka.addBucketRef(bucketIdx) == 1 {
a.nNew++
}
// Ensure in addrLookup
a.addrLookup[key] = ka
return true
}
// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
// Sanity check
if ka.isNew() {
panic("Cannot add new address to old bucket")
}
if len(ka.Buckets) != 0 {
panic("Cannot add already old address to another old bucket")
}
key := ka.Addr.String()
bucket := a.getBucket(bucketTypeNew, bucketIdx)
// Already exists?
if _, ok := bucket[key]; ok {
return true
}
// Enforce max addresses.
if len(bucket) > oldBucketSize {
return false
}
// Add to bucket.
bucket[key] = ka
if ka.addBucketRef(bucketIdx) == 1 {
a.nOld++
}
// Ensure in addrLookup
a.addrLookup[key] = ka
return true
}
func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
if ka.BucketType != bucketType {
panic("Bucket type mismatch")
}
bucket := a.getBucket(bucketType, bucketIdx)
delete(bucket, ka.Addr.String())
if ka.removeBucketRef(bucketIdx) == 0 {
if bucketType == bucketTypeNew {
a.nNew--
} else {
a.nOld--
}
delete(a.addrLookup, ka.Addr.String())
}
}
func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
for _, bucketIdx := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIdx)
delete(bucket, ka.Addr.String())
}
ka.Buckets = nil
if ka.BucketType == bucketTypeNew {
a.nNew--
} else {
a.nOld--
}
delete(a.addrLookup, ka.Addr.String())
}
func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
bucket := a.getBucket(bucketType, bucketIdx)
var oldest *knownAddress
for _, ka := range bucket {
if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
oldest = ka
}
}
return oldest
}
func (a *AddrBook) addAddress(addr, src *NetAddress) {
if !addr.Routable() {
panic("Cannot add non-routable address")
}
ka := a.addrLookup[addr.String()]
if ka != nil { if ka != nil {
// Already added // Already old.
if ka.OldBucket != -1 { if ka.isOld() {
return return
} }
if ka.NewRefs == newBucketsPerAddress { // Already in max new buckets.
if len(ka.Buckets) == maxNewBucketsPerAddress {
return return
} }
// The more entries we have, the less likely we are to add more. // The more entries we have, the less likely we are to add more.
factor := int32(2 * ka.NewRefs) factor := int32(2 * len(ka.Buckets))
if a.rand.Int31n(factor) != 0 { if a.rand.Int31n(factor) != 0 {
return return
} }
} else { } else {
ka = newKnownAddress(addr, src) ka = newKnownAddress(addr, src)
a.addrIndex[key] = ka
a.nNew++
} }
bucket := a.getNewBucket(addr, src) bucket := a.calcNewBucket(addr, src)
a.addToNewBucket(ka, bucket)
// Already exists? log.Tracef("Added new address %s for a total of %d addresses", addr, a.size())
if _, ok := a.addrNew[bucket][key]; ok {
return
}
// Enforce max addresses.
if len(a.addrNew[bucket]) > newBucketSize {
log.Tracef("new bucket is full, expiring old ")
a.expireNew(bucket)
}
// Add to new bucket.
ka.NewRefs++
a.addrNew[bucket][key] = ka
log.Tracef("Added new address %s for a total of %d addresses", addr, a.nOld+a.nNew)
} }
// Make space in the new buckets by expiring the really bad entries. // Make space in the new buckets by expiring the really bad entries.
// If no bad entries are available we look at a few and remove the oldest. // If no bad entries are available we remove the oldest.
func (a *AddrBook) expireNew(bucket int) { func (a *AddrBook) expireNew(bucketIdx int) {
var oldest *knownAddress for key, ka := range a.addrNew[bucketIdx] {
for k, v := range a.addrNew[bucket] {
// If an entry is bad, throw it away // If an entry is bad, throw it away
if v.IsBad() { if ka.IsBad() {
log.Tracef("expiring bad address %v", k) log.Tracef("expiring bad address %v", key)
delete(a.addrNew[bucket], k) a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
v.NewRefs--
if v.NewRefs == 0 {
a.nNew--
delete(a.addrIndex, k)
}
return return
} }
// or, keep track of the oldest entry
if oldest == nil {
oldest = v
} else if v.LastAttempt.Before(oldest.LastAttempt.Time) {
oldest = v
}
} }
// If we haven't thrown out a bad entry, throw out the oldest entry // If we haven't thrown out a bad entry, throw out the oldest entry
if oldest != nil { oldest := a.pickOldest(bucketTypeNew, bucketIdx)
key := oldest.Addr.String() a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
log.Tracef("expiring oldest address %v", key)
delete(a.addrNew[bucket], key)
oldest.NewRefs--
if oldest.NewRefs == 0 {
a.nNew--
delete(a.addrIndex, key)
}
}
} }
// Promotes an address from new to old.
func (a *AddrBook) moveToOld(ka *knownAddress) { func (a *AddrBook) moveToOld(ka *knownAddress) {
// Remove from all new buckets. // Sanity check
// Remember one of those new buckets. if ka.isOld() {
addrKey := ka.Addr.String() panic("Cannot promote address that is already old")
freedBucket := -1 }
for i := range a.addrNew { if len(ka.Buckets) == 0 {
// we check for existance so we can record the first one panic("Cannot promote address that isn't in any new buckets")
if _, ok := a.addrNew[i][addrKey]; ok { }
delete(a.addrNew[i], addrKey)
ka.NewRefs-- // Remember one of the buckets in which ka is in.
if freedBucket == -1 { freedBucket := ka.Buckets[0]
freedBucket = i // Remove from all (new) buckets.
a.removeFromAllBuckets(ka)
// It's officially old now.
ka.BucketType = bucketTypeOld
// Try to add it to its oldBucket destination.
oldBucketIdx := a.calcOldBucket(ka.Addr)
added := a.addToOldBucket(ka, oldBucketIdx)
if !added {
// No room, must evict something
oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
// Find new bucket to put oldest in
newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
added := a.addToNewBucket(oldest, newBucketIdx)
// No space in newBucket either, just put it in freedBucket from above.
if !added {
added := a.addToNewBucket(oldest, freedBucket)
if !added {
log.Warnf("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket)
} }
} }
} // Finally, add to bucket again.
a.nNew-- added = a.addToOldBucket(ka, oldBucketIdx)
if freedBucket == -1 { if !added {
panic("Expected to find addr in at least one new bucket") log.Warnf("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx)
}
oldBucket := a.getOldBucket(ka.Addr)
// If room in oldBucket, put it in.
if len(a.addrOld[oldBucket]) < oldBucketSize {
ka.OldBucket = Int16(oldBucket)
a.addrOld[oldBucket] = append(a.addrOld[oldBucket], ka)
a.nOld++
return
}
// No room, we have to evict something else.
rmkaIndex := a.pickOld(oldBucket)
rmka := a.addrOld[oldBucket][rmkaIndex]
// Find a new bucket to put rmka in.
newBucket := a.getNewBucket(rmka.Addr, rmka.Src)
if len(a.addrNew[newBucket]) >= newBucketSize {
newBucket = freedBucket
}
// Replace with ka in list.
ka.OldBucket = Int16(oldBucket)
a.addrOld[oldBucket][rmkaIndex] = ka
rmka.OldBucket = -1
// Put rmka into new bucket
rmkey := rmka.Addr.String()
log.Tracef("Replacing %s with %s in old", rmkey, addrKey)
a.addrNew[newBucket][rmkey] = rmka
rmka.NewRefs++
a.nNew++
}
// Returns the index in old bucket of oldest entry.
func (a *AddrBook) pickOld(bucket int) int {
var oldest *knownAddress
var oldestIndex int
for i, ka := range a.addrOld[bucket] {
if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt.Time) {
oldest = ka
oldestIndex = i
} }
} }
return oldestIndex
} }
// doublesha256(key + sourcegroup + // doublesha256(key + sourcegroup +
// int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes // int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes
func (a *AddrBook) getNewBucket(addr, src *NetAddress) int { func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
data1 := []byte{} data1 := []byte{}
data1 = append(data1, a.key[:]...) data1 = append(data1, a.key[:]...)
data1 = append(data1, []byte(groupKey(addr))...) data1 = append(data1, []byte(groupKey(addr))...)
@ -534,7 +603,7 @@ func (a *AddrBook) getNewBucket(addr, src *NetAddress) int {
} }
// doublesha256(key + group + truncate_to_64bits(doublesha256(key + addr))%buckets_per_group) % num_buckets // doublesha256(key + group + truncate_to_64bits(doublesha256(key + addr))%buckets_per_group) % num_buckets
func (a *AddrBook) getOldBucket(addr *NetAddress) int { func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
data1 := []byte{} data1 := []byte{}
data1 = append(data1, a.key[:]...) data1 = append(data1, a.key[:]...)
data1 = append(data1, []byte(addr.String())...) data1 = append(data1, []byte(addr.String())...)
@ -601,6 +670,8 @@ func groupKey(na *NetAddress) string {
return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
} }
//-----------------------------------------------------------------------------
/* /*
knownAddress knownAddress
@ -610,59 +681,69 @@ func groupKey(na *NetAddress) string {
type knownAddress struct { type knownAddress struct {
Addr *NetAddress Addr *NetAddress
Src *NetAddress Src *NetAddress
Attempts UInt32 Attempts uint32
LastAttempt Time LastAttempt time.Time
LastSuccess Time LastSuccess time.Time
NewRefs UInt16 BucketType byte
OldBucket Int16 Buckets []int
} }
func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress {
return &knownAddress{ return &knownAddress{
Addr: addr, Addr: addr,
Src: src, Src: src,
OldBucket: -1,
LastAttempt: Time{time.Now()},
Attempts: 0, Attempts: 0,
LastAttempt: time.Now(),
BucketType: bucketTypeNew,
Buckets: nil,
} }
} }
func readKnownAddress(r io.Reader) *knownAddress { func (ka *knownAddress) isOld() bool {
return &knownAddress{ return ka.BucketType == bucketTypeOld
Addr: ReadNetAddress(r),
Src: ReadNetAddress(r),
Attempts: ReadUInt32(r),
LastAttempt: ReadTime(r),
LastSuccess: ReadTime(r),
NewRefs: ReadUInt16(r),
OldBucket: ReadInt16(r),
}
} }
func (ka *knownAddress) WriteTo(w io.Writer) (n int64, err error) { func (ka *knownAddress) isNew() bool {
n, err = WriteOnto(ka.Addr, w, n, err) return ka.BucketType == bucketTypeNew
n, err = WriteOnto(ka.Src, w, n, err)
n, err = WriteOnto(ka.Attempts, w, n, err)
n, err = WriteOnto(ka.LastAttempt, w, n, err)
n, err = WriteOnto(ka.LastSuccess, w, n, err)
n, err = WriteOnto(ka.NewRefs, w, n, err)
n, err = WriteOnto(ka.OldBucket, w, n, err)
return
} }
func (ka *knownAddress) MarkAttempt() { func (ka *knownAddress) markAttempt() {
now := Time{time.Now()} now := time.Now()
ka.LastAttempt = now ka.LastAttempt = now
ka.Attempts += 1 ka.Attempts += 1
} }
func (ka *knownAddress) MarkGood() { func (ka *knownAddress) markGood() {
now := Time{time.Now()} now := time.Now()
ka.LastAttempt = now ka.LastAttempt = now
ka.Attempts = 0 ka.Attempts = 0
ka.LastSuccess = now ka.LastSuccess = now
} }
func (ka *knownAddress) addBucketRef(bucketIdx int) int {
for _, bucket := range ka.Buckets {
if bucket == bucketIdx {
panic("Bucket already exists in ka.Buckets")
}
}
ka.Buckets = append(ka.Buckets, bucketIdx)
return len(ka.Buckets)
}
func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
buckets := []int{}
for _, bucket := range ka.Buckets {
if bucket != bucketIdx {
buckets = append(buckets, bucket)
}
}
if len(buckets) != len(ka.Buckets)-1 {
panic("bucketIdx not found in ka.Buckets")
}
ka.Buckets = buckets
return len(ka.Buckets)
}
/* /*
An address is bad if the address in question has not been tried in the last An address is bad if the address in question has not been tried in the last
minute and meets one of the following criteria: minute and meets one of the following criteria:

View File

@ -96,7 +96,7 @@ func TestSaveAddresses(t *testing.T) {
for _, addrSrc := range randAddrs { for _, addrSrc := range randAddrs {
addr := addrSrc.addr addr := addrSrc.addr
src := addrSrc.src src := addrSrc.src
ka := book.addrIndex[addr.String()] ka := book.addrLookup[addr.String()]
if ka == nil { if ka == nil {
t.Fatalf("Expected to find KnownAddress %v but wasn't there.", addr) t.Fatalf("Expected to find KnownAddress %v but wasn't there.", addr)
} }

View File

@ -65,8 +65,8 @@ func NewConnection(conn net.Conn) *Connection {
// .Start() begins multiplexing packets to and from "channels". // .Start() begins multiplexing packets to and from "channels".
// If an error occurs, the recovered reason is passed to "onError". // If an error occurs, the recovered reason is passed to "onError".
func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) { func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) {
log.Debugf("Starting %v", c)
if atomic.CompareAndSwapUint32(&c.started, 0, 1) { if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
log.Debugf("Starting %v", c)
c.channels = channels c.channels = channels
c.onError = onError c.onError = onError
go c.sendHandler() go c.sendHandler()

View File

@ -5,11 +5,13 @@
package p2p package p2p
import ( import (
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"io" "io"
"net" "net"
"strconv" "strconv"
"time"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
) )
/* NetAddress */ /* NetAddress */

View File

@ -23,6 +23,7 @@ type Switch struct {
pktRecvQueues map[string]chan *InboundPacket pktRecvQueues map[string]chan *InboundPacket
peers *PeerSet peers *PeerSet
quit chan struct{} quit chan struct{}
started uint32
stopped uint32 stopped uint32
} }
@ -50,11 +51,14 @@ func NewSwitch(channels []ChannelDescriptor) *Switch {
} }
func (s *Switch) Start() { func (s *Switch) Start() {
if atomic.CompareAndSwapUint32(&s.started, 0, 1) {
log.Infof("Starting switch")
}
} }
func (s *Switch) Stop() { func (s *Switch) Stop() {
log.Infof("Stopping switch")
if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
log.Infof("Stopping switch")
close(s.quit) close(s.quit)
// stop each peer. // stop each peer.
for _, peer := range s.peers.List() { for _, peer := range s.peers.List() {