From d835cfe3e7dae9c51f12b71052b39f5422184336 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 12 Jan 2016 19:07:45 -0500 Subject: [PATCH] refactor, fixes --- event_meter.go | 225 ------------------------------------------- handlers/handlers.go | 152 +++++++++++++++++++++++++++++ handlers/routes.go | 77 +++++++++++++++ main.go | 175 ++++++++++++++++++++++++++++++++- types/types.go | 119 +++++++++++++++++++++++ 5 files changed, 518 insertions(+), 230 deletions(-) delete mode 100644 event_meter.go create mode 100644 handlers/handlers.go create mode 100644 handlers/routes.go create mode 100644 types/types.go diff --git a/event_meter.go b/event_meter.go deleted file mode 100644 index 609711f9..00000000 --- a/event_meter.go +++ /dev/null @@ -1,225 +0,0 @@ -package main - -import ( - "bytes" - "encoding/json" - "fmt" - "os" - "reflect" - "sync" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-crypto" - - // register rpc and event types with go- - ctypes "github.com/tendermint/tendermint/rpc/core/types" - // "github.com/tendermint/tendermint/types" - client "github.com/tendermint/tendermint/rpc/client" - - "github.com/gorilla/websocket" - "github.com/rcrowley/go-metrics" -) - -//------------------------------------------------------ -// Connect to all validators for a blockchain - -type Blockchain struct { - ID string - Validators []Validator -} - -type Validator struct { - ID string - PubKey crypto.PubKey - IP string - Port int -} - -//------------------------------------------------------ -// Generic system to subscribe to events and record their frequency - -// Metrics for a given event -type EventMetric struct { - ID string `json:"id"` - Started time.Time `json:"start_time"` - LastHeard time.Time `json:"last_heard"` - MinDuration int64 `json:"min_duration"` - MaxDuration int64 `json:"max_duration"` - - // tracks event count and rate - meter metrics.Meter - - // filled in from the Meter - Count int64 `json:"count"` - Rate1 float64 `json:"rate_1"` - Rate5 float64 `json:"rate_5"` - Rate15 float64 `json:"rate_15"` - RateMean float64 `json:"rate_mean"` - - // XXX: move this - // latency for node itself (not related to event) - Latency float64 `json:"latency_mean"` -} - -// Each node gets an event meter to track events for that node -type EventMeter struct { - QuitService - - wsc *client.WSClient - - mtx sync.Mutex - events map[string]*EventMetric - - // to record latency - timer metrics.Timer - lastPing time.Time - receivedPong bool -} - -func NewEventMeter(addr string) *EventMeter { - em := &EventMeter{ - wsc: client.NewWSClient(addr), - events: make(map[string]*EventMetric), - timer: metrics.NewTimer(), - receivedPong: true, - } - em.QuitService = *NewQuitService(nil, "EventMeter", em) - return em -} - -func (em *EventMeter) OnStart() error { - em.QuitService.OnStart() - if err := em.wsc.OnStart(); err != nil { - return err - } - - em.wsc.Conn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - em.mtx.Lock() - defer em.mtx.Unlock() - em.receivedPong = true - em.timer.UpdateSince(em.lastPing) - return nil - }) - go em.receiveRoutine() - return nil -} - -func (em *EventMeter) OnStop() { - em.wsc.OnStop() - em.QuitService.OnStop() -} - -func (em *EventMeter) Subscribe(eventid string) error { - em.mtx.Lock() - defer em.mtx.Unlock() - - if _, ok := em.events[eventid]; ok { - return fmt.Errorf("Subscription already exists") - } - if err := em.wsc.Subscribe(eventid); err != nil { - return err - } - em.events[eventid] = &EventMetric{ - Started: time.Now(), - MinDuration: 1 << 62, - meter: metrics.NewMeter(), - } - return nil -} - -func (em *EventMeter) Unsubscribe(eventid string) error { - em.mtx.Lock() - defer em.mtx.Unlock() - - if err := em.wsc.Unsubscribe(eventid); err != nil { - return err - } - // XXX: should we persist or save this info first? - delete(em.events, eventid) - return nil -} - -//------------------------------------------------------ - -func (em *EventMeter) receiveRoutine() { - logTicker := time.NewTicker(time.Second * 3) - pingTicker := time.NewTicker(time.Second * 1) - for { - select { - case <-logTicker.C: - em.mtx.Lock() - for _, metric := range em.events { - metric.Count = metric.meter.Count() - metric.Rate1 = metric.meter.Rate1() - metric.Rate5 = metric.meter.Rate5() - metric.Rate15 = metric.meter.Rate15() - metric.RateMean = metric.meter.RateMean() - - metric.Latency = em.timer.Mean() - - b, err := json.Marshal(metric) - if err != nil { - // TODO - log.Error(err.Error()) - continue - } - var out bytes.Buffer - json.Indent(&out, b, "", "\t") - out.WriteTo(os.Stdout) - } - em.mtx.Unlock() - case <-pingTicker.C: - em.mtx.Lock() - - // ping to record latency - if !em.receivedPong { - // XXX: why is the pong taking so long? should we stop the conn? - em.mtx.Unlock() - continue - } - - em.lastPing = time.Now() - em.receivedPong = false - err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - log.Error("Failed to write ping message on websocket", "error", err) - em.wsc.Stop() - return - } - - em.mtx.Unlock() - - case r := <-em.wsc.ResultsCh: - em.mtx.Lock() - switch r := r.(type) { - case *ctypes.ResultEvent: - id, _ := r.Event, r.Data - metric, ok := em.events[id] - if !ok { - // we already unsubscribed, or got an unexpected event - continue - } - - last := metric.LastHeard - metric.LastHeard = time.Now() - metric.meter.Mark(1) - dur := int64(metric.LastHeard.Sub(last)) - if dur < metric.MinDuration { - metric.MinDuration = dur - } - if !last.IsZero() && dur > metric.MaxDuration { - metric.MaxDuration = dur - } - default: - log.Error("Unknown result event type", "type", reflect.TypeOf(r)) - } - - em.mtx.Unlock() - case <-em.Quit: - break - } - - } -} diff --git a/handlers/handlers.go b/handlers/handlers.go new file mode 100644 index 00000000..604fa70e --- /dev/null +++ b/handlers/handlers.go @@ -0,0 +1,152 @@ +package handlers + +import ( + "fmt" + "sort" + "sync" + + "github.com/tendermint/go-event-meter" + "github.com/tendermint/go-wire" + + "github.com/tendermint/netmon/types" +) + +type NetMonResultInterface interface{} + +type NetMonResult struct { + Result NetMonResultInterface +} + +// for wire.readReflect +var _ = wire.RegisterInterface( + struct{ NetMonResultInterface }{}, + wire.ConcreteType{&types.ChainAndValidatorIDs{}, 0x01}, + wire.ConcreteType{&types.ChainStatus{}, 0x02}, + wire.ConcreteType{&types.Validator{}, 0x03}, + wire.ConcreteType{&eventmeter.EventMetric{}, 0x04}, +) + +//--------------------------------------------- +// global state and backend functions + +type TendermintNetwork struct { + mtx sync.Mutex + Chains map[string]*types.ChainStatus `json:"blockchains"` + ValSets map[string]*types.ValidatorSet `json:"validator_sets"` +} + +// TODO: populate validator sets +func NewTendermintNetwork(chains ...*types.ChainStatus) *TendermintNetwork { + network := &TendermintNetwork{ + Chains: make(map[string]*types.ChainStatus), + ValSets: make(map[string]*types.ValidatorSet), + } + for _, chain := range chains { + network.Chains[chain.Config.ID] = chain + } + return network +} + +//------------ +// RPC funcs + +func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + chains := make([]string, len(tn.Chains)) + valSets := make([]string, len(tn.ValSets)) + i := 0 + for chain, _ := range tn.Chains { + chains[i] = chain + i += 1 + } + i = 0 + for valset, _ := range tn.ValSets { + valSets[i] = valset + i += 1 + } + sort.StringSlice(chains).Sort() + sort.StringSlice(valSets).Sort() + return &types.ChainAndValidatorIDs{ + ChainIDs: chains, + ValidatorSetIDs: valSets, + }, nil + +} + +func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainStatus, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + chain, ok := tn.Chains[chainID] + if !ok { + return nil, fmt.Errorf("Unknown chain %s", chainID) + } + return chain, nil +} + +func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + valSet, ok := tn.ValSets[valSetID] + if !ok { + return nil, fmt.Errorf("Unknown validator set %s", valSetID) + } + return valSet, nil +} + +func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Validator, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + valSet, ok := tn.ValSets[valSetID] + if !ok { + return nil, fmt.Errorf("Unknown validator set %s", valSetID) + } + val, err := valSet.Validator(valID) + if err != nil { + return nil, err + } + return val, nil +} + +func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error { + tn.mtx.Lock() + defer tn.mtx.Unlock() + val, err := tn.getChainVal(chainID, valID) + if err != nil { + return err + } + return val.EventMeter().Subscribe(eventID, nil) +} + +func (tn *TendermintNetwork) StopMeter(chainID, valID, eventID string) error { + tn.mtx.Lock() + defer tn.mtx.Unlock() + val, err := tn.getChainVal(chainID, valID) + if err != nil { + return err + } + return val.EventMeter().Unsubscribe(eventID) +} + +func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmeter.EventMetric, error) { + tn.mtx.Lock() + defer tn.mtx.Unlock() + val, err := tn.getChainVal(chainID, valID) + if err != nil { + return nil, err + } + + return val.EventMeter().GetMetric(eventID) +} + +func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ChainValidator, error) { + chain, ok := tn.Chains[chainID] + if !ok { + return nil, fmt.Errorf("Unknown chain %s", chainID) + } + val, err := chain.Config.GetValidatorByID(valID) + if err != nil { + return nil, err + } + return val, nil +} diff --git a/handlers/routes.go b/handlers/routes.go new file mode 100644 index 00000000..b1ba1483 --- /dev/null +++ b/handlers/routes.go @@ -0,0 +1,77 @@ +package handlers + +import ( + rpc "github.com/tendermint/go-rpc/server" +) + +func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc { + return map[string]*rpc.RPCFunc{ + // subscribe/unsubscribe are reserved for websocket events. + // "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), + // "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), + + "status": rpc.NewRPCFunc(StatusResult(network), ""), + "blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"), + "validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"), + "validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"), + + "start_meter": rpc.NewRPCFunc(network.StartMeter, "chainID,valID,event"), + "stop_meter": rpc.NewRPCFunc(network.StopMeter, "chainID,valID,event"), + "meter": rpc.NewRPCFunc(GetMeterResult(network), "chainID,valID,event"), + } +} + +func StatusResult(network *TendermintNetwork) interface{} { + return func() (*NetMonResult, error) { + r, err := network.Status() + if err != nil { + return nil, err + } else { + return &NetMonResult{r}, nil + } + } +} + +func GetChainResult(network *TendermintNetwork) interface{} { + return func(chain string) (*NetMonResult, error) { + r, err := network.GetChain(chain) + if err != nil { + return nil, err + } else { + return &NetMonResult{r}, nil + } + } +} + +func GetValidatorSetResult(network *TendermintNetwork) interface{} { + return func(valSetID string) (*NetMonResult, error) { + r, err := network.GetValidatorSet(valSetID) + if err != nil { + return nil, err + } else { + return &NetMonResult{r}, nil + } + } +} + +func GetValidatorResult(network *TendermintNetwork) interface{} { + return func(valSetID, valID string) (*NetMonResult, error) { + r, err := network.GetValidator(valSetID, valID) + if err != nil { + return nil, err + } else { + return &NetMonResult{r}, nil + } + } +} + +func GetMeterResult(network *TendermintNetwork) interface{} { + return func(chainID, valID, eventID string) (*NetMonResult, error) { + r, err := network.GetMeter(chainID, valID, eventID) + if err != nil { + return nil, err + } else { + return &NetMonResult{r}, nil + } + } +} diff --git a/main.go b/main.go index 5c0297ee..9541ba29 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,30 @@ package main import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "strconv" + "strings" + + "github.com/tendermint/go-event-meter" + + "github.com/tendermint/netmon/handlers" + "github.com/tendermint/netmon/types" + + "github.com/codegangsta/cli" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" + "github.com/tendermint/go-events" + pcm "github.com/tendermint/go-process" + "github.com/tendermint/go-rpc/server" + "github.com/tendermint/go-rpc/types" + "github.com/tendermint/go-wire" tmcfg "github.com/tendermint/tendermint/config/tendermint" - "github.com/tendermint/tendermint/types" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" ) func init() { @@ -15,15 +35,160 @@ func init() { } func main() { - em := NewEventMeter("ws://localhost:46657/websocket") - if _, err := em.Start(); err != nil { + app := cli.NewApp() + app.Name = "netmon" + app.Usage = "netmon [command] [args...]" + app.Commands = []cli.Command{ + { + Name: "config", + Usage: "Create a config from a mintnet testnet", + ArgsUsage: "[chainID] [prefix] [N]", + Action: func(c *cli.Context) { + cmdConfig(c) + }, + }, + { + Name: "monitor", + Usage: "Monitor a chain", + ArgsUsage: "[config file]", + Action: func(c *cli.Context) { + cmdMonitor(c) + }, + }, + } + app.Run(os.Args) +} + +func cmdMonitor(c *cli.Context) { + args := c.Args() + if len(args) != 1 { + Exit("monitor expectes 1 arg") + } + configFile := args[0] + + b, err := ioutil.ReadFile(configFile) + if err != nil { Exit(err.Error()) } - if err := em.Subscribe(types.EventStringNewBlock()); err != nil { + + // for now we start with one blockchain; + // eventually more can be uploaded or created through endpoints + chainConfig := new(types.BlockchainConfig) + if err := json.Unmarshal(b, chainConfig); err != nil { Exit(err.Error()) } + + chainStatus := &types.ChainStatus{Config: chainConfig} + + // start the event meter and listen for new blocks on each validator + for _, v := range chainConfig.Validators { + if err := v.NewEventMeter(); err != nil { + Exit(err.Error()) + } + if err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), func(metric *eventmeter.EventMetric, data interface{}) { + // TODO: update chain status with block and metric + // chainStatus.NewBlock(data.(tmtypes.EventDataNewBlock).Block) + }); err != nil { + Exit(err.Error()) + } + + } + + // the main object that watches for changes and serves the rpc requests + network := handlers.NewTendermintNetwork(chainStatus) + + // the routes are functions on the network object + routes := handlers.Routes(network) + + // register the result objects with wire + wire.RegisterInterface( + struct{ rpctypes.Result }{}, + wire.ConcreteType{&events.EventResult{}, 0x1}, + wire.ConcreteType{&ctypes.TendermintResult{}, 0x2}, + wire.ConcreteType{&handlers.NetMonResult{}, 0x3}, + ) + // serve http and ws + mux := http.NewServeMux() + wm := rpcserver.NewWebsocketManager(routes, nil) // TODO: evsw + mux.HandleFunc("/websocket", wm.WebsocketHandler) + rpcserver.RegisterRPCFuncs(mux, routes) + if _, err := rpcserver.StartHTTPServer("0.0.0.0:46670", mux); err != nil { + Exit(err.Error()) + } + TrapSignal(func() { - em.Stop() + // TODO: clean shutdown server, maybe persist last state + for _, v := range chainConfig.Validators { + v.EventMeter().Stop() + } }) } + +func cmdConfig(c *cli.Context) { + args := c.Args() + if len(args) != 3 { + Exit("config expects 3 args") + } + id, prefix := args[0], args[1] + n, err := strconv.Atoi(args[2]) + if err != nil { + Exit(err.Error()) + } + chain, err := ConfigFromMachines(id, prefix, n) + if err != nil { + Exit(err.Error()) + } + + b, err := json.Marshal(chain) + if err != nil { + Exit(err.Error()) + } + fmt.Println(string(b)) +} + +func ConfigFromMachines(chainID, prefix string, N int) (*types.BlockchainConfig, error) { + + chain := &types.BlockchainConfig{ + ID: chainID, + Validators: make([]*types.ChainValidator, N), + } + for i := 0; i < N; i++ { + id := fmt.Sprintf("%s%d", prefix, i+1) + ip, success := runProcessGetResult(id+"-ip", "docker-machine", []string{"ip", id}) + if !success { + return nil, fmt.Errorf(ip) + } + + val := &types.Validator{ + ID: id, + // TODO: pubkey + } + chainVal := &types.ChainValidator{ + Validator: val, + Addr: fmt.Sprintf("%s:%d", strings.Trim(ip, "\n"), 46657), + Index: i, + } + chain.Validators[i] = chainVal + } + return chain, nil +} + +func runProcessGetResult(label string, command string, args []string) (string, bool) { + outFile := NewBufferCloser(nil) + fmt.Println(Green(command), Green(args)) + proc, err := pcm.StartProcess(label, command, args, nil, outFile) + if err != nil { + return "", false + } + + <-proc.WaitCh + if proc.ExitState.Success() { + fmt.Println(Blue(string(outFile.Bytes()))) + return string(outFile.Bytes()), true + } else { + // Error! + fmt.Println(Red(string(outFile.Bytes()))) + return string(outFile.Bytes()), false + } +} diff --git a/types/types.go b/types/types.go new file mode 100644 index 00000000..a1a7c4a0 --- /dev/null +++ b/types/types.go @@ -0,0 +1,119 @@ +package types + +import ( + "fmt" + "sync" + + "github.com/tendermint/go-event-meter" + "github.com/tendermint/go-crypto" +) + +//--------------------------------------------- +// core types + +// Known chain and validator set IDs (from which anything else can be found) +type ChainAndValidatorIDs struct { + ChainIDs []string `json:"chain_ids"` + ValidatorSetIDs []string `json:"validator_set_ids"` +} + +// state of a chain +type ChainStatus struct { + Config *BlockchainConfig `json:"config"` + Status *BlockchainStatus `json:"status"` +} + +// basic chain config +// threadsafe +type BlockchainConfig struct { + mtx sync.Mutex + + ID string `json:"id"` + ValSetID string `json:"val_set_id"` + Validators []*ChainValidator `json:"validators"` + valIDMap map[string]int // map IDs to indices +} + +func (bc *BlockchainConfig) PopulateValIDMap() { + bc.mtx.Lock() + defer bc.mtx.Unlock() + bc.valIDMap = make(map[string]int) + for i, v := range bc.Validators { + bc.valIDMap[v.ID] = i + } +} + +func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, error) { + bc.mtx.Lock() + defer bc.mtx.Unlock() + valIndex, ok := bc.valIDMap[valID] + if !ok { + return nil, fmt.Errorf("Unknown validator %s", valID) + } + return bc.Validators[valIndex], nil +} + +// basic chain status/metrics +// threadsafe +type BlockchainStatus struct { + mtx sync.Mutex + + Height int `json:"height"` + MeanBlockTime float64 `json:"mean_block_time"` + TxThroughput float64 `json:"tx_throughput"` + + BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ? +} + +// validator on a chain +type ChainValidator struct { + *Validator + Addr string `json:"addr"` // do we want multiple addrs? + Index int `json:"index"` + + em *eventmeter.EventMeter // holds a ws connection to the val + Latency float64 `json:"latency,omitempty"` +} + +func (cv *ChainValidator) NewEventMeter() error { + em := eventmeter.NewEventMeter(fmt.Sprintf("ws://%s/websocket", cv.Addr)) + if err := em.Start(); err != nil { + return err + } + cv.em = em + return nil +} + +func (cv *ChainValidator) EventMeter() *eventmeter.EventMeter { + return cv.em +} + +// validator set (independent of chains) +type ValidatorSet struct { + Validators []*Validator `json:"validators"` +} + +func (vs *ValidatorSet) Validator(valID string) (*Validator, error) { + for _, v := range vs.Validators { + if v.ID == valID { + return v, nil + } + } + return nil, fmt.Errorf("Unknwon validator %s", valID) +} + +// validator (independent of chain) +type Validator struct { + ID string `json:"id"` + PubKey crypto.PubKey `json:"pub_key,omitempty"` + Chains []*ChainStatus `json:"chains,omitempty"` +} + +func (v *Validator) Chain(chainID string) (*ChainStatus, error) { + for _, c := range v.Chains { + if c.Config.ID == chainID { + return c, nil + } + } + return nil, fmt.Errorf("Unknwon chain %s", chainID) +}