fixes; load from file with valsets and chains

This commit is contained in:
Ethan Buchman
2016-01-21 22:57:24 -05:00
parent 1af021846e
commit de1d5f6353
6 changed files with 222 additions and 47 deletions

View File

@ -20,7 +20,7 @@ func (tn *TendermintNetwork) newBlockCallback(chainID, valID string) eventmeter.
val, _ := chain.Config.GetValidatorByID(valID) val, _ := chain.Config.GetValidatorByID(valID)
// update height for validator // update height for validator
val.BlockHeight = block.Header.Height val.Status.BlockHeight = block.Header.Height
// possibly update height and mean block time for chain // possibly update height and mean block time for chain
if block.Header.Height > chain.Status.Height { if block.Header.Height > chain.Status.Height {
@ -42,8 +42,8 @@ func (tn *TendermintNetwork) latencyCallback(chainID, valID string) eventmeter.L
// update latency for this validator and avg latency for chain // update latency for this validator and avg latency for chain
mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators) mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators)
mean = (mean - val.Latency + latency) / float64(chain.Status.NumValidators) mean = (mean - val.Status.Latency + latency) / float64(chain.Status.NumValidators)
val.Latency = latency val.Status.Latency = latency
chain.Status.MeanLatency = mean chain.Status.MeanLatency = mean
// TODO: possibly update active nodes and uptime for chain // TODO: possibly update active nodes and uptime for chain

View File

@ -110,12 +110,13 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig)
// start the event meter and listen for new blocks on each validator // start the event meter and listen for new blocks on each validator
for _, v := range chainConfig.Validators { for _, v := range chainConfig.Validators {
v.Status = &types.ValidatorStatus{}
if err := v.Start(); err != nil { if err := v.Start(); err != nil {
return nil, err return nil, err
} }
v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Validator.ID)) v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Config.Validator.ID))
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Validator.ID)) err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Config.Validator.ID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -137,6 +138,13 @@ func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorS
return valSet, nil return valSet, nil
} }
func (tn *TendermintNetwork) RegisterValidatorSet(valSet *types.ValidatorSet) (*types.ValidatorSet, error) {
tn.mtx.Lock()
defer tn.mtx.Unlock()
tn.ValSets[valSet.ID] = valSet
return valSet, nil
}
func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Validator, error) { func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Validator, error) {
tn.mtx.Lock() tn.mtx.Lock()
defer tn.mtx.Unlock() defer tn.mtx.Unlock()
@ -182,7 +190,7 @@ func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmet
return val.EventMeter().GetMetric(eventID) return val.EventMeter().GetMetric(eventID)
} }
func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ChainValidator, error) { func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ValidatorState, error) {
chain, ok := tn.Chains[chainID] chain, ok := tn.Chains[chainID]
if !ok { if !ok {
return nil, fmt.Errorf("Unknown chain %s", chainID) return nil, fmt.Errorf("Unknown chain %s", chainID)

View File

@ -15,6 +15,7 @@ func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc {
"blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"),
"register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"), "register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"),
"validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"),
"register_validator_set": rpc.NewRPCFunc(RegisterValidatorSetResult(network), "valSet"),
"validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"),
"start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"), "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"),
@ -47,6 +48,12 @@ func GetValidatorSetResult(network *TendermintNetwork) interface{} {
} }
} }
func RegisterValidatorSetResult(network *TendermintNetwork) interface{} {
return func(valSet *types.ValidatorSet) (NetMonResult, error) {
return network.RegisterValidatorSet(valSet)
}
}
func GetValidatorResult(network *TendermintNetwork) interface{} { func GetValidatorResult(network *TendermintNetwork) interface{} {
return func(valSetID, valID string) (NetMonResult, error) { return func(valSetID, valID string) (NetMonResult, error) {
return network.GetValidator(valSetID, valID) return network.GetValidator(valSetID, valID)

159
main.go
View File

@ -3,8 +3,10 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"os" "os"
"path"
"strconv" "strconv"
"strings" "strings"
@ -16,6 +18,7 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
pcm "github.com/tendermint/go-process" pcm "github.com/tendermint/go-process"
"github.com/tendermint/go-rpc/server" "github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-wire"
tmcfg "github.com/tendermint/tendermint/config/tendermint" tmcfg "github.com/tendermint/tendermint/config/tendermint"
) )
@ -39,6 +42,32 @@ func main() {
cmdConfig(c) cmdConfig(c)
}, },
}, },
{
Name: "chains-and-vals",
Usage: "Add a chain or validator set to the main config file",
ArgsUsage: "",
Action: func(c *cli.Context) {
cmdChainsAndVals(c)
},
Subcommands: []cli.Command{
{
Name: "chain",
Usage: "Add a chain to the main config file",
ArgsUsage: "[configFile] [chainBaseDir]",
Action: func(c *cli.Context) {
cmdAddChain(c)
},
},
{
Name: "val",
Usage: "Add a validator set to the main config file",
ArgsUsage: "[configFile] [valsetBaseDir]",
Action: func(c *cli.Context) {
cmdAddValSet(c)
},
},
},
},
{ {
Name: "monitor", Name: "monitor",
Usage: "Monitor a chain", Usage: "Monitor a chain",
@ -51,21 +80,110 @@ func main() {
app.Run(os.Args) app.Run(os.Args)
} }
func cmdChainsAndVals(c *cli.Context) {
cli.ShowAppHelp(c)
}
func cmdAddChain(c *cli.Context) {
args := c.Args()
if len(args) != 2 {
Exit("add chain expectes 2 arg")
}
cfgFile, chainDir := args[0], args[1]
// load major config
chainsAndVals := new(ChainsAndValidators)
if err := ReadJSONFile(chainsAndVals, cfgFile); err != nil {
Exit(err.Error())
}
// load new chain
chainCfg := new(types.BlockchainConfig)
if err := ReadJSONFile(chainCfg, path.Join(chainDir, "chain_config.json")); err != nil {
Exit(err.Error())
}
// append new chain
chainsAndVals.Blockchains = append(chainsAndVals.Blockchains, chainCfg)
// write major config
b := wire.JSONBytes(chainsAndVals)
if err := ioutil.WriteFile(cfgFile, b, 0600); err != nil {
Exit(err.Error())
}
}
func ReadJSONFile(o interface{}, filename string) error {
b, err := ioutil.ReadFile(filename)
if err != nil {
return err
}
wire.ReadJSON(o, b, &err)
if err != nil {
return err
}
return nil
}
func cmdAddValSet(c *cli.Context) {
args := c.Args()
if len(args) != 2 {
Exit("add chain expectes 2 arg")
}
cfgFile, valSetDir := args[0], args[1]
// load major config
chainsAndVals := new(ChainsAndValidators)
if err := ReadJSONFile(chainsAndVals, cfgFile); err != nil {
Exit(err.Error())
}
// load new validator set
valSet := new(types.ValidatorSet)
if err := ReadJSONFile(valSet, path.Join(valSetDir, "validator_set.json")); err != nil {
Exit(err.Error())
}
// append new validator set
chainsAndVals.ValidatorSets = append(chainsAndVals.ValidatorSets, valSet)
// write major config to file
b := wire.JSONBytes(chainsAndVals)
if err := ioutil.WriteFile(cfgFile, b, 0600); err != nil {
Exit(err.Error())
}
}
func cmdMonitor(c *cli.Context) { func cmdMonitor(c *cli.Context) {
args := c.Args() args := c.Args()
if len(args) != 1 { if len(args) != 1 {
Exit("monitor expectes 1 arg") Exit("monitor expectes 1 arg")
} }
chainConfigFile := args[0] chainsAndValsFile := args[0]
chainsAndVals, err := LoadChainsAndValsFromFile(chainsAndValsFile)
chainConfig, err := types.LoadChainFromFile(chainConfigFile)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
// the main object that watches for changes and serves the rpc requests // the main object that watches for changes and serves the rpc requests
network := handlers.NewTendermintNetwork() network := handlers.NewTendermintNetwork()
network.RegisterChain(chainConfig)
for _, valSetCfg := range chainsAndVals.ValidatorSets {
// Register validator set
_, err := network.RegisterValidatorSet(valSetCfg)
if err != nil {
Exit(err.Error())
}
}
for _, chainCfg := range chainsAndVals.Blockchains {
// Register blockchain
_, err := network.RegisterChain(chainCfg)
if err != nil {
Exit(err.Error())
}
}
// the routes are functions on the network object // the routes are functions on the network object
routes := handlers.Routes(network) routes := handlers.Routes(network)
@ -111,7 +229,7 @@ func ConfigFromMachines(chainID, prefix string, N int) (*types.BlockchainConfig,
chain := &types.BlockchainConfig{ chain := &types.BlockchainConfig{
ID: chainID, ID: chainID,
Validators: make([]*types.ChainValidator, N), Validators: make([]*types.ValidatorState, N),
} }
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
id := fmt.Sprintf("%s%d", prefix, i+1) id := fmt.Sprintf("%s%d", prefix, i+1)
@ -124,10 +242,12 @@ func ConfigFromMachines(chainID, prefix string, N int) (*types.BlockchainConfig,
ID: id, ID: id,
// TODO: pubkey // TODO: pubkey
} }
chainVal := &types.ChainValidator{ chainVal := &types.ValidatorState{
Config: &types.ValidatorConfig{
Validator: val, Validator: val,
Addr: fmt.Sprintf("%s:%d", strings.Trim(ip, "\n"), 46657), RPCAddr: fmt.Sprintf("%s:%d", strings.Trim(ip, "\n"), 46657),
Index: i, Index: i,
},
} }
chain.Validators[i] = chainVal chain.Validators[i] = chainVal
} }
@ -152,3 +272,28 @@ func runProcessGetResult(label string, command string, args []string) (string, b
return string(outFile.Bytes()), false return string(outFile.Bytes()), false
} }
} }
//----------------------------------------------------------------------
type ChainsAndValidators struct {
ValidatorSets []*types.ValidatorSet `json:"validator_sets"`
Blockchains []*types.BlockchainConfig `json:"blockchains"`
}
func LoadChainsAndValsFromFile(configFile string) (*ChainsAndValidators, error) {
b, err := ioutil.ReadFile(configFile)
if err != nil {
return nil, err
}
// for now we start with one blockchain loaded from file;
// eventually more can be uploaded or created through endpoints
chainsAndVals := new(ChainsAndValidators)
wire.ReadJSON(chainsAndVals, b, &err)
if err != nil {
return nil, err
}
return chainsAndVals, nil
}

View File

@ -62,6 +62,13 @@ type ChainState struct {
Status *BlockchainStatus `json:"status"` Status *BlockchainStatus `json:"status"`
} }
// chain config without ValidatorState
type BlockchainBaseConfig struct {
ID string `json:"id"`
ValSetID string `json:"val_set_id"`
Validators []*ValidatorConfig `json:"validators"`
}
// basic chain config // basic chain config
// threadsafe // threadsafe
type BlockchainConfig struct { type BlockchainConfig struct {
@ -69,7 +76,7 @@ type BlockchainConfig struct {
ValSetID string `json:"val_set_id"` ValSetID string `json:"val_set_id"`
mtx sync.Mutex mtx sync.Mutex
Validators []*ChainValidator `json:"validators"` Validators []*ValidatorState `json:"validators"` // TODO: this should be ValidatorConfig and the state in BlockchainStatus
valIDMap map[string]int // map IDs to indices valIDMap map[string]int // map IDs to indices
} }
@ -79,11 +86,11 @@ func (bc *BlockchainConfig) PopulateValIDMap() {
defer bc.mtx.Unlock() defer bc.mtx.Unlock()
bc.valIDMap = make(map[string]int) bc.valIDMap = make(map[string]int)
for i, v := range bc.Validators { for i, v := range bc.Validators {
bc.valIDMap[v.Validator.ID] = i bc.valIDMap[v.Config.Validator.ID] = i
} }
} }
func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, error) { func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ValidatorState, error) {
bc.mtx.Lock() bc.mtx.Lock()
defer bc.mtx.Unlock() defer bc.mtx.Unlock()
valIndex, ok := bc.valIDMap[valID] valIndex, ok := bc.valIDMap[valID]

View File

@ -18,6 +18,7 @@ import (
// validator set (independent of chains) // validator set (independent of chains)
type ValidatorSet struct { type ValidatorSet struct {
ID string `json:"id"`
Validators []*Validator `json:"validators"` Validators []*Validator `json:"validators"`
} }
@ -34,61 +35,68 @@ func (vs *ValidatorSet) Validator(valID string) (*Validator, error) {
type Validator struct { type Validator struct {
ID string `json:"id"` ID string `json:"id"`
PubKey crypto.PubKey `json:"pub_key"` PubKey crypto.PubKey `json:"pub_key"`
Chains []string `json:"chains"` Chains []string `json:"chains,omitempty"` // TODO: put this elsewhere (?)
}
type ValidatorConfig struct {
Validator *Validator `json:"validator"`
P2PAddr string `json:"p2p_addr"`
RPCAddr string `json:"rpc_addr"`
Index int `json:"index,omitempty"`
}
type ValidatorStatus struct {
Latency float64 `json:"latency" wire:"unsafe"`
BlockHeight int `json:"block_height"`
} }
// Validator on a chain // Validator on a chain
// Responsible for communication with the validator // Responsible for communication with the validator
// Returned over RPC but also used to manage state // Returned over RPC but also used to manage state
type ChainValidator struct { type ValidatorState struct {
Validator *Validator `json:"validator"` Config *ValidatorConfig `json:"config"`
Addr string `json:"addr"` // do we want multiple addrs? Status *ValidatorStatus `json:"status"`
Index int `json:"index"`
Latency float64 `json:"latency" wire:"unsafe"`
BlockHeight int `json:"block_height"`
em *eventmeter.EventMeter // holds a ws connection to the val em *eventmeter.EventMeter // holds a ws connection to the val
client *client.ClientURI // rpc client client *client.ClientURI // rpc client
} }
// Start a new event meter, including the websocket connection // Start a new event meter, including the websocket connection
// Also create the http rpc client for convenienve // Also create the http rpc client for convenienve
func (cv *ChainValidator) Start() error { func (vs *ValidatorState) Start() error {
em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", cv.Addr), UnmarshalEvent) em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", vs.Config.RPCAddr), UnmarshalEvent)
if err := em.Start(); err != nil { if err := em.Start(); err != nil {
return err return err
} }
cv.em = em vs.em = em
cv.client = client.NewClientURI(fmt.Sprintf("http://%s", cv.Addr)) vs.client = client.NewClientURI(fmt.Sprintf("http://%s", vs.Config.RPCAddr))
return nil return nil
} }
func (cv *ChainValidator) Stop() { func (vs *ValidatorState) Stop() {
cv.em.Stop() vs.em.Stop()
} }
func (cv *ChainValidator) EventMeter() *eventmeter.EventMeter { func (vs *ValidatorState) EventMeter() *eventmeter.EventMeter {
return cv.em return vs.em
} }
// Return the validators pubkey. If it's not yet set, get it from the node // Return the validators pubkey. If it's not yet set, get it from the node
// TODO: proof that it's the node's key // TODO: proof that it's the node's key
func (cv *ChainValidator) PubKey() crypto.PubKey { func (vs *ValidatorState) PubKey() crypto.PubKey {
if cv.Validator.PubKey != nil { if vs.Config.Validator.PubKey != nil {
return cv.Validator.PubKey return vs.Config.Validator.PubKey
} }
var result ctypes.TMResult var result ctypes.TMResult
_, err := cv.client.Call("status", nil, &result) _, err := vs.client.Call("status", nil, &result)
if err != nil { if err != nil {
log.Error("Error getting validator pubkey", "addr", cv.Addr, "val", cv.Validator.ID, "error", err) log.Error("Error getting validator pubkey", "addr", vs.Config.RPCAddr, "val", vs.Config.Validator.ID, "error", err)
return nil return nil
} }
status := result.(*ctypes.ResultStatus) status := result.(*ctypes.ResultStatus)
cv.Validator.PubKey = status.PubKey vs.Config.Validator.PubKey = status.PubKey
return cv.Validator.PubKey return vs.Config.Validator.PubKey
} }
//--------------------------------------------------- //---------------------------------------------------