P2P now works with Amino

This commit is contained in:
Jae Kwon
2018-03-26 06:40:02 +02:00
parent ced74251e9
commit 901b456151
22 changed files with 193 additions and 200 deletions

View File

@ -8,7 +8,6 @@ import (
"math"
"net"
"reflect"
"runtime/debug"
"sync/atomic"
"time"
@ -19,8 +18,8 @@ import (
)
const (
packetMsgMaxPayloadSizeDefault = 1024
packetMsgMaxOverheadSize = 10 // It's actually lower but good enough
maxPacketMsgPayloadSizeDefault = 1024
maxPacketMsgOverheadSize = 14
numBatchPacketMsgs = 10
minReadBufferSize = 1024
@ -57,15 +56,15 @@ The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.
There are two methods for sending messages:
func (m MConnection) Send(chID byte, msg interface{}) bool {}
func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out.
The message `msg` is serialized using Go-Amino.
`Send(chID, msgBytes)` is a blocking call that waits until `msg` is
successfully queued for the channel with the given id byte `chID`, or until the
request times out. The message `msg` is serialized using Go-Amino.
`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.
`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.
Inbound message bytes are handled with an onReceive callback function.
*/
@ -105,7 +104,7 @@ type MConnConfig struct {
RecvRate int64 `mapstructure:"recv_rate"`
// Maximum payload size
PacketMsgMaxPayloadSize int `mapstructure:"packet_msg_max_payload_size"`
MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`
// Interval to flush writes (throttled)
FlushThrottle time.Duration `mapstructure:"flush_throttle"`
@ -118,7 +117,7 @@ type MConnConfig struct {
}
func (cfg *MConnConfig) maxPacketMsgTotalSize() int {
return cfg.PacketMsgMaxPayloadSize + packetMsgMaxOverheadSize
return cfg.MaxPacketMsgPayloadSize + maxPacketMsgOverheadSize
}
// DefaultMConnConfig returns the default config.
@ -126,7 +125,7 @@ func DefaultMConnConfig() *MConnConfig {
return &MConnConfig{
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
PacketMsgMaxPayloadSize: packetMsgMaxPayloadSizeDefault,
MaxPacketMsgPayloadSize: maxPacketMsgPayloadSizeDefault,
FlushThrottle: defaultFlushThrottle,
PingInterval: defaultPingInterval,
PongTimeout: defaultPongTimeout,
@ -233,8 +232,7 @@ func (c *MConnection) flush() {
// Catch panics, usually caused by remote disconnects.
func (c *MConnection) _recover() {
if r := recover(); r != nil {
stack := debug.Stack()
err := cmn.StackError{r, stack}
err := cmn.ErrorWrap(r, "recovered panic in MConnection")
c.stopForError(err)
}
}
@ -249,12 +247,12 @@ func (c *MConnection) stopForError(r interface{}) {
}
// Queues a message to be sent to channel.
func (c *MConnection) Send(chID byte, msg interface{}) bool {
func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
if !c.IsRunning() {
return false
}
c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg)
c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
// Send message to channel.
channel, ok := c.channelsIdx[chID]
@ -263,7 +261,7 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
return false
}
success := channel.sendBytes(cdc.MustMarshalBinary(msg))
success := channel.sendBytes(msgBytes)
if success {
// Wake up sendRoutine if necessary
select {
@ -271,19 +269,19 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
default:
}
} else {
c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
c.Logger.Error("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
}
return success
}
// Queues a message to be sent to channel.
// Nonblocking, returns true if successful.
func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
if !c.IsRunning() {
return false
}
c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
// Send message to channel.
channel, ok := c.channelsIdx[chID]
@ -292,7 +290,7 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
return false
}
ok = channel.trySendBytes(cdc.MustMarshalBinary(msg))
ok = channel.trySendBytes(msgBytes)
if ok {
// Wake up sendRoutine if necessary
select {
@ -462,18 +460,17 @@ FOR_LOOP:
// Block until .recvMonitor says we can read.
c.recvMonitor.Limit(c.config.maxPacketMsgTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
// Peek into bufConnReader for debugging
/*
// Peek into bufConnReader for debugging
if numBytes := c.bufConnReader.Buffered(); numBytes > 0 {
log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
bytes, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
if err == nil {
return bytes
} else {
log.Warn("Error peeking connection buffer", "err", err)
return nil
}
}})
bz, err := c.bufConnReader.Peek(cmn.MinInt(numBytes, 100))
if err == nil {
// return
} else {
c.Logger.Debug("Error peeking connection buffer", "err", err)
// return nil
}
c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz)
}
*/
@ -639,7 +636,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
desc: desc,
sendQueue: make(chan []byte, desc.SendQueueCapacity),
recving: make([]byte, 0, desc.RecvBufferCapacity),
maxPacketMsgPayloadSize: conn.config.PacketMsgMaxPayloadSize,
maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
}
}
@ -719,7 +716,6 @@ func (ch *Channel) nextPacketMsg() PacketMsg {
// Not goroutine-safe
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
var packet = ch.nextPacketMsg()
ch.Logger.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
n, err = cdc.MarshalBinaryWriter(w, packet)
ch.recentlySent += n
return
@ -729,7 +725,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
// complete. NOTE message bytes may change on next call to recvPacketMsg.
// Not goroutine-safe
func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
if recvCap < recvReceived {
return nil, fmt.Errorf("Received message exceeds available capacity: %v < %v", recvCap, recvReceived)

View File

@ -1,6 +1,7 @@
package conn
import (
"bytes"
"net"
"testing"
"time"
@ -41,7 +42,7 @@ func TestMConnectionSend(t *testing.T) {
require.Nil(t, err)
defer mconn.Stop()
msg := "Ant-Man"
msg := []byte("Ant-Man")
assert.True(t, mconn.Send(0x01, msg))
// Note: subsequent Send/TrySend calls could pass because we are reading from
// the send queue in a separate goroutine.
@ -51,7 +52,7 @@ func TestMConnectionSend(t *testing.T) {
}
assert.True(t, mconn.CanSend(0x01))
msg = "Spider-Man"
msg = []byte("Spider-Man")
assert.True(t, mconn.TrySend(0x01, msg))
_, err = server.Read(make([]byte, len(msg)))
if err != nil {
@ -59,7 +60,7 @@ func TestMConnectionSend(t *testing.T) {
}
assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
assert.False(t, mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
}
func TestMConnectionReceive(t *testing.T) {
@ -85,12 +86,12 @@ func TestMConnectionReceive(t *testing.T) {
require.Nil(t, err)
defer mconn2.Stop()
msg := "Cyclops"
msg := []byte("Cyclops")
assert.True(t, mconn2.Send(0x01, msg))
select {
case receivedBytes := <-receivedCh:
assert.Equal(t, []byte(msg), receivedBytes[2:]) // first 3 bytes are internal
assert.Equal(t, []byte(msg), receivedBytes)
case err := <-errorsCh:
t.Fatalf("Expected %s, got %+v", msg, err)
case <-time.After(500 * time.Millisecond):
@ -386,7 +387,7 @@ func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
defer mconnClient.Stop()
defer mconnServer.Stop()
msg := "Ant-Man"
msg := []byte("Ant-Man")
// fail to send msg on channel unknown by client
assert.False(t, mconnClient.Send(0x03, msg))
@ -413,23 +414,40 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) {
// send msg thats just right
var err error
var buf = new(bytes.Buffer)
// - Uvarint length of MustMarshalBinary(packet) = 1 or 2 bytes
// (as long as it's less than 16,384 bytes)
// - Prefix bytes = 4 bytes
// - ChannelID field key + byte = 2 bytes
// - EOF field key + byte = 2 bytes
// - Bytes field key = 1 bytes
// - Uvarint length of MustMarshalBinary(bytes) = 1 or 2 bytes
// - Struct terminator = 1 byte
// = up to 14 bytes overhead for the packet.
var packet = PacketMsg{
ChannelID: 0x01,
Bytes: make([]byte, mconnClient.config.maxPacketMsgTotalSize()-12),
EOF: 1,
Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
}
_, err = cdc.MarshalBinaryWriter(client, packet)
_, err = cdc.MarshalBinaryWriter(buf, packet)
assert.Nil(t, err)
_, err = client.Write(buf.Bytes())
assert.Nil(t, err)
assert.True(t, expectSend(chOnRcv), "msg just right")
assert.False(t, expectSend(chOnErr), "msg just right")
// send msg thats too long
buf = new(bytes.Buffer)
packet = PacketMsg{
ChannelID: 0x01,
Bytes: make([]byte, mconnClient.config.maxPacketMsgTotalSize()-11),
EOF: 1,
Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+1),
}
_, err = cdc.MarshalBinaryWriter(client, packet)
_, err = cdc.MarshalBinaryWriter(buf, packet)
assert.Nil(t, err)
_, err = client.Write(buf.Bytes())
assert.NotNil(t, err)
assert.False(t, expectSend(chOnRcv), "msg too long")
assert.True(t, expectSend(chOnErr), "msg too long")
}
@ -458,7 +476,7 @@ func TestMConnectionTrySend(t *testing.T) {
require.Nil(t, err)
defer mconn.Stop()
msg := "Semicolon-Woman"
msg := []byte("Semicolon-Woman")
resultCh := make(chan string, 2)
assert.True(t, mconn.TrySend(0x01, msg))
server.Read(make([]byte, len(msg)))

View File

@ -12,7 +12,6 @@ import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"time"
@ -145,8 +144,8 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
// CONTRACT: data smaller than dataMaxSize is read atomically.
func (sc *SecretConnection) Read(data []byte) (n int, err error) {
if 0 < len(sc.recvBuffer) {
n_ := copy(data, sc.recvBuffer)
sc.recvBuffer = sc.recvBuffer[n_:]
n = copy(data, sc.recvBuffer)
sc.recvBuffer = sc.recvBuffer[n:]
return
}
@ -193,7 +192,7 @@ func genEphKeys() (ephPub, ephPriv *[32]byte) {
var err error
ephPub, ephPriv, err = box.GenerateKey(crand.Reader)
if err != nil {
cmn.PanicCrisis("Could not generate ephemeral keypairs")
panic("Could not generate ephemeral keypairs")
}
return
}
@ -225,9 +224,6 @@ func shareEphPubKey(conn io.ReadWriteCloser, locEphPub *[32]byte) (remEphPub *[3
if trs.FirstError() != nil {
err = trs.FirstError()
return
} else if trs.FirstPanic() != nil {
err = fmt.Errorf("Panic: %v", trs.FirstPanic())
return
}
// Otherwise:
@ -309,9 +305,6 @@ func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKey, signature cr
if trs.FirstError() != nil {
err = trs.FirstError()
return
} else if trs.FirstPanic() != nil {
err = fmt.Errorf("Panic: %v", trs.FirstPanic())
return
}
var _recvMsg = trs.FirstValue().(authSigMessage)

View File

@ -73,7 +73,6 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection
return nil, nil, false
},
)
require.Nil(tb, trs.FirstPanic())
require.Nil(tb, trs.FirstError())
require.True(tb, ok, "Unexpected task abortion")
@ -158,9 +157,6 @@ func TestSecretConnectionReadWrite(t *testing.T) {
// If error:
if trs.FirstError() != nil {
return nil, trs.FirstError(), true
} else if trs.FirstPanic() != nil {
err = fmt.Errorf("Panic in task: %v", trs.FirstPanic())
return nil, err, true
}
// Otherwise:
@ -173,7 +169,6 @@ func TestSecretConnectionReadWrite(t *testing.T) {
genNodeRunner("foo", fooConn, fooWrites, &fooReads),
genNodeRunner("bar", barConn, barWrites, &barReads),
)
require.Nil(t, trs.FirstPanic())
require.Nil(t, trs.FirstError())
require.True(t, ok, "unexpected task abortion")

View File

@ -5,10 +5,9 @@ import (
"github.com/tendermint/go-crypto"
)
var cdc *amino.Codec
var cdc *amino.Codec = amino.NewCodec()
func init() {
cdc = amino.NewCodec()
crypto.RegisterAmino(cdc)
RegisterPacket(cdc)
}