From 0b1265dc105f426c792c64cbfb80c1aead197cbb Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 10 Jul 2014 22:14:23 -0700 Subject: [PATCH] . --- README.md | 5 +- common/math.go | 115 +++++++++++++++++++++++++++++++++++++++++++++ main.go | 66 +++++++++++++++++++++----- p2p/addrbook.go | 14 +++--- p2p/peer.go | 8 ++-- p2p/peer_set.go | 23 +++++++++ p2p/switch.go | 22 ++++++--- p2p/switch_test.go | 8 ++-- 8 files changed, 226 insertions(+), 35 deletions(-) create mode 100644 common/math.go diff --git a/README.md b/README.md index cf614c1e..747dbeb3 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,8 @@ TenderMint - proof of concept ### Status -* Implement basic peer exchange -* Implemented the basics of peer/* +* Node & testnet *now* +* PEX peer exchange *complete* +* p2p/* *complete* * Ed25519 bindings *complete* * merkle/* *complete* diff --git a/common/math.go b/common/math.go new file mode 100644 index 00000000..cc9c31ad --- /dev/null +++ b/common/math.go @@ -0,0 +1,115 @@ +package common + +func MaxInt8(a, b int8) int8 { + if a > b { + return a + } + return b +} + +func MaxUint8(a, b uint8) uint8 { + if a > b { + return a + } + return b +} + +func MaxInt16(a, b int16) int16 { + if a > b { + return a + } + return b +} + +func MaxUint16(a, b uint16) uint16 { + if a > b { + return a + } + return b +} + +func MaxInt32(a, b int32) int32 { + if a > b { + return a + } + return b +} + +func MaxUint32(a, b uint32) uint32 { + if a > b { + return a + } + return b +} + +func MaxInt(a, b int) int { + if a > b { + return a + } + return b +} + +func MaxUint(a, b uint) uint { + if a > b { + return a + } + return b +} + +//----------------------------------------------------------------------------- + +func MinInt8(a, b int8) int8 { + if a < b { + return a + } + return b +} + +func MinUint8(a, b uint8) uint8 { + if a < b { + return a + } + return b +} + +func MinInt16(a, b int16) int16 { + if a < b { + return a + } + return b +} + +func MinUint16(a, b uint16) uint16 { + if a < b { + return a + } + return b +} + +func MinInt32(a, b int32) int32 { + if a < b { + return a + } + return b +} + +func MinUint32(a, b uint32) uint32 { + if a < b { + return a + } + return b +} + +func MinInt(a, b int) int { + if a < b { + return a + } + return b +} + +func MinUint(a, b uint) uint { + if a < b { + return a + } + return b +} diff --git a/main.go b/main.go index 18249644..f0886862 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,9 @@ package main import ( "os" "os/signal" + "time" + . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" ) @@ -18,7 +20,7 @@ const ( type Node struct { sw *p2p.Switch - book *p2p.AddressBook + book *p2p.AddrBook quit chan struct{} dialing *CMap } @@ -50,7 +52,7 @@ func NewNode() *Node { sw := p2p.NewSwitch(chDescs) book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") - return &New{ + return &Node{ sw: sw, book: book, quit: make(chan struct{}, 0), @@ -59,20 +61,21 @@ func NewNode() *Node { } func (n *Node) Start() { + log.Infof("Starting node") n.sw.Start() n.book.Start() - go p2p.PexHandler(sw, book) - go n.ensurePeersHandler(sw, book) + go p2p.PexHandler(n.sw, n.book) + go n.ensurePeersHandler() } -func (n *Node) initPeer(peer *Peer) { - if peer.IsOutgoing() { +func (n *Node) initPeer(peer *p2p.Peer) { + if peer.IsOutbound() { // TODO: initiate PEX } } // Add a Listener to accept incoming peer connections. -func (n *Node) AddListener(l Listener) { +func (n *Node) AddListener(l p2p.Listener) { go func() { for { inConn, ok := <-l.Connections() @@ -92,18 +95,56 @@ func (n *Node) AddListener(l Listener) { // Ensures that sufficient peers are connected. func (n *Node) ensurePeers() { - numPeers := len(n.sw.Peers()) + numPeers := n.sw.NumOutboundPeers() numDialing := n.dialing.Size() - numToDial = minNumPeers - (numPeers + numDialing) + numToDial := minNumPeers - (numPeers + numDialing) if numToDial <= 0 { return } for i := 0; i < numToDial; i++ { - // XXX + newBias := MinInt(numPeers, 8)*10 + 10 + var picked *p2p.NetAddress + // Try to fetch a new peer 3 times. + // This caps the maximum number of tries to 3 * numToDial. + for j := 0; i < 3; j++ { + picked = n.book.PickAddress(newBias) + if picked == nil { + log.Infof("Empty addrbook.") + return + } + if n.sw.Peers().Has(picked) { + continue + } else { + break + } + } + if picked == nil { + continue + } + n.dialing.Set(picked.String(), picked) + n.book.MarkAttempt(picked) + go func() { + log.Infof("Dialing addr: %v", picked) + conn, err := picked.DialTimeout(peerDialTimeoutSeconds * time.Second) + n.dialing.Delete(picked.String()) + if err != nil { + // ignore error. + return + } + peer, err := n.sw.AddPeerWithConnection(conn, true) + if err != nil { + log.Warnf("Error trying to add new outbound peer connection:%v", err) + return + } + n.initPeer(peer) + }() } } func (n *Node) ensurePeersHandler() { + // fire once immediately. + n.ensurePeers() + // fire periodically timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second) FOR_LOOP: for { @@ -131,7 +172,8 @@ func main() { n := NewNode() l := p2p.NewDefaultListener("tcp", ":8001") - n.AddListener() + n.AddListener(l) + n.Start() if false { // TODO remove @@ -141,7 +183,7 @@ func main() { log.Infof("Error connecting to it: %v", err) return } - peer, err := sw.AddPeerWithConnection(conn, true) + peer, err := n.sw.AddPeerWithConnection(conn, true) if err != nil { log.Infof("Error adding peer with connection: %v", err) return diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 2cdb9683..06642753 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -123,7 +123,7 @@ func (a *AddrBook) init() { func (a *AddrBook) Start() { if atomic.CompareAndSwapUint32(&a.started, 0, 1) { - log.Trace("Starting address manager") + log.Infof("Starting address manager") a.loadFromFile(a.filePath) a.wg.Add(1) go a.saveHandler() @@ -367,7 +367,7 @@ out: dumpAddressTicker.Stop() a.saveToFile(a.filePath) a.wg.Done() - log.Trace("Address handler done") + log.Info("Address handler done") } func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { @@ -399,7 +399,7 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Enforce max addresses. if len(bucket) > newBucketSize { - log.Tracef("new bucket is full, expiring old ") + log.Infof("new bucket is full, expiring old ") a.expireNew(bucketIdx) } @@ -519,7 +519,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { bucket := a.calcNewBucket(addr, src) a.addToNewBucket(ka, bucket) - log.Tracef("Added new address %s for a total of %d addresses", addr, a.size()) + log.Infof("Added new address %s for a total of %d addresses", addr, a.size()) } // Make space in the new buckets by expiring the really bad entries. @@ -527,8 +527,8 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { func (a *AddrBook) expireNew(bucketIdx int) { for key, ka := range a.addrNew[bucketIdx] { // If an entry is bad, throw it away - if ka.IsBad() { - log.Tracef("expiring bad address %v", key) + if ka.isBad() { + log.Infof("expiring bad address %v", key) a.removeFromBucket(ka, bucketTypeNew, bucketIdx) return } @@ -756,7 +756,7 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int { All addresses that meet these criteria are assumed to be worthless and not worth keeping hold of. */ -func (ka *knownAddress) IsBad() bool { +func (ka *knownAddress) isBad() bool { // Has been attempted in the last minute --> good if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) { return false diff --git a/p2p/peer.go b/p2p/peer.go index 039662f3..2b7395e3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -12,7 +12,7 @@ import ( /* Peer */ type Peer struct { - outgoing bool + outbound bool conn *Connection channels map[string]*Channel quit chan struct{} @@ -55,8 +55,8 @@ func (p *Peer) stop() { } } -func (p *Peer) IsOutgoing() bool { - return p.outgoing +func (p *Peer) IsOutbound() bool { + return p.outbound } func (p *Peer) LocalAddress() *NetAddress { @@ -96,7 +96,7 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { } func (p *Peer) String() string { - return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) + return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outbound) } // sendHandler pulls from a channel and pushes to the connection. diff --git a/p2p/peer_set.go b/p2p/peer_set.go index ade9abca..e8905ce0 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -4,6 +4,16 @@ import ( "sync" ) +/* +ReadOnlyPeerSet has a subset of the methods of PeerSet. +*/ +type ReadOnlyPeerSet interface { + Has(addr *NetAddress) bool + List() []*Peer +} + +//----------------------------------------------------------------------------- + /* PeerSet is a special structure for keeping a table of peers. Iteration over the peers is super fast and thread-safe. @@ -41,6 +51,13 @@ func (ps *PeerSet) Add(peer *Peer) bool { return true } +func (ps *PeerSet) Has(addr *NetAddress) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + _, ok := ps.lookup[addr.String()] + return ok +} + func (ps *PeerSet) Remove(peer *Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -69,6 +86,12 @@ func (ps *PeerSet) Remove(peer *Peer) { delete(ps.lookup, addr) } +func (ps *PeerSet) Size() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return len(ps.list) +} + // threadsafe list of peers. func (ps *PeerSet) List() []*Peer { ps.mtx.Lock() diff --git a/p2p/switch.go b/p2p/switch.go index 14af7833..7e49a933 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -12,7 +12,7 @@ All communication amongst peers are multiplexed by "channels". (Not the same as Go "channels") To send a message, encapsulate it into a "Packet" and send it to each peer. -You can find all connected and active peers by iterating over ".Peers()". +You can find all connected and active peers by iterating over ".Peers().List()". ".Broadcast()" is provided for convenience, but by iterating over the peers manually the caller can decide which subset receives a message. @@ -69,19 +69,19 @@ func (s *Switch) Stop() { } } -func (s *Switch) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { +func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, error) { if atomic.LoadUint32(&s.stopped) == 1 { return nil, ErrSwitchStopped } - log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing) + log.Infof("Adding peer with connection: %v, outbound: %v", conn, outbound) // Create channels for peer channels := map[string]*Channel{} for _, chDesc := range s.channels { channels[chDesc.Name] = newChannel(chDesc) } peer := newPeer(conn, channels) - peer.outgoing = outgoing + peer.outbound = outbound err := s.addPeer(peer) if err != nil { return nil, err @@ -133,8 +133,18 @@ func (s *Switch) Receive(chName string) *InboundPacket { } } -func (s *Switch) Peers() []*Peer { - return s.peers.List() +func (s *Switch) NumOutboundPeers() (count int) { + peers := s.peers.List() + for _, peer := range peers { + if peer.outbound { + count++ + } + } + return +} + +func (s *Switch) Peers() ReadOnlyPeerSet { + return s.peers } // Disconnect from a peer due to external error. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index fc50c84e..57933dde 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -59,11 +59,11 @@ func TestSwitches(t *testing.T) { defer s2.Stop() // Lets send a message from s1 to s2. - if len(s1.Peers()) != 1 { - t.Errorf("Expected exactly 1 peer in s1, got %v", len(s1.Peers())) + if s1.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size()) } - if len(s2.Peers()) != 1 { - t.Errorf("Expected exactly 1 peer in s2, got %v", len(s2.Peers())) + if s2.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) } // Broadcast a message on ch1