tendermint/peer/client.go

172 lines
4.2 KiB
Go
Raw Normal View History

2014-06-18 20:48:32 -07:00
package peer
import (
	"errors"
	"io"
	"sync"
	"sync/atomic"

	. "github.com/tendermint/tendermint/binary"
	"github.com/tendermint/tendermint/merkle"
)
2014-06-24 17:28:40 -07:00
/* Client
A client is half of a p2p system.
It can reach out to the network and establish connections with servers.
A client doesn't listen for incoming connections -- that's done by the server.
newPeerCb is a factory method for generating new peers from new *Connections.
newPeerCb(nil) must return a prototypical peer that represents the self "peer".
XXX what about peer disconnects?
*/
2014-06-18 20:48:32 -07:00
type Client struct {
addrBook AddrBook
targetNumPeers int
2014-06-24 17:28:40 -07:00
newPeerCb func(*Connection) *Peer
self *Peer
inQueues map[String]chan *InboundMsg
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
mtx sync.Mutex
2014-06-18 20:48:32 -07:00
peers merkle.Tree // addr -> *Peer
2014-06-24 17:28:40 -07:00
quit chan struct{}
stopped uint32
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
// Sentinel errors returned by Client methods.
var (
	// CLIENT_DUPLICATE_PEER_ERROR is returned by addPeer when a peer is
	// already registered under the same remote address.
	CLIENT_DUPLICATE_PEER_ERROR error = errors.New("Duplicate peer")

	// CLIENT_STOPPED_ERROR is returned when a peer is added to a client whose
	// stopped flag is set.
	CLIENT_STOPPED_ERROR error = errors.New("Client already stopped")
)
func NewClient(newPeerCb func(*Connect) *Peer) *Client {
self := newPeerCb(nil)
if self == nil {
Panicf("newPeerCb(nil) must return a prototypical peer for self")
}
inQueues := make(map[String]chan *InboundMsg)
for chName, channel := peer.channels {
inQueues[chName] = make(chan *InboundMsg)
}
2014-06-18 20:48:32 -07:00
c := &Client{
2014-06-24 17:28:40 -07:00
newPeerCb: newPeerCb,
2014-06-18 20:48:32 -07:00
peers: merkle.NewIAVLTree(nil),
2014-06-24 17:28:40 -07:00
self: self,
inQueues: inQueues,
2014-06-18 20:48:32 -07:00
}
return c
}
func (c *Client) Stop() {
2014-06-24 17:28:40 -07:00
// lock
c.mtx.Lock()
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
close(c.quit)
// stop each peer.
for peerValue := range c.peers.Values() {
peer := peerValue.(*Peer)
peer.Stop()
}
// empty tree.
c.peers = merkle.NewIAVLTree(nil)
}
c.mtx.Unlock()
// unlock
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
if atomic.LoadUint32(&c.stopped) == 1 { return nil, CLIENT_STOPPED_ERROR }
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
peer := c.newPeerCb(conn)
peer.outgoing = outgoing
err := c.addPeer(peer)
2014-06-18 20:48:32 -07:00
if err != nil { return nil, err }
2014-06-24 17:28:40 -07:00
go peer.Start(c.inQueues)
2014-06-18 20:48:32 -07:00
return peer, nil
}
2014-06-24 17:28:40 -07:00
func (c *Client) Broadcast(chName String, msg Msg) {
if atomic.LoadUint32(&c.stopped) == 1 { return }
2014-06-18 20:48:32 -07:00
for v := range c.peersCopy().Values() {
2014-06-24 17:28:40 -07:00
peer := v.(*Peer)
success := peer.TryQueueOut(chName , msg)
if !success {
// TODO: notify the peer
}
2014-06-18 20:48:32 -07:00
}
}
2014-06-24 17:28:40 -07:00
func (c *Client) PopMessage(chName String) *InboundMsg {
if atomic.LoadUint32(&c.stopped) == 1 { return nil }
channel := c.Channel(chName)
q := c.inQueues[chName]
if q == nil { Panicf("Expected inQueues[%f], found none", chName) }
for {
select {
case <-quit:
return nil
case msg := <-q:
// skip if known.
if channel.Filter().Has(msg) {
continue
}
return msg
}
2014-06-18 20:48:32 -07:00
}
}
2014-06-24 17:28:40 -07:00
// Updates self's filter for a channel & broadcasts it.
// TODO: maybe don't expose this
func (c *Client) UpdateFilter(chName String, filter Filter) {
if atomic.LoadUint32(&c.stopped) == 1 { return }
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
c.self.Channel(chName).UpdateFilter(filter)
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
c.Broadcast("", &NewFilterMsg{
Channel: chName,
Filter: filter,
})
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (c *Client) StopPeer(peer *Peer) {
// lock
c.mtx.Lock()
p, _ := c.peers.Remove(peer.RemoteAddress())
c.mtx.Unlock()
// unlock
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
if p != nil {
p.Stop()
}
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (c *Client) addPeer(peer *Peer) error {
addr := peer.RemoteAddress()
2014-06-18 20:48:32 -07:00
2014-06-24 17:28:40 -07:00
// lock & defer
c.mtx.Lock(); defer c.mtx.Unlock()
if c.stopped == 1 { return CLIENT_STOPPED_ERROR }
if !c.peers.Has(addr) {
c.peers.Put(addr, peer)
return nil
} else {
// ignore duplicate peer for addr.
log.Infof("Ignoring duplicate peer for addr %v", addr)
return CLIENT_DUPLICATE_PEER_ERROR
}
// unlock deferred
2014-06-18 20:48:32 -07:00
}
2014-06-24 17:28:40 -07:00
func (c *Client) peersCopy() merkle.Tree {
// lock & defer
c.mtx.Lock(); defer c.mtx.Unlock()
return c.peers.Copy()
// unlock deferred
2014-06-18 20:48:32 -07:00
}