mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 14:52:17 +00:00
p2p/conn: Add Bufferpool (#3664)
* use byte buffer pool to decreass allocs * wrap to put buffer in defer * wapper defer * add dependency * remove Gopkg,* * add change log
This commit is contained in:
parent
5d7e22a53c
commit
17b69d4d56
@ -22,5 +22,6 @@ program](https://hackerone.com/tendermint).
|
||||
|
||||
- [abci] \#3809 Recover from application panics in `server/socket_server.go` to allow socket cleanup (@ruseinov)
|
||||
- [rpc] \#3818 Make `max_body_bytes` and `max_header_bytes` configurable
|
||||
- [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection
|
||||
|
||||
### BUG FIXES:
|
||||
|
1
go.mod
1
go.mod
@ -18,6 +18,7 @@ require (
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
|
||||
github.com/libp2p/go-buffer-pool v0.0.1
|
||||
github.com/magiconair/properties v1.8.0
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/mitchellh/mapstructure v1.1.2 // indirect
|
||||
|
2
go.sum
2
go.sum
@ -64,6 +64,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/libp2p/go-buffer-pool v0.0.1 h1:9Rrn/H46cXjaA2HQ5Y8lyhOS1NhTkZ4yuEs2r3Eechg=
|
||||
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
|
||||
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
|
||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
|
@ -2,6 +2,7 @@ package conn
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/cipher"
|
||||
crand "crypto/rand"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
@ -17,6 +18,7 @@ import (
|
||||
"golang.org/x/crypto/curve25519"
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
|
||||
pool "github.com/libp2p/go-buffer-pool"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"golang.org/x/crypto/hkdf"
|
||||
@ -47,10 +49,11 @@ var (
|
||||
type SecretConnection struct {
|
||||
|
||||
// immutable
|
||||
recvSecret *[aeadKeySize]byte
|
||||
sendSecret *[aeadKeySize]byte
|
||||
remPubKey crypto.PubKey
|
||||
conn io.ReadWriteCloser
|
||||
recvAead cipher.AEAD
|
||||
sendAead cipher.AEAD
|
||||
|
||||
remPubKey crypto.PubKey
|
||||
conn io.ReadWriteCloser
|
||||
|
||||
// net.Conn must be thread safe:
|
||||
// https://golang.org/pkg/net/#Conn.
|
||||
@ -102,14 +105,22 @@ func MakeSecretConnection(conn io.ReadWriteCloser, locPrivKey crypto.PrivKey) (*
|
||||
// generate the secret used for receiving, sending, challenge via hkdf-sha2 on dhSecret
|
||||
recvSecret, sendSecret, challenge := deriveSecretAndChallenge(dhSecret, locIsLeast)
|
||||
|
||||
sendAead, err := chacha20poly1305.New(sendSecret[:])
|
||||
if err != nil {
|
||||
return nil, errors.New("Invalid send SecretConnection Key")
|
||||
}
|
||||
recvAead, err := chacha20poly1305.New(recvSecret[:])
|
||||
if err != nil {
|
||||
return nil, errors.New("Invalid receive SecretConnection Key")
|
||||
}
|
||||
// Construct SecretConnection.
|
||||
sc := &SecretConnection{
|
||||
conn: conn,
|
||||
recvBuffer: nil,
|
||||
recvNonce: new([aeadNonceSize]byte),
|
||||
sendNonce: new([aeadNonceSize]byte),
|
||||
recvSecret: recvSecret,
|
||||
sendSecret: sendSecret,
|
||||
recvAead: recvAead,
|
||||
sendAead: sendAead,
|
||||
}
|
||||
|
||||
// Sign the challenge bytes for authentication.
|
||||
@ -143,35 +154,39 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
|
||||
defer sc.sendMtx.Unlock()
|
||||
|
||||
for 0 < len(data) {
|
||||
var frame = make([]byte, totalFrameSize)
|
||||
var chunk []byte
|
||||
if dataMaxSize < len(data) {
|
||||
chunk = data[:dataMaxSize]
|
||||
data = data[dataMaxSize:]
|
||||
} else {
|
||||
chunk = data
|
||||
data = nil
|
||||
}
|
||||
chunkLength := len(chunk)
|
||||
binary.LittleEndian.PutUint32(frame, uint32(chunkLength))
|
||||
copy(frame[dataLenSize:], chunk)
|
||||
if err := func() error {
|
||||
var sealedFrame = pool.Get(aeadSizeOverhead + totalFrameSize)
|
||||
var frame = pool.Get(totalFrameSize)
|
||||
defer func() {
|
||||
pool.Put(sealedFrame)
|
||||
pool.Put(frame)
|
||||
}()
|
||||
var chunk []byte
|
||||
if dataMaxSize < len(data) {
|
||||
chunk = data[:dataMaxSize]
|
||||
data = data[dataMaxSize:]
|
||||
} else {
|
||||
chunk = data
|
||||
data = nil
|
||||
}
|
||||
chunkLength := len(chunk)
|
||||
binary.LittleEndian.PutUint32(frame, uint32(chunkLength))
|
||||
copy(frame[dataLenSize:], chunk)
|
||||
|
||||
aead, err := chacha20poly1305.New(sc.sendSecret[:])
|
||||
if err != nil {
|
||||
return n, errors.New("Invalid SecretConnection Key")
|
||||
}
|
||||
// encrypt the frame
|
||||
sc.sendAead.Seal(sealedFrame[:0], sc.sendNonce[:], frame, nil)
|
||||
incrNonce(sc.sendNonce)
|
||||
// end encryption
|
||||
|
||||
// encrypt the frame
|
||||
var sealedFrame = make([]byte, aeadSizeOverhead+totalFrameSize)
|
||||
aead.Seal(sealedFrame[:0], sc.sendNonce[:], frame, nil)
|
||||
incrNonce(sc.sendNonce)
|
||||
// end encryption
|
||||
|
||||
_, err = sc.conn.Write(sealedFrame)
|
||||
if err != nil {
|
||||
_, err = sc.conn.Write(sealedFrame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n += len(chunk)
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return n, err
|
||||
}
|
||||
n += len(chunk)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -189,21 +204,18 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
// read off the conn
|
||||
sealedFrame := make([]byte, totalFrameSize+aeadSizeOverhead)
|
||||
var sealedFrame = pool.Get(aeadSizeOverhead + totalFrameSize)
|
||||
defer pool.Put(sealedFrame)
|
||||
_, err = io.ReadFull(sc.conn, sealedFrame)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
aead, err := chacha20poly1305.New(sc.recvSecret[:])
|
||||
if err != nil {
|
||||
return n, errors.New("Invalid SecretConnection Key")
|
||||
}
|
||||
|
||||
// decrypt the frame.
|
||||
// reads and updates the sc.recvNonce
|
||||
var frame = make([]byte, totalFrameSize)
|
||||
_, err = aead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil)
|
||||
var frame = pool.Get(totalFrameSize)
|
||||
defer pool.Put(frame)
|
||||
_, err = sc.recvAead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil)
|
||||
if err != nil {
|
||||
return n, errors.New("Failed to decrypt SecretConnection")
|
||||
}
|
||||
@ -218,7 +230,10 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
|
||||
}
|
||||
var chunk = frame[dataLenSize : dataLenSize+chunkLength]
|
||||
n = copy(data, chunk)
|
||||
sc.recvBuffer = chunk[n:]
|
||||
if n < len(chunk) {
|
||||
sc.recvBuffer = make([]byte, len(chunk)-n)
|
||||
copy(sc.recvBuffer, chunk[n:])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -383,10 +383,23 @@ func createGoldenTestVectors(t *testing.T) string {
|
||||
return data
|
||||
}
|
||||
|
||||
func BenchmarkSecretConnection(b *testing.B) {
|
||||
func BenchmarkWriteSecretConnection(b *testing.B) {
|
||||
b.StopTimer()
|
||||
b.ReportAllocs()
|
||||
fooSecConn, barSecConn := makeSecretConnPair(b)
|
||||
fooWriteText := cmn.RandStr(dataMaxSize)
|
||||
randomMsgSizes := []int{
|
||||
dataMaxSize / 10,
|
||||
dataMaxSize / 3,
|
||||
dataMaxSize / 2,
|
||||
dataMaxSize,
|
||||
dataMaxSize * 3 / 2,
|
||||
dataMaxSize * 2,
|
||||
dataMaxSize * 7 / 2,
|
||||
}
|
||||
fooWriteBytes := make([][]byte, 0, len(randomMsgSizes))
|
||||
for _, size := range randomMsgSizes {
|
||||
fooWriteBytes = append(fooWriteBytes, cmn.RandBytes(size))
|
||||
}
|
||||
// Consume reads from bar's reader
|
||||
go func() {
|
||||
readBuffer := make([]byte, dataMaxSize)
|
||||
@ -402,7 +415,8 @@ func BenchmarkSecretConnection(b *testing.B) {
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := fooSecConn.Write([]byte(fooWriteText))
|
||||
idx := cmn.RandIntn(len(fooWriteBytes))
|
||||
_, err := fooSecConn.Write(fooWriteBytes[idx])
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to write to fooSecConn: %v", err)
|
||||
}
|
||||
@ -414,3 +428,44 @@ func BenchmarkSecretConnection(b *testing.B) {
|
||||
}
|
||||
//barSecConn.Close() race condition
|
||||
}
|
||||
|
||||
func BenchmarkReadSecretConnection(b *testing.B) {
|
||||
b.StopTimer()
|
||||
b.ReportAllocs()
|
||||
fooSecConn, barSecConn := makeSecretConnPair(b)
|
||||
randomMsgSizes := []int{
|
||||
dataMaxSize / 10,
|
||||
dataMaxSize / 3,
|
||||
dataMaxSize / 2,
|
||||
dataMaxSize,
|
||||
dataMaxSize * 3 / 2,
|
||||
dataMaxSize * 2,
|
||||
dataMaxSize * 7 / 2,
|
||||
}
|
||||
fooWriteBytes := make([][]byte, 0, len(randomMsgSizes))
|
||||
for _, size := range randomMsgSizes {
|
||||
fooWriteBytes = append(fooWriteBytes, cmn.RandBytes(size))
|
||||
}
|
||||
go func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
idx := cmn.RandIntn(len(fooWriteBytes))
|
||||
_, err := fooSecConn.Write(fooWriteBytes[idx])
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to write to fooSecConn: %v, %v,%v", err, i, b.N)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
readBuffer := make([]byte, dataMaxSize)
|
||||
_, err := barSecConn.Read(readBuffer)
|
||||
|
||||
if err == io.EOF {
|
||||
return
|
||||
} else if err != nil {
|
||||
b.Fatalf("Failed to read from barSecConn: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user