mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 14:52:17 +00:00
new tmlibs Parallel implementation
This commit is contained in:
parent
49986b05bc
commit
22949e6dfd
@ -7,7 +7,6 @@ import (
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"runtime/debug"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -230,8 +229,7 @@ func (c *MConnection) flush() {
|
||||
// Catch panics, usually caused by remote disconnects.
|
||||
func (c *MConnection) _recover() {
|
||||
if r := recover(); r != nil {
|
||||
stack := debug.Stack()
|
||||
err := cmn.StackError{r, stack}
|
||||
err := cmn.ErrorWrap(r, "recovered from panic")
|
||||
c.stopForError(err)
|
||||
}
|
||||
}
|
||||
|
@ -20,8 +20,8 @@ import (
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
"golang.org/x/crypto/ripemd160"
|
||||
|
||||
"github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/go-wire"
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
wire "github.com/tendermint/go-wire"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
@ -200,26 +200,36 @@ func genEphKeys() (ephPub, ephPriv *[32]byte) {
|
||||
}
|
||||
|
||||
func shareEphPubKey(conn io.ReadWriteCloser, locEphPub *[32]byte) (remEphPub *[32]byte, err error) {
|
||||
var err1, err2 error
|
||||
|
||||
cmn.Parallel(
|
||||
func() {
|
||||
_, err1 = conn.Write(locEphPub[:])
|
||||
// Send our pubkey and receive theirs in tandem.
|
||||
var trs, _ = cmn.Parallel(
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
var _, err1 = conn.Write(locEphPub[:])
|
||||
if err1 != nil {
|
||||
return nil, err1, true // abort
|
||||
} else {
|
||||
return nil, nil, false
|
||||
}
|
||||
},
|
||||
func() {
|
||||
remEphPub = new([32]byte)
|
||||
_, err2 = io.ReadFull(conn, remEphPub[:])
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
var _remEphPub [32]byte
|
||||
var _, err2 = io.ReadFull(conn, _remEphPub[:])
|
||||
if err2 != nil {
|
||||
return nil, err2, true // abort
|
||||
} else {
|
||||
return _remEphPub, nil, false
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if err1 != nil {
|
||||
return nil, err1
|
||||
}
|
||||
if err2 != nil {
|
||||
return nil, err2
|
||||
// If error:
|
||||
if trs.FirstError() != nil {
|
||||
err = trs.FirstError()
|
||||
return
|
||||
}
|
||||
|
||||
return remEphPub, nil
|
||||
// Otherwise:
|
||||
var _remEphPub = trs.FirstValue().([32]byte)
|
||||
return &_remEphPub, nil
|
||||
}
|
||||
|
||||
func computeSharedSecret(remPubKey, locPrivKey *[32]byte) (shrSecret *[32]byte) {
|
||||
@ -268,33 +278,42 @@ type authSigMessage struct {
|
||||
Sig crypto.Signature
|
||||
}
|
||||
|
||||
func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKey, signature crypto.Signature) (*authSigMessage, error) {
|
||||
var recvMsg authSigMessage
|
||||
var err1, err2 error
|
||||
|
||||
cmn.Parallel(
|
||||
func() {
|
||||
func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKey, signature crypto.Signature) (recvMsg *authSigMessage, err error) {
|
||||
// Send our info and receive theirs in tandem.
|
||||
var trs, _ = cmn.Parallel(
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
msgBytes := wire.BinaryBytes(authSigMessage{pubKey.Wrap(), signature.Wrap()})
|
||||
_, err1 = sc.Write(msgBytes)
|
||||
var _, err1 = sc.Write(msgBytes)
|
||||
if err1 != nil {
|
||||
return nil, err1, true // abort
|
||||
} else {
|
||||
return nil, nil, false
|
||||
}
|
||||
},
|
||||
func() {
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
readBuffer := make([]byte, authSigMsgSize)
|
||||
_, err2 = io.ReadFull(sc, readBuffer)
|
||||
var _, err2 = io.ReadFull(sc, readBuffer)
|
||||
if err2 != nil {
|
||||
return
|
||||
return nil, err2, true // abort
|
||||
}
|
||||
n := int(0) // not used.
|
||||
recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage)
|
||||
})
|
||||
var _recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage)
|
||||
if err2 != nil {
|
||||
return nil, err2, true // abort
|
||||
} else {
|
||||
return _recvMsg, nil, false
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if err1 != nil {
|
||||
return nil, err1
|
||||
}
|
||||
if err2 != nil {
|
||||
return nil, err2
|
||||
// If error:
|
||||
if trs.FirstError() != nil {
|
||||
err = trs.FirstError()
|
||||
return
|
||||
}
|
||||
|
||||
return &recvMsg, nil
|
||||
var _recvMsg = trs.FirstValue().(authSigMessage)
|
||||
return &_recvMsg, nil
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
@ -1,9 +1,12 @@
|
||||
package conn
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
@ -36,33 +39,41 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection
|
||||
barPrvKey := crypto.GenPrivKeyEd25519().Wrap()
|
||||
barPubKey := barPrvKey.PubKey()
|
||||
|
||||
cmn.Parallel(
|
||||
func() {
|
||||
var err error
|
||||
var trs, ok = cmn.Parallel(
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey)
|
||||
if err != nil {
|
||||
tb.Errorf("Failed to establish SecretConnection for foo: %v", err)
|
||||
return
|
||||
return nil, err, true
|
||||
}
|
||||
remotePubBytes := fooSecConn.RemotePubKey()
|
||||
if !remotePubBytes.Equals(barPubKey) {
|
||||
tb.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v",
|
||||
err = fmt.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v",
|
||||
barPubKey, fooSecConn.RemotePubKey())
|
||||
tb.Error(err)
|
||||
return nil, err, false
|
||||
}
|
||||
return nil, nil, false
|
||||
},
|
||||
func() {
|
||||
var err error
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
barSecConn, err = MakeSecretConnection(barConn, barPrvKey)
|
||||
if barSecConn == nil {
|
||||
tb.Errorf("Failed to establish SecretConnection for bar: %v", err)
|
||||
return
|
||||
return nil, err, true
|
||||
}
|
||||
remotePubBytes := barSecConn.RemotePubKey()
|
||||
if !remotePubBytes.Equals(fooPubKey) {
|
||||
tb.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v",
|
||||
err = fmt.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v",
|
||||
fooPubKey, barSecConn.RemotePubKey())
|
||||
tb.Error(err)
|
||||
return nil, nil, false
|
||||
}
|
||||
})
|
||||
return nil, nil, false
|
||||
},
|
||||
)
|
||||
|
||||
require.Nil(tb, trs.FirstError())
|
||||
require.True(tb, ok, "Unexpected task abortion")
|
||||
|
||||
return
|
||||
}
|
||||
@ -89,59 +100,76 @@ func TestSecretConnectionReadWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
// A helper that will run with (fooConn, fooWrites, fooReads) and vice versa
|
||||
genNodeRunner := func(nodeConn kvstoreConn, nodeWrites []string, nodeReads *[]string) func() {
|
||||
return func() {
|
||||
genNodeRunner := func(nodeConn kvstoreConn, nodeWrites []string, nodeReads *[]string) cmn.Task {
|
||||
return func(_ int) (interface{}, error, bool) {
|
||||
// Node handskae
|
||||
nodePrvKey := crypto.GenPrivKeyEd25519().Wrap()
|
||||
nodeSecretConn, err := MakeSecretConnection(nodeConn, nodePrvKey)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to establish SecretConnection for node: %v", err)
|
||||
return
|
||||
return nil, err, true
|
||||
}
|
||||
// In parallel, handle reads and writes
|
||||
cmn.Parallel(
|
||||
func() {
|
||||
// In parallel, handle some reads and writes.
|
||||
var trs, ok = cmn.Parallel(
|
||||
func(_ int) (interface{}, error, bool) {
|
||||
// Node writes
|
||||
for _, nodeWrite := range nodeWrites {
|
||||
n, err := nodeSecretConn.Write([]byte(nodeWrite))
|
||||
if err != nil {
|
||||
t.Errorf("Failed to write to nodeSecretConn: %v", err)
|
||||
return
|
||||
return nil, err, true
|
||||
}
|
||||
if n != len(nodeWrite) {
|
||||
t.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n)
|
||||
return
|
||||
err = fmt.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n)
|
||||
t.Error(err)
|
||||
return nil, err, true
|
||||
}
|
||||
}
|
||||
if err := nodeConn.PipeWriter.Close(); err != nil {
|
||||
t.Error(err)
|
||||
return nil, err, true
|
||||
}
|
||||
return nil, nil, false
|
||||
},
|
||||
func() {
|
||||
func(_ int) (interface{}, error, bool) {
|
||||
// Node reads
|
||||
readBuffer := make([]byte, dataMaxSize)
|
||||
for {
|
||||
n, err := nodeSecretConn.Read(readBuffer)
|
||||
if err == io.EOF {
|
||||
return
|
||||
return nil, nil, false
|
||||
} else if err != nil {
|
||||
t.Errorf("Failed to read from nodeSecretConn: %v", err)
|
||||
return
|
||||
return nil, err, true
|
||||
}
|
||||
*nodeReads = append(*nodeReads, string(readBuffer[:n]))
|
||||
}
|
||||
if err := nodeConn.PipeReader.Close(); err != nil {
|
||||
t.Error(err)
|
||||
return nil, err, true
|
||||
}
|
||||
})
|
||||
return nil, nil, false
|
||||
},
|
||||
)
|
||||
assert.True(t, ok, "Unexpected task abortion")
|
||||
|
||||
// If error:
|
||||
if trs.FirstError() != nil {
|
||||
return nil, trs.FirstError(), true
|
||||
}
|
||||
|
||||
// Otherwise:
|
||||
return nil, nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// Run foo & bar in parallel
|
||||
cmn.Parallel(
|
||||
var trs, ok = cmn.Parallel(
|
||||
genNodeRunner(fooConn, fooWrites, &fooReads),
|
||||
genNodeRunner(barConn, barWrites, &barReads),
|
||||
)
|
||||
require.Nil(t, trs.FirstError())
|
||||
require.True(t, ok, "unexpected task abortion")
|
||||
|
||||
// A helper to ensure that the writes and reads match.
|
||||
// Additionally, small writes (<= dataMaxSize) must be atomically read.
|
||||
|
24
p2p/peer.go
24
p2p/peer.go
@ -293,22 +293,20 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration
|
||||
return peerNodeInfo, errors.Wrap(err, "Error setting deadline")
|
||||
}
|
||||
|
||||
var err1 error
|
||||
var err2 error
|
||||
cmn.Parallel(
|
||||
func() {
|
||||
var trs, _ = cmn.Parallel(
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
var n int
|
||||
wire.WriteBinary(&ourNodeInfo, pc.conn, &n, &err1)
|
||||
wire.WriteBinary(&ourNodeInfo, pc.conn, &n, &err)
|
||||
return
|
||||
},
|
||||
func() {
|
||||
func(_ int) (val interface{}, err error, abort bool) {
|
||||
var n int
|
||||
wire.ReadBinary(&peerNodeInfo, pc.conn, MaxNodeInfoSize(), &n, &err2)
|
||||
})
|
||||
if err1 != nil {
|
||||
return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
|
||||
}
|
||||
if err2 != nil {
|
||||
return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
|
||||
wire.ReadBinary(&peerNodeInfo, pc.conn, MaxNodeInfoSize(), &n, &err)
|
||||
return
|
||||
},
|
||||
)
|
||||
if err := trs.FirstError(); err != nil {
|
||||
return peerNodeInfo, errors.Wrap(err, "Error during handshake")
|
||||
}
|
||||
|
||||
// Remove deadline
|
||||
|
Loading…
x
Reference in New Issue
Block a user