Message is wrapped by TypedMessage.

This commit is contained in:
Jae Kwon
2014-07-18 21:21:42 -07:00
parent 9464241c02
commit 6750d05b05
15 changed files with 365 additions and 171 deletions

View File

@ -1,12 +1,13 @@
TenderMint - proof of concept TenderMint - proof of concept
* **[peer](https://github.com/tendermint/tendermint/blob/master/peer):** P2P networking stack. Designed to be extensible. * **[p2p](https://github.com/tendermint/tendermint/blob/master/p2p):** P2P networking stack. Designed to be extensible.
* **[merkle](https://github.com/tendermint/tendermint/blob/master/merkle):** Immutable Persistent Merkle-ized AVL+ Tree, used primarily for keeping track of mutable state like account balances. * **[merkle](https://github.com/tendermint/tendermint/blob/master/merkle):** Immutable Persistent Merkle-ized AVL+ Tree, used primarily for keeping track of mutable state like account balances.
* **[crypto](https://github.com/tendermint/tendermint/blob/master/crypto):** Includes cgo bindings of ed25519. * **[crypto](https://github.com/tendermint/tendermint/blob/master/crypto):** Includes cgo bindings of ed25519.
### Status ### Status
* Node & testnet *now* * Block manager *now*
* Node & testnet *complete*
* PEX peer exchange *complete* * PEX peer exchange *complete*
* p2p/* *complete* * p2p/* *complete*
* Ed25519 bindings *complete* * Ed25519 bindings *complete*

View File

@ -6,7 +6,7 @@ type Binary interface {
WriteTo(w io.Writer) (int64, error) WriteTo(w io.Writer) (int64, error)
} }
func WriteOnto(b Binary, w io.Writer, n int64, err error) (int64, error) { func WriteTo(b Binary, w io.Writer, n int64, err error) (int64, error) {
if err != nil { if err != nil {
return n, err return n, err
} }

View File

@ -33,12 +33,12 @@ func ReadAccountId(r io.Reader) AccountId {
} }
func (self AccountId) WriteTo(w io.Writer) (n int64, err error) { func (self AccountId) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type, w, n, err) n, err = WriteTo(self.Type, w, n, err)
if self.Type == ACCOUNT_TYPE_NUMBER || self.Type == ACCOUNT_TYPE_BOTH { if self.Type == ACCOUNT_TYPE_NUMBER || self.Type == ACCOUNT_TYPE_BOTH {
n, err = WriteOnto(self.Number, w, n, err) n, err = WriteTo(self.Number, w, n, err)
} }
if self.Type == ACCOUNT_TYPE_PUBKEY || self.Type == ACCOUNT_TYPE_BOTH { if self.Type == ACCOUNT_TYPE_PUBKEY || self.Type == ACCOUNT_TYPE_BOTH {
n, err = WriteOnto(self.PubKey, w, n, err) n, err = WriteTo(self.PubKey, w, n, err)
} }
return return
} }

View File

@ -73,11 +73,11 @@ func (self *Bond) Type() Byte {
} }
func (self *Bond) WriteTo(w io.Writer) (n int64, err error) { func (self *Bond) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type(), w, n, err) n, err = WriteTo(self.Type(), w, n, err)
n, err = WriteOnto(self.Fee, w, n, err) n, err = WriteTo(self.Fee, w, n, err)
n, err = WriteOnto(self.UnbondTo, w, n, err) n, err = WriteTo(self.UnbondTo, w, n, err)
n, err = WriteOnto(self.Amount, w, n, err) n, err = WriteTo(self.Amount, w, n, err)
n, err = WriteOnto(self.Signature, w, n, err) n, err = WriteTo(self.Signature, w, n, err)
return return
} }
@ -94,10 +94,10 @@ func (self *Unbond) Type() Byte {
} }
func (self *Unbond) WriteTo(w io.Writer) (n int64, err error) { func (self *Unbond) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type(), w, n, err) n, err = WriteTo(self.Type(), w, n, err)
n, err = WriteOnto(self.Fee, w, n, err) n, err = WriteTo(self.Fee, w, n, err)
n, err = WriteOnto(self.Amount, w, n, err) n, err = WriteTo(self.Amount, w, n, err)
n, err = WriteOnto(self.Signature, w, n, err) n, err = WriteTo(self.Signature, w, n, err)
return return
} }
@ -113,9 +113,9 @@ func (self *Timeout) Type() Byte {
} }
func (self *Timeout) WriteTo(w io.Writer) (n int64, err error) { func (self *Timeout) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type(), w, n, err) n, err = WriteTo(self.Type(), w, n, err)
n, err = WriteOnto(self.Account, w, n, err) n, err = WriteTo(self.Account, w, n, err)
n, err = WriteOnto(self.Penalty, w, n, err) n, err = WriteTo(self.Penalty, w, n, err)
return return
} }
@ -131,8 +131,8 @@ func (self *Dupeout) Type() Byte {
} }
func (self *Dupeout) WriteTo(w io.Writer) (n int64, err error) { func (self *Dupeout) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type(), w, n, err) n, err = WriteTo(self.Type(), w, n, err)
n, err = WriteOnto(self.VoteA, w, n, err) n, err = WriteTo(self.VoteA, w, n, err)
n, err = WriteOnto(self.VoteB, w, n, err) n, err = WriteTo(self.VoteB, w, n, err)
return return
} }

View File

@ -11,7 +11,7 @@ import (
type Block struct { type Block struct {
Header Header
Validation Validation
Data Txs
// Checkpoint // Checkpoint
} }
@ -19,7 +19,7 @@ func ReadBlock(r io.Reader) *Block {
return &Block{ return &Block{
Header: ReadHeader(r), Header: ReadHeader(r),
Validation: ReadValidation(r), Validation: ReadValidation(r),
Data: ReadData(r), Txs: ReadTxs(r),
} }
} }
@ -28,9 +28,9 @@ func (self *Block) Validate() bool {
} }
func (self *Block) WriteTo(w io.Writer) (n int64, err error) { func (self *Block) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(&self.Header, w, n, err) n, err = WriteTo(&self.Header, w, n, err)
n, err = WriteOnto(&self.Validation, w, n, err) n, err = WriteTo(&self.Validation, w, n, err)
n, err = WriteOnto(&self.Data, w, n, err) n, err = WriteTo(&self.Txs, w, n, err)
return return
} }
@ -43,7 +43,7 @@ type Header struct {
Time UInt64 Time UInt64
PrevHash ByteSlice PrevHash ByteSlice
ValidationHash ByteSlice ValidationHash ByteSlice
DataHash ByteSlice TxsHash ByteSlice
} }
func ReadHeader(r io.Reader) Header { func ReadHeader(r io.Reader) Header {
@ -54,18 +54,18 @@ func ReadHeader(r io.Reader) Header {
Time: ReadUInt64(r), Time: ReadUInt64(r),
PrevHash: ReadByteSlice(r), PrevHash: ReadByteSlice(r),
ValidationHash: ReadByteSlice(r), ValidationHash: ReadByteSlice(r),
DataHash: ReadByteSlice(r), TxsHash: ReadByteSlice(r),
} }
} }
func (self *Header) WriteTo(w io.Writer) (n int64, err error) { func (self *Header) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Name, w, n, err) n, err = WriteTo(self.Name, w, n, err)
n, err = WriteOnto(self.Height, w, n, err) n, err = WriteTo(self.Height, w, n, err)
n, err = WriteOnto(self.Fees, w, n, err) n, err = WriteTo(self.Fees, w, n, err)
n, err = WriteOnto(self.Time, w, n, err) n, err = WriteTo(self.Time, w, n, err)
n, err = WriteOnto(self.PrevHash, w, n, err) n, err = WriteTo(self.PrevHash, w, n, err)
n, err = WriteOnto(self.ValidationHash, w, n, err) n, err = WriteTo(self.ValidationHash, w, n, err)
n, err = WriteOnto(self.DataHash, w, n, err) n, err = WriteTo(self.TxsHash, w, n, err)
return return
} }
@ -94,41 +94,41 @@ func ReadValidation(r io.Reader) Validation {
} }
func (self *Validation) WriteTo(w io.Writer) (n int64, err error) { func (self *Validation) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(UInt64(len(self.Signatures)), w, n, err) n, err = WriteTo(UInt64(len(self.Signatures)), w, n, err)
n, err = WriteOnto(UInt64(len(self.Adjustments)), w, n, err) n, err = WriteTo(UInt64(len(self.Adjustments)), w, n, err)
for _, sig := range self.Signatures { for _, sig := range self.Signatures {
n, err = WriteOnto(sig, w, n, err) n, err = WriteTo(sig, w, n, err)
} }
for _, adj := range self.Adjustments { for _, adj := range self.Adjustments {
n, err = WriteOnto(adj, w, n, err) n, err = WriteTo(adj, w, n, err)
} }
return return
} }
/* Block > Data */ /* Block > Txs */
type Data struct { type Txs struct {
Txs []Tx Txs []Tx
} }
func ReadData(r io.Reader) Data { func ReadTxs(r io.Reader) Txs {
numTxs := int(ReadUInt64(r)) numTxs := int(ReadUInt64(r))
txs := make([]Tx, 0, numTxs) txs := make([]Tx, 0, numTxs)
for i := 0; i < numTxs; i++ { for i := 0; i < numTxs; i++ {
txs = append(txs, ReadTx(r)) txs = append(txs, ReadTx(r))
} }
return Data{txs} return Txs{txs}
} }
func (self *Data) WriteTo(w io.Writer) (n int64, err error) { func (self *Txs) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(UInt64(len(self.Txs)), w, n, err) n, err = WriteTo(UInt64(len(self.Txs)), w, n, err)
for _, tx := range self.Txs { for _, tx := range self.Txs {
n, err = WriteOnto(tx, w, n, err) n, err = WriteTo(tx, w, n, err)
} }
return return
} }
func (self *Data) MerkleHash() ByteSlice { func (self *Txs) MerkleHash() ByteSlice {
bs := make([]Binary, 0, len(self.Txs)) bs := make([]Binary, 0, len(self.Txs))
for i, tx := range self.Txs { for i, tx := range self.Txs {
bs[i] = Binary(tx) bs[i] = Binary(tx)

163
blocks/block_manager.go Normal file
View File

@ -0,0 +1,163 @@
package blocks
import (
"github.com/tendermint/tendermint/p2p"
)
const (
BlocksCh = "block"
msgTypeUnknown = Byte(0x00)
msgTypeState = Byte(0x01)
msgTypeRequest = Byte(0x02)
msgTypeData = Byte(0x03)
dataTypeAll = byte(0x00)
dataTypeValidation = byte(0x01)
dataTypeTxs = byte(0x02)
// dataTypeCheckpoint = byte(0x04)
)
/*
*/
type BlockManager struct {
quit chan struct{}
started uint32
stopped uint32
}
func NewBlockManager() *BlockManager {
bm := &BlockManager{
sw: sw,
quit: make(chan struct{}),
}
return bm
}
func (bm *BlockManager) Start() {
if atomic.CompareAndSwapUint32(&bm.started, 0, 1) {
log.Info("Starting BlockManager")
go bm.XXX()
}
}
func (bm *BlockManager) Stop() {
if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) {
log.Info("Stopping BlockManager")
close(bm.quit)
}
}
func (bm *BlockManager) XXX() {
}
//-----------------------------------------------------------------------------
/* Messages */
// TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz ByteSlice) (msg Message) {
// log.Debug("decoding msg bytes: %X", bz)
switch Byte(bz[0]) {
case msgTypeState:
return &StateMessage{}
case msgTypeRequest:
return readRequestMessage(bytes.NewReader(bz[1:]))
case msgTypeData:
return readDataMessage(bytes.NewReader(bz[1:]))
default:
return nil
}
}
/*
A StateMessage declares what (contiguous) blocks & headers are known.
LastValidationHeight >= LastBlockHeight.
*/
type StateMessage struct {
LastBlockHeight UInt64
LastValidationHeight UInt64
}
func readStateMessage(r io.Reader) *StateMessage {
lastBlockHeight := ReadUInt64(r)
lastValidationHeight := ReadUInt64(r)
return &StateMessage{
LastBlockHeight: lastBlockHeight,
LastValidationHeight: lastValidationHeight,
}
}
func (m *StateMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(msgTypeState, w, n, err)
n, err = WriteTo(m.LastBlockHeight, w, n, err)
n, err = WriteTo(m.LastValidationHeight, w, n, err)
return
}
func (m *StateMessage) String() string {
return fmt.Sprintf("[State %v/%v]",
m.LastBlockHeight, m.LastValidationHeight)
}
/*
A RequestMessage requests a block and/or header at a given height.
*/
type RequestMessage struct {
Type Byte
Height UInt64
}
func readRequestMessage(r io.Reader) *RequestMessage {
requestType := ReadByte(r)
height := ReadUInt64(r)
return &RequestMessage{
Type: requestType,
Height: height,
}
}
func (m *RequestMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(msgTypeRequest, w, n, err)
n, err = WriteTo(m.Type, w, n, err)
n, err = WriteTo(m.Height, w, n, err)
return
}
func (m *RequestMessage) String() string {
return fmt.Sprintf("[Request %X@%v]", m.Type, m.Height)
}
/*
A DataMessage contains block data, maybe requested.
The data can be a Validation, Txs, or whole Block object.
*/
type DataMessage struct {
Type Byte
Height UInt64
Bytes ByteSlice
}
func readDataMessage(r io.Reader) *DataMessage {
dataType := ReadByte(r)
height := ReadUInt64(r)
bytes := ReadByteSlice(r)
return &DataMessage{
Type: dataType,
Height: height,
Bytes: bytes,
}
}
func (m *DataMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(msgTypeData, w, n, err)
n, err = WriteTo(m.Type, w, n, err)
n, err = WriteTo(m.Height, w, n, err)
n, err = WriteTo(m.Bytes, w, n, err)
return
}
func (m *DataMessage) String() string {
return fmt.Sprintf("[Data %X@%v]", m.Type, m.Height)
}

View File

@ -21,7 +21,7 @@ func BenchmarkTestCustom(b *testing.B) {
Time: 123, Time: 123,
PrevHash: ByteSlice("prevhash"), PrevHash: ByteSlice("prevhash"),
ValidationHash: ByteSlice("validationhash"), ValidationHash: ByteSlice("validationhash"),
DataHash: ByteSlice("datahash"), TxsHash: ByteSlice("txshash"),
} }
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
@ -44,7 +44,7 @@ type HHeader struct {
Time uint64 `json:"T"` Time uint64 `json:"T"`
PrevHash []byte `json:"PH"` PrevHash []byte `json:"PH"`
ValidationHash []byte `json:"VH"` ValidationHash []byte `json:"VH"`
DataHash []byte `json:"DH"` TxsHash []byte `json:"DH"`
} }
func BenchmarkTestJSON(b *testing.B) { func BenchmarkTestJSON(b *testing.B) {
@ -57,7 +57,7 @@ func BenchmarkTestJSON(b *testing.B) {
Time: 123, Time: 123,
PrevHash: []byte("prevhash"), PrevHash: []byte("prevhash"),
ValidationHash: []byte("validationhash"), ValidationHash: []byte("validationhash"),
DataHash: []byte("datahash"), TxsHash: []byte("txshash"),
} }
h2 := &HHeader{} h2 := &HHeader{}
@ -86,7 +86,7 @@ func BenchmarkTestGob(b *testing.B) {
Time: 123, Time: 123,
PrevHash: []byte("prevhash"), PrevHash: []byte("prevhash"),
ValidationHash: []byte("validationhash"), ValidationHash: []byte("validationhash"),
DataHash: []byte("datahash"), TxsHash: []byte("txshash"),
} }
h2 := &Header{} h2 := &Header{}
@ -115,7 +115,7 @@ func BenchmarkTestMsgPack(b *testing.B) {
Time: 123, Time: 123,
PrevHash: []byte("prevhash"), PrevHash: []byte("prevhash"),
ValidationHash: []byte("validationhash"), ValidationHash: []byte("validationhash"),
DataHash: []byte("datahash"), TxsHash: []byte("txshash"),
} }
h2 := &Header{} h2 := &Header{}
@ -144,7 +144,7 @@ func BenchmarkTestMsgPack2(b *testing.B) {
Time: 123, Time: 123,
PrevHash: []byte("prevhash"), PrevHash: []byte("prevhash"),
ValidationHash: []byte("validationhash"), ValidationHash: []byte("validationhash"),
DataHash: []byte("datahash"), TxsHash: []byte("txshash"),
} }
h2 := &Header{} h2 := &Header{}
var mh codec.MsgpackHandle var mh codec.MsgpackHandle

View File

@ -31,8 +31,8 @@ func ReadSignature(r io.Reader) Signature {
} }
func (self Signature) WriteTo(w io.Writer) (n int64, err error) { func (self Signature) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Signer, w, n, err) n, err = WriteTo(self.Signer, w, n, err)
n, err = WriteOnto(self.SigBytes, w, n, err) n, err = WriteTo(self.SigBytes, w, n, err)
return return
} }

View File

@ -66,11 +66,11 @@ func (self *SendTx) Type() Byte {
} }
func (self *SendTx) WriteTo(w io.Writer) (n int64, err error) { func (self *SendTx) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type(), w, n, err) n, err = WriteTo(self.Type(), w, n, err)
n, err = WriteOnto(self.Fee, w, n, err) n, err = WriteTo(self.Fee, w, n, err)
n, err = WriteOnto(self.To, w, n, err) n, err = WriteTo(self.To, w, n, err)
n, err = WriteOnto(self.Amount, w, n, err) n, err = WriteTo(self.Amount, w, n, err)
n, err = WriteOnto(self.Signature, w, n, err) n, err = WriteTo(self.Signature, w, n, err)
return return
} }
@ -88,10 +88,10 @@ func (self *NameTx) Type() Byte {
} }
func (self *NameTx) WriteTo(w io.Writer) (n int64, err error) { func (self *NameTx) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Type(), w, n, err) n, err = WriteTo(self.Type(), w, n, err)
n, err = WriteOnto(self.Fee, w, n, err) n, err = WriteTo(self.Fee, w, n, err)
n, err = WriteOnto(self.Name, w, n, err) n, err = WriteTo(self.Name, w, n, err)
n, err = WriteOnto(self.PubKey, w, n, err) n, err = WriteTo(self.PubKey, w, n, err)
n, err = WriteOnto(self.Signature, w, n, err) n, err = WriteTo(self.Signature, w, n, err)
return return
} }

View File

@ -25,8 +25,8 @@ func ReadVote(r io.Reader) Vote {
} }
func (self Vote) WriteTo(w io.Writer) (n int64, err error) { func (self Vote) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(self.Height, w, n, err) n, err = WriteTo(self.Height, w, n, err)
n, err = WriteOnto(self.BlockHash, w, n, err) n, err = WriteTo(self.BlockHash, w, n, err)
n, err = WriteOnto(self.Signature, w, n, err) n, err = WriteTo(self.Signature, w, n, err)
return return
} }

View File

@ -1,7 +1,5 @@
package main package main
// TODO: ensure Mark* gets called.
import ( import (
"os" "os"
"os/signal" "os/signal"
@ -116,8 +114,7 @@ func (n *Node) switchEventsHandler() {
if event.Peer.IsOutbound() { if event.Peer.IsOutbound() {
n.sendOurExternalAddrs(event.Peer) n.sendOurExternalAddrs(event.Peer)
if n.book.NeedMoreAddrs() { if n.book.NeedMoreAddrs() {
pkt := p2p.NewPacket(p2p.PexCh, p2p.NewPexRequestMessage()) n.pmgr.RequestPEX(event.Peer)
event.Peer.TrySend(pkt)
} }
} }
case p2p.SwitchEventDonePeer: case p2p.SwitchEventDonePeer:
@ -132,8 +129,7 @@ func (n *Node) sendOurExternalAddrs(peer *p2p.Peer) {
for _, l := range n.lz { for _, l := range n.lz {
addrs = append(addrs, l.ExternalAddress()) addrs = append(addrs, l.ExternalAddress())
} }
msg := &p2p.PexAddrsMessage{Addrs: addrs} n.pmgr.SendAddrs(peer, addrs)
peer.Send(p2p.NewPacket(p2p.PexCh, msg))
// On the remote end, the pexHandler may choose // On the remote end, the pexHandler may choose
// to add these to its book. // to add these to its book.
} }

View File

@ -1,9 +1,94 @@
package p2p package p2p
import ( import (
"bytes"
"fmt"
"io"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
) )
/*
A Message is anything that can be serialized.
The resulting serialized bytes of Message don't contain type information,
so messages are typically wrapped in a TypedMessage before put in the wire.
*/
type Message interface { type Message interface {
Binary Binary
} }
/*
A TypedMessage extends a Message with a single byte of type information.
When deserializing a message from the wire, a switch statement is needed
to dispatch to the correct constructor, typically named "ReadXXXMessage".
*/
type TypedMessage struct {
Type Byte
Message Message
}
func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(tm.Type, w, n, err)
n, err = WriteTo(tm.Message, w, n, err)
return
}
func (tm TypedMessage) String() string {
return fmt.Sprintf("0x%X⋺%v", tm.Type, tm.Message)
}
//-----------------------------------------------------------------------------
/*
Packet encapsulates a ByteSlice on a Channel.
Typically the Bytes are the serialized form of a TypedMessage.
*/
type Packet struct {
Channel String
Bytes ByteSlice
// Hash
}
func NewPacket(chName String, msg Binary) Packet {
msgBytes := BinaryBytes(msg)
return Packet{
Channel: chName,
Bytes: msgBytes,
}
}
func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(p.Channel, w, n, err)
n, err = WriteTo(p.Bytes, w, n, err)
return
}
func (p Packet) Reader() io.Reader {
return bytes.NewReader(p.Bytes)
}
func (p Packet) String() string {
return fmt.Sprintf("%v:%X", p.Channel, p.Bytes)
}
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil {
return
}
// TODO: packet length sanity check.
bytes, err := ReadByteSliceSafe(r)
if err != nil {
return
}
return Packet{Channel: chName, Bytes: bytes}, nil
}
/*
InboundPacket extends Packet with fields relevant to inbound packets.
*/
type InboundPacket struct {
Peer *Peer
Time Time
Packet
}

View File

@ -62,8 +62,8 @@ func ReadNetAddress(r io.Reader) *NetAddress {
} }
func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) { func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(ByteSlice(na.IP.To16()), w, n, err) n, err = WriteTo(ByteSlice(na.IP.To16()), w, n, err)
n, err = WriteOnto(na.Port, w, n, err) n, err = WriteTo(na.Port, w, n, err)
return return
} }

View File

@ -1,7 +1,6 @@
package p2p package p2p
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"sync/atomic" "sync/atomic"
@ -212,58 +211,3 @@ func (c *Channel) RecvQueue() <-chan Packet {
func (c *Channel) SendQueue() chan<- Packet { func (c *Channel) SendQueue() chan<- Packet {
return c.sendQueue return c.sendQueue
} }
//-----------------------------------------------------------------------------
/*
Packet encapsulates a ByteSlice on a Channel.
*/
type Packet struct {
Channel String
Bytes ByteSlice
// Hash
}
func NewPacket(chName String, msg Binary) Packet {
msgBytes := BinaryBytes(msg)
return Packet{
Channel: chName,
Bytes: msgBytes,
}
}
func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(p.Channel, w, n, err)
n, err = WriteOnto(p.Bytes, w, n, err)
return
}
func (p Packet) Reader() io.Reader {
return bytes.NewReader(p.Bytes)
}
func (p Packet) String() string {
return fmt.Sprintf("%v:%X", p.Channel, p.Bytes)
}
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil {
return
}
// TODO: packet length sanity check.
bytes, err := ReadByteSliceSafe(r)
if err != nil {
return
}
return Packet{Channel: chName, Bytes: bytes}, nil
}
/*
InboundPacket extends Packet with fields relevant to inbound packets.
*/
type InboundPacket struct {
Peer *Peer
Time Time
Packet
}

View File

@ -15,7 +15,7 @@ import (
var pexErrInvalidMessage = errors.New("Invalid PEX message") var pexErrInvalidMessage = errors.New("Invalid PEX message")
const ( const (
PexCh = "PEX" pexCh = "PEX"
ensurePeersPeriodSeconds = 30 ensurePeersPeriodSeconds = 30
minNumOutboundPeers = 10 minNumOutboundPeers = 10
maxNumPeers = 50 maxNumPeers = 50
@ -44,7 +44,7 @@ func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager {
func (pm *PeerManager) Start() { func (pm *PeerManager) Start() {
if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { if atomic.CompareAndSwapUint32(&pm.started, 0, 1) {
log.Info("Starting peerManager") log.Info("Starting PeerManager")
go pm.ensurePeersHandler() go pm.ensurePeersHandler()
go pm.pexHandler() go pm.pexHandler()
} }
@ -52,11 +52,24 @@ func (pm *PeerManager) Start() {
func (pm *PeerManager) Stop() { func (pm *PeerManager) Stop() {
if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) {
log.Info("Stopping peerManager") log.Info("Stopping PeerManager")
close(pm.quit) close(pm.quit)
} }
} }
// Asks peer for more addresses.
func (pm *PeerManager) RequestPEX(peer *Peer) {
msg := &pexRequestMessage{}
tm := TypedMessage{msgTypeRequest, msg}
peer.TrySend(NewPacket(pexCh, tm))
}
func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) {
msg := &pexAddrsMessage{Addrs: addrs}
tm := TypedMessage{msgTypeAddrs, msg}
peer.Send(NewPacket(pexCh, tm))
}
// Ensures that sufficient peers are connected. (continuous) // Ensures that sufficient peers are connected. (continuous)
func (pm *PeerManager) ensurePeersHandler() { func (pm *PeerManager) ensurePeersHandler() {
// fire once immediately. // fire once immediately.
@ -124,11 +137,11 @@ func (pm *PeerManager) ensurePeers() {
} }
} }
// Handles incoming Pex messages. // Handles incoming PEX messages.
func (pm *PeerManager) pexHandler() { func (pm *PeerManager) pexHandler() {
for { for {
inPkt := pm.sw.Receive(PexCh) // {Peer, Time, Packet} inPkt := pm.sw.Receive(pexCh) // {Peer, Time, Packet}
if inPkt == nil { if inPkt == nil {
// Client has stopped // Client has stopped
break break
@ -139,22 +152,22 @@ func (pm *PeerManager) pexHandler() {
log.Info("pexHandler received %v", msg) log.Info("pexHandler received %v", msg)
switch msg.(type) { switch msg.(type) {
case *PexRequestMessage: case *pexRequestMessage:
// inPkt.Peer requested some peers. // inPkt.Peer requested some peers.
// TODO: prevent abuse. // TODO: prevent abuse.
addrs := pm.book.GetSelection() addrs := pm.book.GetSelection()
response := &PexAddrsMessage{Addrs: addrs} msg := &pexAddrsMessage{Addrs: addrs}
pkt := NewPacket(PexCh, response) tm := TypedMessage{msgTypeRequest, msg}
queued := inPkt.Peer.TrySend(pkt) queued := inPkt.Peer.TrySend(NewPacket(pexCh, tm))
if !queued { if !queued {
// ignore // ignore
} }
case *PexAddrsMessage: case *pexAddrsMessage:
// We received some peer addresses from inPkt.Peer. // We received some peer addresses from inPkt.Peer.
// TODO: prevent abuse. // TODO: prevent abuse.
// (We don't want to get spammed with bad peers) // (We don't want to get spammed with bad peers)
srcAddr := inPkt.Peer.RemoteAddress() srcAddr := inPkt.Peer.RemoteAddress()
for _, addr := range msg.(*PexAddrsMessage).Addrs { for _, addr := range msg.(*pexAddrsMessage).Addrs {
pm.book.AddAddress(addr, srcAddr) pm.book.AddAddress(addr, srcAddr)
} }
default: default:
@ -172,18 +185,18 @@ func (pm *PeerManager) pexHandler() {
/* Messages */ /* Messages */
const ( const (
pexTypeUnknown = Byte(0x00) msgTypeUnknown = Byte(0x00)
pexTypeRequest = Byte(0x01) msgTypeRequest = Byte(0x01)
pexTypeAddrs = Byte(0x02) msgTypeAddrs = Byte(0x02)
) )
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz ByteSlice) (msg Message) { func decodeMessage(bz ByteSlice) (msg Message) {
// log.Debug("decoding msg bytes: %X", bz) // log.Debug("decoding msg bytes: %X", bz)
switch Byte(bz[0]) { switch Byte(bz[0]) {
case pexTypeRequest: case msgTypeRequest:
return &PexRequestMessage{} return &pexRequestMessage{}
case pexTypeAddrs: case msgTypeAddrs:
return readPexAddrsMessage(bytes.NewReader(bz[1:])) return readPexAddrsMessage(bytes.NewReader(bz[1:]))
default: default:
return nil return nil
@ -191,54 +204,46 @@ func decodeMessage(bz ByteSlice) (msg Message) {
} }
/* /*
A PexRequestMessage requests additional peer addresses. A pexRequestMessage requests additional peer addresses.
*/ */
type PexRequestMessage struct { type pexRequestMessage struct {
} }
// TODO: define NewPexRequestPacket instead? func (m *pexRequestMessage) WriteTo(w io.Writer) (n int64, err error) {
return // nothing to write.
func NewPexRequestMessage() *PexRequestMessage {
return &PexRequestMessage{}
} }
func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { func (m *pexRequestMessage) String() string {
n, err = WriteOnto(pexTypeRequest, w, n, err) return "[pexRequest]"
return
}
func (m *PexRequestMessage) String() string {
return "[PexRequest]"
} }
/* /*
A message with announced peer addresses. A message with announced peer addresses.
*/ */
type PexAddrsMessage struct { type pexAddrsMessage struct {
Addrs []*NetAddress Addrs []*NetAddress
} }
func readPexAddrsMessage(r io.Reader) *PexAddrsMessage { func readPexAddrsMessage(r io.Reader) *pexAddrsMessage {
numAddrs := int(ReadUInt32(r)) numAddrs := int(ReadUInt32(r))
addrs := []*NetAddress{} addrs := []*NetAddress{}
for i := 0; i < numAddrs; i++ { for i := 0; i < numAddrs; i++ {
addr := ReadNetAddress(r) addr := ReadNetAddress(r)
addrs = append(addrs, addr) addrs = append(addrs, addr)
} }
return &PexAddrsMessage{ return &pexAddrsMessage{
Addrs: addrs, Addrs: addrs,
} }
} }
func (m *PexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { func (m *pexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(pexTypeAddrs, w, n, err) n, err = WriteTo(UInt32(len(m.Addrs)), w, n, err)
n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err)
for _, addr := range m.Addrs { for _, addr := range m.Addrs {
n, err = WriteOnto(addr, w, n, err) n, err = WriteTo(addr, w, n, err)
} }
return return
} }
func (m *PexAddrsMessage) String() string { func (m *pexAddrsMessage) String() string {
return fmt.Sprintf("[PexAddrs %v]", m.Addrs) return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
} }