diff --git a/common/cmap.go b/common/cmap.go new file mode 100644 index 00000000..384ca8c4 --- /dev/null +++ b/common/cmap.go @@ -0,0 +1,54 @@ +package common + +import "sync" + +/* +CMap is a threadsafe map +*/ +type CMap struct { + m map[string]interface{} + l sync.Mutex +} + +func NewCMap() *CMap { + return &CMap{ + m: make(map[string]interface{}, 0), + } +} + +func (cm *CMap) Set(key string, value interface{}) { + cm.l.Lock() + defer cm.l.Unlock() + cm.m[key] = value +} + +func (cm *CMap) Get(key string) interface{} { + cm.l.Lock() + defer cm.l.Unlock() + return cm.m[key] +} + +func (cm *CMap) Has(key string) bool { + cm.l.Lock() + defer cm.l.Unlock() + _, ok := cm.m[key] + return ok +} + +func (cm *CMap) Delete(key string) { + cm.l.Lock() + defer cm.l.Unlock() + delete(cm.m, key) +} + +func (cm *CMap) Size() int { + cm.l.Lock() + defer cm.l.Unlock() + return len(cm.m) +} + +func (cm *CMap) Clear() { + cm.l.Lock() + defer cm.l.Unlock() + cm.m = make(map[string]interface{}, 0) +} diff --git a/main.go b/main.go index 9136e61a..18249644 100644 --- a/main.go +++ b/main.go @@ -17,9 +17,10 @@ const ( ) type Node struct { - sw *p2p.Switch - book *p2p.AddressBook - quit chan struct{} + sw *p2p.Switch + book *p2p.AddressBook + quit chan struct{} + dialing *CMap } func NewNode() *Node { @@ -50,8 +51,10 @@ func NewNode() *Node { book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") return &New{ - sw: sw, - book: book, + sw: sw, + book: book, + quit: make(chan struct{}, 0), + dialing: NewCMap(), } } @@ -90,7 +93,12 @@ func (n *Node) AddListener(l Listener) { // Ensures that sufficient peers are connected. 
func (n *Node) ensurePeers() { numPeers := len(n.sw.Peers()) - if numPeers < minNumPeers { + numDialing := n.dialing.Size() + numToDial := minNumPeers - (numPeers + numDialing) + if numToDial <= 0 { + return + } + for i := 0; i < numToDial; i++ { // XXX } } diff --git a/p2p/addrbook.go b/p2p/addrbook.go index d1512048..2cdb9683 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -17,28 +17,31 @@ import ( "sync" "sync/atomic" "time" - - . "github.com/tendermint/tendermint/binary" ) /* AddrBook - concurrency safe peer address manager */ type AddrBook struct { filePath string - mtx sync.Mutex - rand *rand.Rand - key [32]byte - addrIndex map[string]*knownAddress // new & old - addrNew [newBucketCount]map[string]*knownAddress - addrOld [oldBucketCount][]*knownAddress - started int32 - shutdown int32 - wg sync.WaitGroup - quit chan struct{} - nOld int - nNew int + mtx sync.Mutex + rand *rand.Rand + key [32]byte + addrLookup map[string]*knownAddress // new & old + addrNew []map[string]*knownAddress + addrOld []map[string]*knownAddress + started uint32 + stopped uint32 + wg sync.WaitGroup + quit chan struct{} + nOld int + nNew int } +const ( + bucketTypeNew = 0x01 + bucketTypeOld = 0x02 +) + const ( // addresses under which the address manager will claim to need more addresses. needAddressThreshold = 1000 @@ -65,7 +68,7 @@ const ( newBucketsPerGroup = 32 // buckets a frequently seen new address may end up in. - newBucketsPerAddress = 4 + maxNewBucketsPerAddress = 4 // days before which we assume an address has vanished // if we have not seen it announced in that long. 
@@ -103,33 +106,36 @@ func NewAddrBook(filePath string) *AddrBook { // When modifying this, don't forget to update loadFromFile() func (a *AddrBook) init() { - a.addrIndex = make(map[string]*knownAddress) io.ReadFull(crand.Reader, a.key[:]) + // addr -> ka index + a.addrLookup = make(map[string]*knownAddress) + // New addr buckets + a.addrNew = make([]map[string]*knownAddress, newBucketCount) for i := range a.addrNew { a.addrNew[i] = make(map[string]*knownAddress) } + // Old addr buckets + a.addrOld = make([]map[string]*knownAddress, oldBucketCount) for i := range a.addrOld { - a.addrOld[i] = make([]*knownAddress, 0, oldBucketSize) + a.addrOld[i] = make(map[string]*knownAddress) } } func (a *AddrBook) Start() { - if atomic.AddInt32(&a.started, 1) != 1 { - return + if atomic.CompareAndSwapUint32(&a.started, 0, 1) { + log.Trace("Starting address manager") + a.loadFromFile(a.filePath) + a.wg.Add(1) + go a.saveHandler() } - log.Trace("Starting address manager") - a.loadFromFile(a.filePath) - a.wg.Add(1) - go a.saveHandler() } func (a *AddrBook) Stop() { - if atomic.AddInt32(&a.shutdown, 1) != 1 { - return + if atomic.CompareAndSwapUint32(&a.stopped, 0, 1) { + log.Infof("Stopping address manager") + close(a.quit) + a.wg.Wait() } - log.Infof("Address manager shutting down") - close(a.quit) - a.wg.Wait() } func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { @@ -153,11 +159,11 @@ func (a *AddrBook) size() int { } // Pick an address to connect to with new/old bias. -func (a *AddrBook) PickAddress(newBias int) *knownAddress { +func (a *AddrBook) PickAddress(newBias int) *NetAddress { a.mtx.Lock() defer a.mtx.Unlock() - if a.nOld == 0 && a.nNew == 0 { + if a.size() == 0 { return nil } if newBias > 100 { @@ -173,12 +179,19 @@ func (a *AddrBook) PickAddress(newBias int) *knownAddress { if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation { // pick random Old bucket. 
- var bucket []*knownAddress = nil + var bucket map[string]*knownAddress = nil for len(bucket) == 0 { bucket = a.addrOld[a.rand.Intn(len(a.addrOld))] } // pick a random ka from bucket. - return bucket[a.rand.Intn(len(bucket))] + randIndex := a.rand.Intn(len(bucket)) + for _, ka := range bucket { + randIndex-- + if randIndex < 0 { + return ka.Addr + } + } + panic("Should not happen") } else { // pick random New bucket. var bucket map[string]*knownAddress = nil @@ -190,7 +203,7 @@ func (a *AddrBook) PickAddress(newBias int) *knownAddress { for _, ka := range bucket { randIndex-- if randIndex == 0 { - return ka + return ka.Addr } } panic("Should not happen") @@ -201,12 +214,12 @@ func (a *AddrBook) PickAddress(newBias int) *knownAddress { func (a *AddrBook) MarkGood(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrIndex[addr.String()] + ka := a.addrLookup[addr.String()] if ka == nil { return } - ka.MarkGood() - if ka.OldBucket == -1 { + ka.markGood() + if ka.isNew() { a.moveToOld(ka) } } @@ -214,11 +227,23 @@ func (a *AddrBook) MarkGood(addr *NetAddress) { func (a *AddrBook) MarkAttempt(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrIndex[addr.String()] + ka := a.addrLookup[addr.String()] if ka == nil { return } - ka.MarkAttempt() + ka.markAttempt() +} + +func (a *AddrBook) MarkBad(addr *NetAddress) { + a.mtx.Lock() + defer a.mtx.Unlock() + ka := a.addrLookup[addr.String()] + if ka == nil { + return + } + // We currently just eject the address. + // In the future, consider blacklisting. 
+ a.removeFromAllBuckets(ka) } /* Peer exchange */ @@ -227,13 +252,14 @@ func (a *AddrBook) MarkAttempt(addr *NetAddress) { func (a *AddrBook) GetSelection() []*NetAddress { a.mtx.Lock() defer a.mtx.Unlock() + if a.size() == 0 { return nil } allAddr := make([]*NetAddress, a.size()) i := 0 - for _, v := range a.addrIndex { + for _, v := range a.addrLookup { allAddr[i] = v.Addr i++ } @@ -258,30 +284,20 @@ func (a *AddrBook) GetSelection() []*NetAddress { /* Loading & Saving */ type addrBookJSON struct { - Key [32]byte - AddrNew [newBucketCount][]*knownAddress - AddrOld [oldBucketCount][]*knownAddress - NumOld int - NumNew int + Key [32]byte + Addrs []*knownAddress } func (a *AddrBook) saveToFile(filePath string) { - // turn a.addrNew into an array like a.addrOld - __addrNew := [newBucketCount][]*knownAddress{} - for i, newBucket := range a.addrNew { - var array []*knownAddress = make([]*knownAddress, 0) - for _, ka := range newBucket { - array = append(array, ka) - } - __addrNew[i] = array + // Compile Addrs + addrs := []*knownAddress{} + for _, ka := range a.addrLookup { + addrs = append(addrs, ka) } aJSON := &addrBookJSON{ - Key: a.key, - AddrNew: __addrNew, - AddrOld: a.addrOld, - NumOld: a.nOld, - NumNew: a.nNew, + Key: a.key, + Addrs: addrs, } w, err := os.Create(filePath) @@ -305,13 +321,11 @@ func (a *AddrBook) loadFromFile(filePath string) { } // Load addrBookJSON{} - r, err := os.Open(filePath) if err != nil { panic(fmt.Errorf("%s error opening file: %v", filePath, err)) } defer r.Close() - aJSON := &addrBookJSON{} dec := json.NewDecoder(r) err = dec.Decode(aJSON) @@ -319,27 +333,22 @@ func (a *AddrBook) loadFromFile(filePath string) { panic(fmt.Errorf("error reading %s: %v", filePath, err)) } - // Now we need to restore the fields - + // Restore all the fields... 
// Restore the key copy(a.key[:], aJSON.Key[:]) - // Restore .addrNew - for i, newBucket := range aJSON.AddrNew { - for _, ka := range newBucket { - a.addrNew[i][ka.Addr.String()] = ka - a.addrIndex[ka.Addr.String()] = ka + // Restore .addrNew & .addrOld + for _, ka := range aJSON.Addrs { + for _, bucketIndex := range ka.Buckets { + bucket := a.getBucket(ka.BucketType, bucketIndex) + bucket[ka.Addr.String()] = ka + } + a.addrLookup[ka.Addr.String()] = ka + if ka.BucketType == bucketTypeNew { + a.nNew++ + } else { + a.nOld++ } } - // Restore .addrOld - for i, oldBucket := range aJSON.AddrOld { - copy(a.addrOld[i], oldBucket) - for _, ka := range oldBucket { - a.addrIndex[ka.Addr.String()] = ka - } - } - // Restore simple fields - a.nNew = aJSON.NumNew - a.nOld = aJSON.NumOld } /* Private methods */ @@ -361,160 +370,220 @@ out: log.Trace("Address handler done") } -func (a *AddrBook) addAddress(addr, src *NetAddress) { - if !addr.Routable() { - return +func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { + switch bucketType { + case bucketTypeNew: + return a.addrNew[bucketIdx] + case bucketTypeOld: + return a.addrOld[bucketIdx] + default: + panic("Should not happen") + } +} + +// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full. +// NOTE: currently it always returns true. +func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { + // Sanity check + if ka.isOld() { + panic("Cannot add address already in old bucket to a new bucket") } - key := addr.String() - ka := a.addrIndex[key] + key := ka.Addr.String() + bucket := a.getBucket(bucketTypeNew, bucketIdx) + + // Already exists? + if _, ok := bucket[key]; ok { + return true + } + + // Enforce max addresses. + if len(bucket) > newBucketSize { + log.Tracef("new bucket is full, expiring old ") + a.expireNew(bucketIdx) + } + + // Add to bucket. 
+ bucket[key] = ka + if ka.addBucketRef(bucketIdx) == 1 { + a.nNew++ + } + + // Ensure in addrLookup + a.addrLookup[key] = ka + + return true +} + +// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full. +func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { + // Sanity check + if ka.isNew() { + panic("Cannot add new address to old bucket") + } + if len(ka.Buckets) != 0 { + panic("Cannot add already old address to another old bucket") + } + + key := ka.Addr.String() + bucket := a.getBucket(bucketTypeOld, bucketIdx) + + // Already exists? + if _, ok := bucket[key]; ok { + return true + } + + // Enforce max addresses. + if len(bucket) > oldBucketSize { + return false + } + + // Add to bucket. + bucket[key] = ka + if ka.addBucketRef(bucketIdx) == 1 { + a.nOld++ + } + + // Ensure in addrLookup + a.addrLookup[key] = ka + + return true +} + +func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) { + if ka.BucketType != bucketType { + panic("Bucket type mismatch") + } + bucket := a.getBucket(bucketType, bucketIdx) + delete(bucket, ka.Addr.String()) + if ka.removeBucketRef(bucketIdx) == 0 { + if bucketType == bucketTypeNew { + a.nNew-- + } else { + a.nOld-- + } + delete(a.addrLookup, ka.Addr.String()) + } +} + +func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) { + for _, bucketIdx := range ka.Buckets { + bucket := a.getBucket(ka.BucketType, bucketIdx) + delete(bucket, ka.Addr.String()) + } + ka.Buckets = nil + if ka.BucketType == bucketTypeNew { + a.nNew-- + } else { + a.nOld-- + } + delete(a.addrLookup, ka.Addr.String()) +} + +func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { + bucket := a.getBucket(bucketType, bucketIdx) + var oldest *knownAddress + for _, ka := range bucket { + if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) { + oldest = ka + } + } + return oldest +} + +func (a *AddrBook) addAddress(addr, src *NetAddress) { + if !addr.Routable() { 
+ panic("Cannot add non-routable address") + } + + ka := a.addrLookup[addr.String()] if ka != nil { - // Already added - if ka.OldBucket != -1 { + // Already old. + if ka.isOld() { return } - if ka.NewRefs == newBucketsPerAddress { + // Already in max new buckets. + if len(ka.Buckets) == maxNewBucketsPerAddress { return } - // The more entries we have, the less likely we are to add more. - factor := int32(2 * ka.NewRefs) + factor := int32(2 * len(ka.Buckets)) if a.rand.Int31n(factor) != 0 { return } } else { ka = newKnownAddress(addr, src) - a.addrIndex[key] = ka - a.nNew++ } - bucket := a.getNewBucket(addr, src) + bucket := a.calcNewBucket(addr, src) + a.addToNewBucket(ka, bucket) - // Already exists? - if _, ok := a.addrNew[bucket][key]; ok { - return - } - - // Enforce max addresses. - if len(a.addrNew[bucket]) > newBucketSize { - log.Tracef("new bucket is full, expiring old ") - a.expireNew(bucket) - } - - // Add to new bucket. - ka.NewRefs++ - a.addrNew[bucket][key] = ka - - log.Tracef("Added new address %s for a total of %d addresses", addr, a.nOld+a.nNew) + log.Tracef("Added new address %s for a total of %d addresses", addr, a.size()) } // Make space in the new buckets by expiring the really bad entries. -// If no bad entries are available we look at a few and remove the oldest. -func (a *AddrBook) expireNew(bucket int) { - var oldest *knownAddress - for k, v := range a.addrNew[bucket] { +// If no bad entries are available we remove the oldest. 
+func (a *AddrBook) expireNew(bucketIdx int) { + for key, ka := range a.addrNew[bucketIdx] { // If an entry is bad, throw it away - if v.IsBad() { - log.Tracef("expiring bad address %v", k) - delete(a.addrNew[bucket], k) - v.NewRefs-- - if v.NewRefs == 0 { - a.nNew-- - delete(a.addrIndex, k) - } + if ka.IsBad() { + log.Tracef("expiring bad address %v", key) + a.removeFromBucket(ka, bucketTypeNew, bucketIdx) return } - // or, keep track of the oldest entry - if oldest == nil { - oldest = v - } else if v.LastAttempt.Before(oldest.LastAttempt.Time) { - oldest = v - } } // If we haven't thrown out a bad entry, throw out the oldest entry - if oldest != nil { - key := oldest.Addr.String() - log.Tracef("expiring oldest address %v", key) - delete(a.addrNew[bucket], key) - oldest.NewRefs-- - if oldest.NewRefs == 0 { - a.nNew-- - delete(a.addrIndex, key) - } - } + oldest := a.pickOldest(bucketTypeNew, bucketIdx) + a.removeFromBucket(oldest, bucketTypeNew, bucketIdx) } +// Promotes an address from new to old. func (a *AddrBook) moveToOld(ka *knownAddress) { - // Remove from all new buckets. - // Remember one of those new buckets. - addrKey := ka.Addr.String() - freedBucket := -1 - for i := range a.addrNew { - // we check for existance so we can record the first one - if _, ok := a.addrNew[i][addrKey]; ok { - delete(a.addrNew[i], addrKey) - ka.NewRefs-- - if freedBucket == -1 { - freedBucket = i + // Sanity check + if ka.isOld() { + panic("Cannot promote address that is already old") + } + if len(ka.Buckets) == 0 { + panic("Cannot promote address that isn't in any new buckets") + } + + // Remember one of the buckets in which ka is in. + freedBucket := ka.Buckets[0] + // Remove from all (new) buckets. + a.removeFromAllBuckets(ka) + // It's officially old now. + ka.BucketType = bucketTypeOld + + // Try to add it to its oldBucket destination. 
+ oldBucketIdx := a.calcOldBucket(ka.Addr) + added := a.addToOldBucket(ka, oldBucketIdx) + if !added { + // No room, must evict something + oldest := a.pickOldest(bucketTypeOld, oldBucketIdx) + a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx) + // Find new bucket to put oldest in + oldest.BucketType = bucketTypeNew; newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src) + added := a.addToNewBucket(oldest, newBucketIdx) + // No space in newBucket either, just put it in freedBucket from above. + if !added { + added := a.addToNewBucket(oldest, freedBucket) + if !added { + log.Warnf("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket) } } - } - a.nNew-- - if freedBucket == -1 { - panic("Expected to find addr in at least one new bucket") - } - - oldBucket := a.getOldBucket(ka.Addr) - - // If room in oldBucket, put it in. - if len(a.addrOld[oldBucket]) < oldBucketSize { - ka.OldBucket = Int16(oldBucket) - a.addrOld[oldBucket] = append(a.addrOld[oldBucket], ka) - a.nOld++ - return - } - - // No room, we have to evict something else. - rmkaIndex := a.pickOld(oldBucket) - rmka := a.addrOld[oldBucket][rmkaIndex] - - // Find a new bucket to put rmka in. - newBucket := a.getNewBucket(rmka.Addr, rmka.Src) - if len(a.addrNew[newBucket]) >= newBucketSize { - newBucket = freedBucket - } - - // Replace with ka in list. - ka.OldBucket = Int16(oldBucket) - a.addrOld[oldBucket][rmkaIndex] = ka - rmka.OldBucket = -1 - - // Put rmka into new bucket - rmkey := rmka.Addr.String() - log.Tracef("Replacing %s with %s in old", rmkey, addrKey) - a.addrNew[newBucket][rmkey] = rmka - rmka.NewRefs++ - a.nNew++ -} - -// Returns the index in old bucket of oldest entry. -func (a *AddrBook) pickOld(bucket int) int { - var oldest *knownAddress - var oldestIndex int - for i, ka := range a.addrOld[bucket] { - if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt.Time) { - oldest = ka - oldestIndex = i + // Finally, add to bucket again. 
+ added = a.addToOldBucket(ka, oldBucketIdx) + if !added { + log.Warnf("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx) } } - return oldestIndex } // doublesha256(key + sourcegroup + // int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes -func (a *AddrBook) getNewBucket(addr, src *NetAddress) int { +func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int { data1 := []byte{} data1 = append(data1, a.key[:]...) data1 = append(data1, []byte(groupKey(addr))...) @@ -534,7 +603,7 @@ func (a *AddrBook) getNewBucket(addr, src *NetAddress) int { } // doublesha256(key + group + truncate_to_64bits(doublesha256(key + addr))%buckets_per_group) % num_buckets -func (a *AddrBook) getOldBucket(addr *NetAddress) int { +func (a *AddrBook) calcOldBucket(addr *NetAddress) int { data1 := []byte{} data1 = append(data1, a.key[:]...) data1 = append(data1, []byte(addr.String())...) @@ -601,6 +670,8 @@ func groupKey(na *NetAddress) string { return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() } +//----------------------------------------------------------------------------- + /* knownAddress @@ -610,59 +681,69 @@ func groupKey(na *NetAddress) string { type knownAddress struct { Addr *NetAddress Src *NetAddress - Attempts UInt32 - LastAttempt Time - LastSuccess Time - NewRefs UInt16 - OldBucket Int16 + Attempts uint32 + LastAttempt time.Time + LastSuccess time.Time + BucketType byte + Buckets []int } func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { return &knownAddress{ Addr: addr, Src: src, - OldBucket: -1, - LastAttempt: Time{time.Now()}, Attempts: 0, + LastAttempt: time.Now(), + BucketType: bucketTypeNew, + Buckets: nil, } } -func readKnownAddress(r io.Reader) *knownAddress { - return &knownAddress{ - Addr: ReadNetAddress(r), - Src: ReadNetAddress(r), - Attempts: ReadUInt32(r), - LastAttempt: ReadTime(r), - LastSuccess: ReadTime(r), - NewRefs: ReadUInt16(r), - OldBucket: ReadInt16(r), - } +func 
(ka *knownAddress) isOld() bool { + return ka.BucketType == bucketTypeOld } -func (ka *knownAddress) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(ka.Addr, w, n, err) - n, err = WriteOnto(ka.Src, w, n, err) - n, err = WriteOnto(ka.Attempts, w, n, err) - n, err = WriteOnto(ka.LastAttempt, w, n, err) - n, err = WriteOnto(ka.LastSuccess, w, n, err) - n, err = WriteOnto(ka.NewRefs, w, n, err) - n, err = WriteOnto(ka.OldBucket, w, n, err) - return +func (ka *knownAddress) isNew() bool { + return ka.BucketType == bucketTypeNew } -func (ka *knownAddress) MarkAttempt() { - now := Time{time.Now()} +func (ka *knownAddress) markAttempt() { + now := time.Now() ka.LastAttempt = now ka.Attempts += 1 } -func (ka *knownAddress) MarkGood() { - now := Time{time.Now()} +func (ka *knownAddress) markGood() { + now := time.Now() ka.LastAttempt = now ka.Attempts = 0 ka.LastSuccess = now } +func (ka *knownAddress) addBucketRef(bucketIdx int) int { + for _, bucket := range ka.Buckets { + if bucket == bucketIdx { + panic("Bucket already exists in ka.Buckets") + } + } + ka.Buckets = append(ka.Buckets, bucketIdx) + return len(ka.Buckets) +} + +func (ka *knownAddress) removeBucketRef(bucketIdx int) int { + buckets := []int{} + for _, bucket := range ka.Buckets { + if bucket != bucketIdx { + buckets = append(buckets, bucket) + } + } + if len(buckets) != len(ka.Buckets)-1 { + panic("bucketIdx not found in ka.Buckets") + } + ka.Buckets = buckets + return len(ka.Buckets) +} + /* An address is bad if the address in question has not been tried in the last minute and meets one of the following criteria: diff --git a/p2p/addrbook_test.go b/p2p/addrbook_test.go index d60d2f24..50986452 100644 --- a/p2p/addrbook_test.go +++ b/p2p/addrbook_test.go @@ -96,7 +96,7 @@ func TestSaveAddresses(t *testing.T) { for _, addrSrc := range randAddrs { addr := addrSrc.addr src := addrSrc.src - ka := book.addrIndex[addr.String()] + ka := book.addrLookup[addr.String()] if ka == nil { 
t.Fatalf("Expected to find KnownAddress %v but wasn't there.", addr) } diff --git a/p2p/connection.go b/p2p/connection.go index f6944cdb..021bfd83 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -65,8 +65,8 @@ func NewConnection(conn net.Conn) *Connection { // .Start() begins multiplexing packets to and from "channels". // If an error occurs, the recovered reason is passed to "onError". func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) { - log.Debugf("Starting %v", c) if atomic.CompareAndSwapUint32(&c.started, 0, 1) { + log.Debugf("Starting %v", c) c.channels = channels c.onError = onError go c.sendHandler() diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 3c76ef61..02570ccd 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -5,11 +5,13 @@ package p2p import ( - . "github.com/tendermint/tendermint/binary" - . "github.com/tendermint/tendermint/common" "io" "net" "strconv" + "time" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" ) /* NetAddress */ diff --git a/p2p/switch.go b/p2p/switch.go index e0bd5837..14af7833 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -23,6 +23,7 @@ type Switch struct { pktRecvQueues map[string]chan *InboundPacket peers *PeerSet quit chan struct{} + started uint32 stopped uint32 } @@ -50,11 +51,14 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { } func (s *Switch) Start() { + if atomic.CompareAndSwapUint32(&s.started, 0, 1) { + log.Infof("Starting switch") + } } func (s *Switch) Stop() { - log.Infof("Stopping switch") if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + log.Infof("Stopping switch") close(s.quit) // stop each peer. for _, peer := range s.peers.List() {