first client/server connection test passes.

This commit is contained in:
Jae Kwon
2014-06-30 16:53:04 -07:00
parent e52166017d
commit a714d12085
9 changed files with 217 additions and 209 deletions

View File

@ -33,18 +33,17 @@ func (self ByteSlice) WriteTo(w io.Writer) (n int64, err error) {
return int64(n_+4), err return int64(n_+4), err
} }
func ReadByteSlice(r io.Reader) ByteSlice { func ReadByteSliceSafe(r io.Reader) (ByteSlice, error) {
length := int(ReadUInt32(r)) length, err := ReadUInt32Safe(r)
bytes := make([]byte, length) if err != nil { return nil, err }
_, err := io.ReadFull(r, bytes) bytes := make([]byte, int(length))
if err != nil { panic(err) } _, err = io.ReadFull(r, bytes)
return ByteSlice(bytes) if err != nil { return nil, err }
return bytes, nil
} }
func ReadByteSliceSafe(r io.Reader) (ByteSlice, error) { func ReadByteSlice(r io.Reader) ByteSlice {
length := int(ReadUInt32(r)) bytes, err := ReadByteSliceSafe(r)
bytes := make([]byte, length) if r != nil { panic(err) }
_, err := io.ReadFull(r, bytes) return bytes
if err != nil { return nil, err }
return ByteSlice(bytes), nil
} }

View File

@ -41,20 +41,19 @@ func (self Byte) WriteTo(w io.Writer) (int64, error) {
return int64(n), err return int64(n), err
} }
func ReadByte(r io.Reader) Byte {
buf := [1]byte{0}
_, err := io.ReadFull(r, buf[:])
if err != nil { panic(err) }
return Byte(buf[0])
}
func ReadByteSafe(r io.Reader) (Byte, error) { func ReadByteSafe(r io.Reader) (Byte, error) {
buf := [1]byte{0} buf := [1]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return Byte(0), err } if err != nil { return 0, err }
return Byte(buf[0]), nil return Byte(buf[0]), nil
} }
func ReadByte(r io.Reader) (Byte) {
b, err := ReadByteSafe(r)
if err != nil { panic(err) }
return b
}
// Int8 // Int8
@ -79,11 +78,17 @@ func (self Int8) WriteTo(w io.Writer) (int64, error) {
return int64(n), err return int64(n), err
} }
func ReadInt8(r io.Reader) Int8 { func ReadInt8Safe(r io.Reader) (Int8, error) {
buf := [1]byte{0} buf := [1]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return Int8(0), err }
return Int8(buf[0]), nil
}
func ReadInt8(r io.Reader) (Int8) {
b, err := ReadInt8Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return Int8(buf[0]) return b
} }
@ -110,13 +115,6 @@ func (self UInt8) WriteTo(w io.Writer) (int64, error) {
return int64(n), err return int64(n), err
} }
func ReadUInt8(r io.Reader) UInt8 {
buf := [1]byte{0}
_, err := io.ReadFull(r, buf[:])
if err != nil { panic(err) }
return UInt8(buf[0])
}
func ReadUInt8Safe(r io.Reader) (UInt8, error) { func ReadUInt8Safe(r io.Reader) (UInt8, error) {
buf := [1]byte{0} buf := [1]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
@ -124,6 +122,12 @@ func ReadUInt8Safe(r io.Reader) (UInt8, error) {
return UInt8(buf[0]), nil return UInt8(buf[0]), nil
} }
func ReadUInt8(r io.Reader) (UInt8) {
b, err := ReadUInt8Safe(r)
if err != nil { panic(err) }
return b
}
// Int16 // Int16
@ -148,11 +152,17 @@ func (self Int16) WriteTo(w io.Writer) (int64, error) {
return 2, err return 2, err
} }
func ReadInt16(r io.Reader) Int16 { func ReadInt16Safe(r io.Reader) (Int16, error) {
buf := [2]byte{0} buf := [2]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return Int16(0), err }
return Int16(binary.LittleEndian.Uint16(buf[:])), nil
}
func ReadInt16(r io.Reader) (Int16) {
b, err := ReadInt16Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return Int16(binary.LittleEndian.Uint16(buf[:])) return b
} }
@ -179,11 +189,17 @@ func (self UInt16) WriteTo(w io.Writer) (int64, error) {
return 2, err return 2, err
} }
func ReadUInt16(r io.Reader) UInt16 { func ReadUInt16Safe(r io.Reader) (UInt16, error) {
buf := [2]byte{0} buf := [2]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return UInt16(0), err }
return UInt16(binary.LittleEndian.Uint16(buf[:])), nil
}
func ReadUInt16(r io.Reader) (UInt16) {
b, err := ReadUInt16Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return UInt16(binary.LittleEndian.Uint16(buf[:])) return b
} }
@ -210,11 +226,17 @@ func (self Int32) WriteTo(w io.Writer) (int64, error) {
return 4, err return 4, err
} }
func ReadInt32(r io.Reader) Int32 { func ReadInt32Safe(r io.Reader) (Int32, error) {
buf := [4]byte{0} buf := [4]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return Int32(0), err }
return Int32(binary.LittleEndian.Uint32(buf[:])), nil
}
func ReadInt32(r io.Reader) (Int32) {
b, err := ReadInt32Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return Int32(binary.LittleEndian.Uint32(buf[:])) return b
} }
@ -241,11 +263,17 @@ func (self UInt32) WriteTo(w io.Writer) (int64, error) {
return 4, err return 4, err
} }
func ReadUInt32(r io.Reader) UInt32 { func ReadUInt32Safe(r io.Reader) (UInt32, error) {
buf := [4]byte{0} buf := [4]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return UInt32(0), err }
return UInt32(binary.LittleEndian.Uint32(buf[:])), nil
}
func ReadUInt32(r io.Reader) (UInt32) {
b, err := ReadUInt32Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return UInt32(binary.LittleEndian.Uint32(buf[:])) return b
} }
@ -272,11 +300,17 @@ func (self Int64) WriteTo(w io.Writer) (int64, error) {
return 8, err return 8, err
} }
func ReadInt64(r io.Reader) Int64 { func ReadInt64Safe(r io.Reader) (Int64, error) {
buf := [8]byte{0} buf := [8]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return Int64(0), err }
return Int64(binary.LittleEndian.Uint64(buf[:])), nil
}
func ReadInt64(r io.Reader) (Int64) {
b, err := ReadInt64Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return Int64(binary.LittleEndian.Uint64(buf[:])) return b
} }
@ -303,11 +337,17 @@ func (self UInt64) WriteTo(w io.Writer) (int64, error) {
return 8, err return 8, err
} }
func ReadUInt64(r io.Reader) UInt64 { func ReadUInt64Safe(r io.Reader) (UInt64, error) {
buf := [8]byte{0} buf := [8]byte{0}
_, err := io.ReadFull(r, buf[:]) _, err := io.ReadFull(r, buf[:])
if err != nil { return UInt64(0), err }
return UInt64(binary.LittleEndian.Uint64(buf[:])), nil
}
func ReadUInt64(r io.Reader) (UInt64) {
b, err := ReadUInt64Safe(r)
if err != nil { panic(err) } if err != nil { panic(err) }
return UInt64(binary.LittleEndian.Uint64(buf[:])) return b
} }

View File

@ -30,10 +30,17 @@ func (self String) WriteTo(w io.Writer) (n int64, err error) {
return int64(n_+4), err return int64(n_+4), err
} }
func ReadString(r io.Reader) String { func ReadStringSafe(r io.Reader) (String, error) {
length := int(ReadUInt32(r)) length, err := ReadUInt32Safe(r)
bytes := make([]byte, length) if err != nil { return "", err }
_, err := io.ReadFull(r, bytes) bytes := make([]byte, int(length))
if err != nil { panic(err) } _, err = io.ReadFull(r, bytes)
return String(bytes) if err != nil { return "", err }
return String(bytes), nil
}
func ReadString(r io.Reader) String {
str, err := ReadStringSafe(r)
if r != nil { panic(err) }
return str
} }

View File

@ -2,6 +2,7 @@ package peer
import ( import (
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/merkle"
"sync/atomic" "sync/atomic"
"sync" "sync"
@ -24,7 +25,7 @@ type Client struct {
targetNumPeers int targetNumPeers int
makePeerFn func(*Connection) *Peer makePeerFn func(*Connection) *Peer
self *Peer self *Peer
inQueues map[string]chan *InboundMsg recvQueues map[String]chan *InboundPacket
mtx sync.Mutex mtx sync.Mutex
peers merkle.Tree // addr -> *Peer peers merkle.Tree // addr -> *Peer
@ -43,9 +44,9 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client {
Panicf("makePeerFn(nil) must return a prototypical peer for self") Panicf("makePeerFn(nil) must return a prototypical peer for self")
} }
inQueues := make(map[string]chan *InboundMsg) recvQueues := make(map[String]chan *InboundPacket)
for chName, _ := range self.channels { for chName, _ := range self.channels {
inQueues[chName] = make(chan *InboundMsg) recvQueues[chName] = make(chan *InboundPacket)
} }
c := &Client{ c := &Client{
@ -53,7 +54,7 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client {
targetNumPeers: 0, // TODO targetNumPeers: 0, // TODO
makePeerFn: makePeerFn, makePeerFn: makePeerFn,
self: self, self: self,
inQueues: inQueues, recvQueues: recvQueues,
peers: merkle.NewIAVLTree(nil), peers: merkle.NewIAVLTree(nil),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -89,18 +90,18 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer,
err := c.addPeer(peer) err := c.addPeer(peer)
if err != nil { return nil, err } if err != nil { return nil, err }
go peer.Start(c.inQueues) go peer.Start(c.recvQueues)
return peer, nil return peer, nil
} }
func (c *Client) Broadcast(chName string, msg Msg) { func (c *Client) Broadcast(pkt Packet) {
if atomic.LoadUint32(&c.stopped) == 1 { return } if atomic.LoadUint32(&c.stopped) == 1 { return }
log.Tracef("Broadcast on [%v] msg: %v", chName, msg) log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
for v := range c.Peers().Values() { for v := range c.Peers().Values() {
peer := v.(*Peer) peer := v.(*Peer)
success := peer.TryQueueOut(chName , msg) success := peer.TrySend(pkt)
log.Tracef("Broadcast for peer %v success: %v", peer, success) log.Tracef("Broadcast for peer %v success: %v", peer, success)
if !success { if !success {
// TODO: notify the peer // TODO: notify the peer
@ -110,19 +111,19 @@ func (c *Client) Broadcast(chName string, msg Msg) {
} }
// blocks until a message is popped. // blocks until a message is popped.
func (c *Client) PopMessage(chName string) *InboundMsg { func (c *Client) Receive(chName String) *InboundPacket {
if atomic.LoadUint32(&c.stopped) == 1 { return nil } if atomic.LoadUint32(&c.stopped) == 1 { return nil }
log.Tracef("PopMessage on [%v]", chName) log.Tracef("Receive on [%v]", chName)
q := c.inQueues[chName] q := c.recvQueues[chName]
if q == nil { Panicf("Expected inQueues[%f], found none", chName) } if q == nil { Panicf("Expected recvQueues[%f], found none", chName) }
for { for {
select { select {
case <-c.quit: case <-c.quit:
return nil return nil
case inMsg := <-q: case inPacket := <-q:
return inMsg return inPacket
} }
} }
} }

View File

@ -11,7 +11,7 @@ func TestConnection(t *testing.T) {
peerMaker := func(conn *Connection) *Peer { peerMaker := func(conn *Connection) *Peer {
bufferSize := 10 bufferSize := 10
p := NewPeer(conn) p := NewPeer(conn)
p.channels = map[string]*Channel{} p.channels = map[String]*Channel{}
p.channels["ch1"] = NewChannel("ch1", bufferSize) p.channels["ch1"] = NewChannel("ch1", bufferSize)
p.channels["ch2"] = NewChannel("ch2", bufferSize) p.channels["ch2"] = NewChannel("ch2", bufferSize)
return p return p
@ -44,9 +44,9 @@ func TestConnection(t *testing.T) {
} }
// TODO: test the transmission of information on channels. // TODO: test the transmission of information on channels.
c1.Broadcast("ch1", Msg{Bytes:ByteSlice("test data")}) c1.Broadcast(NewPacket("ch1", ByteSlice("test data")))
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
inMsg := c2.PopMessage("ch1") inMsg := c2.Receive("ch1")
t.Logf("c2 popped message: %v", inMsg) t.Logf("c2 popped message: %v", inMsg)

View File

@ -19,7 +19,7 @@ const (
type Connection struct { type Connection struct {
ioStats IOStats ioStats IOStats
outQueue chan ByteSlice // never closes. sendQueue chan Packet // never closes
conn net.Conn conn net.Conn
quit chan struct{} quit chan struct{}
stopped uint32 stopped uint32
@ -35,7 +35,7 @@ var (
func NewConnection(conn net.Conn) *Connection { func NewConnection(conn net.Conn) *Connection {
return &Connection{ return &Connection{
outQueue: make(chan ByteSlice, OUT_QUEUE_SIZE), sendQueue: make(chan Packet, OUT_QUEUE_SIZE),
conn: conn, conn: conn,
quit: make(chan struct{}), quit: make(chan struct{}),
pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute), pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute),
@ -46,19 +46,19 @@ func NewConnection(conn net.Conn) *Connection {
// returns true if successfully queued, // returns true if successfully queued,
// returns false if connection was closed. // returns false if connection was closed.
// blocks. // blocks.
func (c *Connection) QueueOut(msg ByteSlice) bool { func (c *Connection) Send(pkt Packet) bool {
select { select {
case c.outQueue <- msg: case c.sendQueue <- pkt:
return true return true
case <-c.quit: case <-c.quit:
return false return false
} }
} }
func (c *Connection) Start() { func (c *Connection) Start(channels map[String]*Channel) {
log.Debugf("Starting %v", c) log.Debugf("Starting %v", c)
go c.outHandler() go c.sendHandler()
go c.inHandler() go c.recvHandler(channels)
} }
func (c *Connection) Stop() { func (c *Connection) Stop() {
@ -68,9 +68,9 @@ func (c *Connection) Stop() {
c.conn.Close() c.conn.Close()
c.pingDebouncer.Stop() c.pingDebouncer.Stop()
// We can't close pong safely here because // We can't close pong safely here because
// inHandler may write to it after we've stopped. // recvHandler may write to it after we've stopped.
// Though it doesn't need to get closed at all, // Though it doesn't need to get closed at all,
// we close it @ inHandler. // we close it @ recvHandler.
// close(c.pong) // close(c.pong)
} }
} }
@ -91,8 +91,10 @@ func (c *Connection) flush() {
// TODO flush? (turn off nagel, turn back on, etc) // TODO flush? (turn off nagel, turn back on, etc)
} }
func (c *Connection) outHandler() { func (c *Connection) sendHandler() {
log.Tracef("Connection %v outHandler", c) log.Tracef("Connection %v sendHandler", c)
// TODO: catch panics & stop connection.
FOR_LOOP: FOR_LOOP:
for { for {
@ -100,11 +102,11 @@ func (c *Connection) outHandler() {
select { select {
case <-c.pingDebouncer.Ch: case <-c.pingDebouncer.Ch:
_, err = PACKET_TYPE_PING.WriteTo(c.conn) _, err = PACKET_TYPE_PING.WriteTo(c.conn)
case outMsg := <-c.outQueue: case sendPkt := <-c.sendQueue:
log.Tracef("Found msg from outQueue. Writing msg to underlying connection") log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection")
_, err = PACKET_TYPE_MSG.WriteTo(c.conn) _, err = PACKET_TYPE_MSG.WriteTo(c.conn)
if err != nil { break } if err != nil { break }
_, err = outMsg.WriteTo(c.conn) _, err = sendPkt.WriteTo(c.conn)
case <-c.pong: case <-c.pong:
_, err = PACKET_TYPE_PONG.WriteTo(c.conn) _, err = PACKET_TYPE_PONG.WriteTo(c.conn)
case <-c.quit: case <-c.quit:
@ -112,7 +114,7 @@ func (c *Connection) outHandler() {
} }
if err != nil { if err != nil {
log.Infof("Connection %v failed @ outHandler:\n%v", c, err) log.Infof("Connection %v failed @ sendHandler:\n%v", c, err)
c.Stop() c.Stop()
break FOR_LOOP break FOR_LOOP
} }
@ -120,53 +122,55 @@ func (c *Connection) outHandler() {
c.flush() c.flush()
} }
log.Tracef("Connection %v outHandler done", c) log.Tracef("Connection %v sendHandler done", c)
// cleanup // cleanup
} }
func (c *Connection) inHandler() { func (c *Connection) recvHandler(channels map[String]*Channel) {
log.Tracef("Connection %v inHandler", c) log.Tracef("Connection %v recvHandler with %v channels", c, len(channels))
// TODO: catch panics & stop connection.
FOR_LOOP: FOR_LOOP:
for { for {
msgType, err := ReadUInt8Safe(c.conn) pktType, err := ReadUInt8Safe(c.conn)
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if atomic.LoadUint32(&c.stopped) != 1 {
log.Infof("Connection %v failed @ inHandler", c) log.Infof("Connection %v failed @ recvHandler", c)
c.Stop() c.Stop()
} }
break FOR_LOOP break FOR_LOOP
} else { } else {
log.Tracef("Found msgType %v", msgType) log.Tracef("Found pktType %v", pktType)
} }
switch msgType { switch pktType {
case PACKET_TYPE_PING: case PACKET_TYPE_PING:
c.pong <- struct{}{} c.pong <- struct{}{}
case PACKET_TYPE_PONG: case PACKET_TYPE_PONG:
// do nothing // do nothing
case PACKET_TYPE_MSG: case PACKET_TYPE_MSG:
msg, err := ReadByteSliceSafe(c.conn) pkt, err := ReadPacketSafe(c.conn)
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if atomic.LoadUint32(&c.stopped) != 1 {
log.Infof("Connection %v failed @ inHandler", c) log.Infof("Connection %v failed @ recvHandler", c)
c.Stop() c.Stop()
} }
break FOR_LOOP break FOR_LOOP
} }
// What to do? channel := channels[pkt.Channel]
// XXX if channel == nil {
XXX well, we need to push it into the channel or something. Panicf("Unknown channel %v", pkt.Channel)
or at least provide an inQueue. }
log.Tracef("%v", msg) channel.recvQueue <- pkt
default: default:
Panicf("Unknown message type %v", msgType) Panicf("Unknown message type %v", pktType)
} }
c.pingDebouncer.Reset() c.pingDebouncer.Reset()
} }
log.Tracef("Connection %v inHandler done", c) log.Tracef("Connection %v recvHandler done", c)
// cleanup // cleanup
close(c.pong) close(c.pong)
for _ = range c.pong { for _ = range c.pong {
@ -182,6 +186,6 @@ type IOStats struct {
LastRecv Time LastRecv Time
BytesRecv UInt64 BytesRecv UInt64
BytesSent UInt64 BytesSent UInt64
MsgsRecv UInt64 PktsRecv UInt64
MsgsSent UInt64 PktsSent UInt64
} }

View File

@ -5,21 +5,44 @@ import (
"io" "io"
) )
/* Msg */ /* Packet */
type Msg struct { type Packet struct {
Channel String
Bytes ByteSlice Bytes ByteSlice
Hash ByteSlice // Hash
}
func NewPacket(chName String, bytes ByteSlice) Packet {
return Packet{
Channel: chName,
Bytes: bytes,
}
}
func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(&p.Channel, w, n, err)
n, err = WriteOnto(&p.Bytes, w, n, err)
return
}
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil { return }
// TODO: packet length sanity check.
bytes, err := ReadByteSliceSafe(r)
if err != nil { return }
return NewPacket(chName, bytes), nil
} }
/* InboundMsg */ /* InboundPacket */
type InboundMsg struct { type InboundPacket struct {
Peer *Peer Peer *Peer
Channel *Channel Channel *Channel
Time Time Time Time
Msg Packet
} }
@ -31,5 +54,6 @@ type NewFilterMsg struct {
} }
func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) { func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) {
panic("TODO: implement")
return 0, nil // TODO return 0, nil // TODO
} }

View File

@ -14,7 +14,7 @@ import (
type Peer struct { type Peer struct {
outgoing bool outgoing bool
conn *Connection conn *Connection
channels map[string]*Channel channels map[String]*Channel
mtx sync.Mutex mtx sync.Mutex
quit chan struct{} quit chan struct{}
@ -29,12 +29,12 @@ func NewPeer(conn *Connection) *Peer {
} }
} }
func (p *Peer) Start(peerInQueues map[string]chan *InboundMsg ) { func (p *Peer) Start(peerRecvQueues map[String]chan *InboundPacket ) {
log.Debugf("Starting %v", p) log.Debugf("Starting %v", p)
p.conn.Start() p.conn.Start(p.channels)
for chName, _ := range p.channels { for chName, _ := range p.channels {
go p.inHandler(chName, peerInQueues[chName]) go p.recvHandler(chName, peerRecvQueues[chName])
go p.outHandler(chName) go p.sendHandler(chName)
} }
} }
@ -58,21 +58,21 @@ func (p *Peer) RemoteAddress() *NetAddress {
return p.conn.RemoteAddress() return p.conn.RemoteAddress()
} }
func (p *Peer) Channel(chName string) *Channel { func (p *Peer) Channel(chName String) *Channel {
return p.channels[chName] return p.channels[chName]
} }
// Queue the msg for output. // If the channel's queue is full, just return false.
// If the queue is full, just return false. // Later the sendHandler will send the pkt to the underlying connection.
func (p *Peer) TryQueueOut(chName string, msg Msg) bool { func (p *Peer) TrySend(pkt Packet) bool {
channel := p.Channel(chName) channel := p.Channel(pkt.Channel)
outQueue := channel.OutQueue() sendQueue := channel.SendQueue()
// lock & defer // lock & defer
p.mtx.Lock(); defer p.mtx.Unlock() p.mtx.Lock(); defer p.mtx.Unlock()
if p.stopped == 1 { return false } if p.stopped == 1 { return false }
select { select {
case outQueue <- msg: case sendQueue <- pkt:
return true return true
default: // buffer full default: // buffer full
return false return false
@ -88,55 +88,55 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
} }
func (p *Peer) inHandler(chName string, inboundMsgQueue chan<- *InboundMsg) { func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPacket) {
log.Tracef("Peer %v inHandler [%v]", p, chName) log.Tracef("Peer %v recvHandler [%v]", p, chName)
channel := p.channels[chName] channel := p.channels[chName]
inQueue := channel.InQueue() recvQueue := channel.RecvQueue()
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-p.quit: case <-p.quit:
break FOR_LOOP break FOR_LOOP
case msg := <-inQueue: case pkt := <-recvQueue:
// send to inboundMsgQueue // send to inboundPacketQueue
inboundMsg := &InboundMsg{ inboundPacket := &InboundPacket{
Peer: p, Peer: p,
Channel: channel, Channel: channel,
Time: Time{time.Now()}, Time: Time{time.Now()},
Msg: msg, Packet: pkt,
} }
select { select {
case <-p.quit: case <-p.quit:
break FOR_LOOP break FOR_LOOP
case inboundMsgQueue <- inboundMsg: case inboundPacketQueue <- inboundPacket:
continue continue
} }
} }
} }
log.Tracef("Peer %v inHandler [%v] closed", p, chName) log.Tracef("Peer %v recvHandler [%v] closed", p, chName)
// cleanup // cleanup
// (none) // (none)
} }
func (p *Peer) outHandler(chName string) { func (p *Peer) sendHandler(chName String) {
log.Tracef("Peer %v outHandler [%v]", p, chName) log.Tracef("Peer %v sendHandler [%v]", p, chName)
outQueue := p.channels[chName].outQueue chSendQueue := p.channels[chName].sendQueue
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-p.quit: case <-p.quit:
break FOR_LOOP break FOR_LOOP
case msg := <-outQueue: case pkt := <-chSendQueue:
log.Tracef("Sending msg to peer outQueue") log.Tracef("Sending packet to peer chSendQueue")
// blocks until the connection is Stop'd, // blocks until the connection is Stop'd,
// which happens when this peer is Stop'd. // which happens when this peer is Stop'd.
p.conn.QueueOut(msg.Bytes) p.conn.Send(pkt)
} }
} }
log.Tracef("Peer %v outHandler [%v] closed", p, chName) log.Tracef("Peer %v sendHandler [%v] closed", p, chName)
// cleanup // cleanup
// (none) // (none)
} }
@ -145,28 +145,28 @@ func (p *Peer) outHandler(chName string) {
/* Channel */ /* Channel */
type Channel struct { type Channel struct {
name string name String
inQueue chan Msg recvQueue chan Packet
outQueue chan Msg sendQueue chan Packet
//stats Stats //stats Stats
} }
func NewChannel(name string, bufferSize int) *Channel { func NewChannel(name String, bufferSize int) *Channel {
return &Channel{ return &Channel{
name: name, name: name,
inQueue: make(chan Msg, bufferSize), recvQueue: make(chan Packet, bufferSize),
outQueue: make(chan Msg, bufferSize), sendQueue: make(chan Packet, bufferSize),
} }
} }
func (c *Channel) Name() string { func (c *Channel) Name() String {
return c.name return c.name
} }
func (c *Channel) InQueue() <-chan Msg { func (c *Channel) RecvQueue() <-chan Packet {
return c.inQueue return c.recvQueue
} }
func (c *Channel) OutQueue() chan<- Msg { func (c *Channel) SendQueue() chan<- Packet {
return c.outQueue return c.sendQueue
} }

View File

@ -1,67 +0,0 @@
package peer
import (
. "github.com/tendermint/tendermint/binary"
"io"
)
/* Set
A Set could be a bloom filter for lossy filtering, or could be a lossless filter.
*/
type Set interface {
Binary
Add(Msg)
Has(Msg) bool
// Loads a new set.
// Convenience factory method
Load(ByteSlice) Set
}
/* BloomFilterSet */
type BloomFilterSet struct {
lastBlockHeight UInt64
lastHeaderHeight UInt64
}
func (bs *BloomFilterSet) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(String("block"), w, n, err)
n, err = WriteOnto(bs.lastBlockHeight, w, n, err)
n, err = WriteOnto(bs.lastHeaderHeight, w, n, err)
return
}
func (bs *BloomFilterSet) Add(msg Msg) {
}
func (bs *BloomFilterSet) Has(msg Msg) bool {
return false
}
func (bs *BloomFilterSet) Load(bytes ByteSlice) Set {
return nil
}
/* BitarraySet */
type BlockSet struct {
}
func (bs *BlockSet) WriteTo(w io.Writer) (n int64, err error) {
return
}
func (bs *BlockSet) Add(msg Msg) {
}
func (bs *BlockSet) Has(msg Msg) bool {
return false
}
func (bs *BlockSet) Load(bytes ByteSlice) Set {
return nil
}