diff --git a/config/config.go b/config/config.go index 23da4f40..46fb55ec 100644 --- a/config/config.go +++ b/config/config.go @@ -214,6 +214,9 @@ type P2PConfig struct { // Set true for strict address routability rules AddrBookStrict bool `mapstructure:"addr_book_strict"` + // Path to the trust history file + TrustHistory string `mapstructure:"trust_history_file"` + // Set true to enable the peer-exchange reactor PexReactor bool `mapstructure:"pex"` @@ -239,6 +242,7 @@ func DefaultP2PConfig() *P2PConfig { ListenAddress: "tcp://0.0.0.0:46656", AddrBook: "addrbook.json", AddrBookStrict: true, + TrustHistory: "trusthistory.json", MaxNumPeers: 50, FlushThrottleTimeout: 100, MaxMsgPacketPayloadSize: 1024, // 1 kB @@ -255,11 +259,16 @@ func TestP2PConfig() *P2PConfig { return conf } -// AddrBookFile returns the full path to the address bool +// AddrBookFile returns the full path to the address book func (p *P2PConfig) AddrBookFile() string { return rootify(p.AddrBook, p.RootDir) } +// TrustHistoryFile returns the full path to the trust metric store history +func (p *P2PConfig) TrustHistoryFile() string { + return rootify(p.TrustHistory, p.RootDir) +} + //----------------------------------------------------------------------------- // MempoolConfig diff --git a/docs/architecture/adr-006-trust-metric.md b/docs/architecture/adr-006-trust-metric.md index 29861ce6..6fc5f9ea 100644 --- a/docs/architecture/adr-006-trust-metric.md +++ b/docs/architecture/adr-006-trust-metric.md @@ -76,40 +76,92 @@ R[0] = raw data for current time interval This section will cover the Go programming language API designed for the previously proposed process. Below is the interface for a TrustMetric: ```go + package trust -type TrustMetric struct { + +// TrustMetricStore - Manages all trust metrics for peers +type TrustMetricStore struct { + cmn.BaseService + // Private elements } +// OnStart implements Service +func (tms *TrustMetricStore) OnStart() error + +/ OnStop implements Service +func (tms *TrustMetricStore) OnStop() + +// NewTrustMetricStore returns a store that optionally saves data to +// the file path and uses the optional config when creating new trust metrics +func NewTrustMetricStore(filePath string, tmc *TrustMetricConfig) *TrustMetricStore + +// GetPeerTrustMetric returns a trust metric by peer key +func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric + +// PeerDisconnected pauses the trust metric associated with the peer identified by the key +func (tms *TrustMetricStore) PeerDisconnected(key string) + + +//---------------------------------------------------------------------------------------- +// TrustMetric - keeps track of peer reliability +type TrustMetric struct { + // Private elements. +} + +// Pause tells the metric to pause recording data over time intervals +func (tm *TrustMetric) Pause() + +// Stop tells the metric to stop recording data over time intervals +func (tm *TrustMetric) Stop() + +// BadEvent indicates that an undesirable event took place +func (tm *TrustMetric) BadEvent() + +// AddBadEvents acknowledges multiple undesirable events +func (tm *TrustMetric) AddBadEvents(num int) + +// GoodEvent indicates that a desirable event took place +func (tm *TrustMetric) GoodEvent() + +// AddGoodEvents acknowledges multiple desirable events +func (tm *TrustMetric) AddGoodEvents(num int) + +// TrustValue gets the dependable trust value; always between 0 and 1 +func (tm *TrustMetric) TrustValue() float64 + +// TrustScore gets a score based on the trust value always between 0 and 100 +func (tm *TrustMetric) TrustScore() int + +// NewMetric returns a trust metric with the default configuration +func NewMetric() *TrustMetric + + +// TrustMetricConfig - Configures the weight functions and time intervals for the metric type TrustMetricConfig struct { + // Determines the percentage given to current behavior ProportionalWeight float64 + + // Determines the percentage given to prior behavior IntegralWeight float64 - HistoryMaxSize int + + // The window of time that the trust metric will track events across. + // This can be set to cover many days without issue + TrackingWindow time.Duration + + // Each interval should be short for adapability. + // Less than 30 seconds is too sensitive, + // and greater than 5 minutes will make the metric numb IntervalLen time.Duration } -func (tm *TrustMetric) Stop() - -func (tm *TrustMetric) IncBad() - -func (tm *TrustMetric) AddBad(num int) - -func (tm *TrustMetric) IncGood() - -func (tm *TrustMetric) AddGood(num int) - -// get the dependable trust value -func (tm *TrustMetric) TrustValue() float64 - -func NewMetric() *TrustMetric +// DefaultConfig returns a config with values that have been tested and produce desirable results +func DefaultConfig() *TrustMetricConfig +// NewMetricWithConfig returns a trust metric with a custom configuration func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric -func GetPeerTrustMetric(key string) *TrustMetric - -func PeerDisconnected(key string) - ``` ## References diff --git a/node/node.go b/node/node.go index c8029cf8..29be71ca 100644 --- a/node/node.go +++ b/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/trust" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" grpccore "github.com/tendermint/tendermint/rpc/grpc" @@ -95,9 +96,10 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - privKey crypto.PrivKeyEd25519 // local node's p2p key - sw *p2p.Switch // p2p connections - addrBook *p2p.AddrBook // known peers + privKey crypto.PrivKeyEd25519 // local node's p2p key + sw *p2p.Switch // p2p connections + addrBook *p2p.AddrBook // known peers + tmStore *trust.TrustMetricStore // trust metrics for all peers // services eventBus *types.EventBus // pub/sub for services @@ -239,9 +241,12 @@ func NewNode(config *cfg.Config, // Optionally, start the pex reactor var addrBook *p2p.AddrBook + var tmStore *trust.TrustMetricStore if config.P2P.PexReactor { addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) + tmStore = trust.NewTrustMetricStore(config.P2P.TrustHistoryFile(), nil) + tmStore.SetLogger(p2pLogger.With("trust", config.P2P.TrustHistoryFile())) pexReactor := p2p.NewPEXReactor(addrBook) pexReactor.SetLogger(p2pLogger) sw.AddReactor("PEX", pexReactor) @@ -297,6 +302,7 @@ func NewNode(config *cfg.Config, privKey: privKey, sw: sw, addrBook: addrBook, + tmStore: tmStore, blockStore: blockStore, bcReactor: bcReactor, diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index a1257f4b..e4f202bb 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -1,253 +1,450 @@ +// Copyright 2017 Tendermint. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + package trust import ( "encoding/json" "io/ioutil" "math" - "os" - "path/filepath" + "sync" "time" + + cmn "github.com/tendermint/tmlibs/common" ) -var ( - store *trustMetricStore -) +// TrustMetricStore - Manages all trust metrics for peers +type TrustMetricStore struct { + cmn.BaseService -type peerMetricRequest struct { - Key string - Resp chan *TrustMetric + // Maps a Peer.Key to that peer's TrustMetric + peerMetrics map[string]*TrustMetric + + // Mutex that protects the map and history data file + mtx sync.Mutex + + // The file path where peer trust metric history data will be stored + filePath string + + // This configuration will be used when creating new TrustMetrics + config *TrustMetricConfig } -type trustMetricStore struct { - PeerMetrics map[string]*TrustMetric - Requests chan *peerMetricRequest - Disconn chan string -} - -func init() { - store = &trustMetricStore{ - PeerMetrics: make(map[string]*TrustMetric), - Requests: make(chan *peerMetricRequest, 10), - Disconn: make(chan string, 10), +// NewTrustMetricStore returns a store that optionally saves data to +// the file path and uses the optional config when creating new trust metrics +func NewTrustMetricStore(filePath string, tmc *TrustMetricConfig) *TrustMetricStore { + tms := &TrustMetricStore{ + peerMetrics: make(map[string]*TrustMetric), + filePath: filePath, + config: tmc, } - go store.processRequests() + tms.BaseService = *cmn.NewBaseService(nil, "TrustMetricStore", tms) + return tms } -type peerHistory struct { - NumIntervals int `json:"intervals"` - History []float64 `json:"history"` +// OnStart implements Service +func (tms *TrustMetricStore) OnStart() error { + tms.BaseService.OnStart() + + tms.mtx.Lock() + defer tms.mtx.Unlock() + tms.loadFromFile() + return nil } -func loadSaveFromFile(key string, isLoad bool, data *peerHistory) *peerHistory { - tmhome, ok := os.LookupEnv("TMHOME") +// OnStop implements Service +func (tms *TrustMetricStore) OnStop() { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + // Stop all trust metric goroutines + for _, tm := range tms.peerMetrics { + tm.Stop() + } + + tms.saveToFile() + tms.BaseService.OnStop() +} + +// GetPeerTrustMetric returns a trust metric by peer key +func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + tm, ok := tms.peerMetrics[key] if !ok { - return nil - } - - filename := filepath.Join(tmhome, "trust_history.json") - - peers := make(map[string]peerHistory, 0) - // read in previously written history data - content, err := ioutil.ReadFile(filename) - if err == nil { - err = json.Unmarshal(content, &peers) - } - - var result *peerHistory - - if isLoad { - if p, ok := peers[key]; ok { - result = &p - } - } else { - peers[key] = *data - - b, err := json.Marshal(peers) - if err == nil { - err = ioutil.WriteFile(filename, b, 0644) - } - } - return result -} - -func createLoadPeerMetric(key string) *TrustMetric { - tm := NewMetric() - - if tm == nil { - return tm - } - - // attempt to load the peer's trust history data - if ph := loadSaveFromFile(key, true, nil); ph != nil { - tm.historySize = len(ph.History) - - if tm.historySize > 0 { - tm.numIntervals = ph.NumIntervals - tm.history = ph.History - - tm.historyValue = tm.calcHistoryValue() + // If the metric is not available, we will create it + tm = NewMetricWithConfig(tms.config) + if tm != nil { + // The metric needs to be in the map + tms.peerMetrics[key] = tm } } return tm } -func (tms *trustMetricStore) processRequests() { - for { - select { - case req := <-tms.Requests: - tm, ok := tms.PeerMetrics[req.Key] +// PeerDisconnected pauses the trust metric associated with the peer identified by the key +func (tms *TrustMetricStore) PeerDisconnected(key string) { + tms.mtx.Lock() + defer tms.mtx.Unlock() - if !ok { - tm = createLoadPeerMetric(req.Key) + // If the Peer that disconnected has a metric, pause it + if tm, ok := tms.peerMetrics[key]; ok { + tm.Pause() + } +} - if tm != nil { - tms.PeerMetrics[req.Key] = tm - } - } +/* Loading & Saving */ - req.Resp <- tm - case key := <-tms.Disconn: - if tm, ok := tms.PeerMetrics[key]; ok { - ph := peerHistory{ - NumIntervals: tm.numIntervals, - History: tm.history, - } +type peerHistoryJSON struct { + NumIntervals int `json:"intervals"` + History []float64 `json:"history"` +} - tm.Stop() - delete(tms.PeerMetrics, key) - loadSaveFromFile(key, false, &ph) - } +// Loads the history data for the Peer identified by key from the store file. +// cmn.Panics if file is corrupt +func (tms *TrustMetricStore) loadFromFile() bool { + // Check that a file has been configured for use + if tms.filePath == "" { + // The trust metric store can operate without the file + return false + } + + // Obtain the history data we have so far + content, err := ioutil.ReadFile(tms.filePath) + if err != nil { + cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", tms.filePath, err)) + } + + peers := make(map[string]peerHistoryJSON, 0) + err = json.Unmarshal(content, &peers) + if err != nil { + cmn.PanicCrisis(cmn.Fmt("Error decoding file %s: %v", tms.filePath, err)) + } + + // If history data exists in the file, + // load it into trust metrics and recalc + for key, p := range peers { + tm := NewMetricWithConfig(tms.config) + if tm == nil { + continue + } + // Restore the number of time intervals we have previously tracked + if p.NumIntervals > tm.maxIntervals { + p.NumIntervals = tm.maxIntervals + } + tm.numIntervals = p.NumIntervals + // Restore the history and its current size + if len(p.History) > tm.historyMaxSize { + p.History = p.History[:tm.historyMaxSize] + } + tm.history = p.History + tm.historySize = len(tm.history) + // Calculate the history value based on the loaded history data + tm.historyValue = tm.calcHistoryValue() + // Load the peer trust metric into the store + tms.peerMetrics[key] = tm + } + return true +} + +// Saves the history data for all peers to the store file +func (tms *TrustMetricStore) saveToFile() { + // Check that a file has been configured for use + if tms.filePath == "" { + // The trust metric store can operate without the file + return + } + + tms.Logger.Info("Saving TrustHistory to file", "size", len(tms.peerMetrics)) + + peers := make(map[string]peerHistoryJSON, 0) + + for key, tm := range tms.peerMetrics { + // Add an entry for the peer identified by key + peers[key] = peerHistoryJSON{ + NumIntervals: tm.numIntervals, + History: tm.history, } } -} -// request a TrustMetric by Peer Key -func GetPeerTrustMetric(key string) *TrustMetric { - resp := make(chan *TrustMetric, 1) + // Write all the data back to the file + b, err := json.Marshal(peers) + if err != nil { + tms.Logger.Error("Failed to encode the TrustHistory", "err", err) + return + } - store.Requests <- &peerMetricRequest{Key: key, Resp: resp} - return <-resp -} - -// the trust metric store should know when a Peer disconnects -func PeerDisconnected(key string) { - store.Disconn <- key -} - -// keep track of Peer reliability -type TrustMetric struct { - proportionalWeight float64 - integralWeight float64 - numIntervals int - maxIntervals int - intervalLen time.Duration - history []float64 - historySize int - historyMaxSize int - historyValue float64 - bad, good float64 - stop chan int - update chan *updateBadGood - trustValue chan *reqTrustValue -} - -type TrustMetricConfig struct { - // be careful changing these weights - ProportionalWeight float64 - IntegralWeight float64 - // don't allow 2^HistoryMaxSize to be greater than int max value - HistoryMaxSize int - // each interval should be short for adapability - // less than 30 seconds is too sensitive, - // and greater than 5 minutes will make the metric numb - IntervalLen time.Duration -} - -func defaultConfig() *TrustMetricConfig { - return &TrustMetricConfig{ - ProportionalWeight: 0.4, - IntegralWeight: 0.6, - HistoryMaxSize: 16, - IntervalLen: 1 * time.Minute, + err = ioutil.WriteFile(tms.filePath, b, 0644) + if err != nil { + tms.Logger.Error("Failed to save TrustHistory to file", "err", err) } } +//--------------------------------------------------------------------------------------- +// TrustMetric - keeps track of peer reliability +// See tendermint/docs/architecture/adr-006-trust-metric.md for details +type TrustMetric struct { + // Determines the percentage given to current behavior + proportionalWeight float64 + + // Determines the percentage given to prior behavior + integralWeight float64 + + // Count of how many time intervals this metric has been tracking + numIntervals int + + // Size of the time interval window for this trust metric + maxIntervals int + + // The time duration for a single time interval + intervalLen time.Duration + + // Stores the trust history data for this metric + history []float64 + + // The current number of history data elements + historySize int + + // The maximum number of history data elements + historyMaxSize int + + // The calculated history value for the current time interval + historyValue float64 + + // The number of recorded good and bad events for the current time interval + bad, good float64 + + // Sending true on this channel stops tracking, while false pauses tracking + stop chan bool + + // For sending information about new good/bad events to be recorded + update chan *updateBadGood + + // The channel to request a newly calculated trust value + trustValue chan *reqTrustValue +} + +// For the TrustMetric update channel type updateBadGood struct { IsBad bool Add int } +// For the TrustMetric trustValue channel type reqTrustValue struct { + // The requested trust value is sent back on this channel Resp chan float64 } -// calculates the derivative component +// Pause tells the metric to pause recording data over time intervals +func (tm *TrustMetric) Pause() { + tm.stop <- false +} + +// Stop tells the metric to stop recording data over time intervals +func (tm *TrustMetric) Stop() { + tm.stop <- true +} + +// BadEvent indicates that an undesirable event took place +func (tm *TrustMetric) BadEvent() { + tm.update <- &updateBadGood{IsBad: true, Add: 1} +} + +// AddBadEvents acknowledges multiple undesirable events +func (tm *TrustMetric) AddBadEvents(num int) { + tm.update <- &updateBadGood{IsBad: true, Add: num} +} + +// GoodEvent indicates that a desirable event took place +func (tm *TrustMetric) GoodEvent() { + tm.update <- &updateBadGood{IsBad: false, Add: 1} +} + +// AddGoodEvents acknowledges multiple desirable events +func (tm *TrustMetric) AddGoodEvents(num int) { + tm.update <- &updateBadGood{IsBad: false, Add: num} +} + +// TrustValue gets the dependable trust value; always between 0 and 1 +func (tm *TrustMetric) TrustValue() float64 { + resp := make(chan float64, 1) + + tm.trustValue <- &reqTrustValue{Resp: resp} + return <-resp +} + +// TrustScore gets a score based on the trust value always between 0 and 100 +func (tm *TrustMetric) TrustScore() int { + resp := make(chan float64, 1) + + tm.trustValue <- &reqTrustValue{Resp: resp} + return int(math.Floor(<-resp * 100)) +} + +// TrustMetricConfig - Configures the weight functions and time intervals for the metric +type TrustMetricConfig struct { + // Determines the percentage given to current behavior + ProportionalWeight float64 + + // Determines the percentage given to prior behavior + IntegralWeight float64 + + // The window of time that the trust metric will track events across. + // This can be set to cover many days without issue + TrackingWindow time.Duration + + // Each interval should be short for adapability. + // Less than 30 seconds is too sensitive, + // and greater than 5 minutes will make the metric numb + IntervalLen time.Duration +} + +// DefaultConfig returns a config with values that have been tested and produce desirable results +func DefaultConfig() *TrustMetricConfig { + return &TrustMetricConfig{ + ProportionalWeight: 0.4, + IntegralWeight: 0.6, + TrackingWindow: (time.Minute * 60 * 24) * 14, // 14 days. + IntervalLen: 1 * time.Minute, + } +} + +// NewMetric returns a trust metric with the default configuration +func NewMetric() *TrustMetric { + return NewMetricWithConfig(nil) +} + +// NewMetricWithConfig returns a trust metric with a custom configuration +func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric { + var config *TrustMetricConfig + + if tmc == nil { + config = DefaultConfig() + } else { + config = customConfig(tmc) + } + + tm := new(TrustMetric) + + // Setup using the configuration values + tm.proportionalWeight = config.ProportionalWeight + tm.integralWeight = config.IntegralWeight + tm.intervalLen = config.IntervalLen + // The maximum number of time intervals is the tracking window / interval length + tm.maxIntervals = int(config.TrackingWindow / tm.intervalLen) + // The history size will be determined by the maximum number of time intervals + tm.historyMaxSize = intervalToHistoryIndex(tm.maxIntervals) + 1 + // This metric has a perfect history so far + tm.historyValue = 1.0 + // Setup the channels + tm.update = make(chan *updateBadGood, 10) + tm.trustValue = make(chan *reqTrustValue, 10) + tm.stop = make(chan bool, 2) + + go tm.processRequests() + return tm +} + +/* Private methods */ + +// Ensures that all configuration elements have valid values +func customConfig(tmc *TrustMetricConfig) *TrustMetricConfig { + config := DefaultConfig() + + // Check the config for set values, and setup appropriately + if tmc.ProportionalWeight != 0 { + config.ProportionalWeight = tmc.ProportionalWeight + } + + if tmc.IntegralWeight != 0 { + config.IntegralWeight = tmc.IntegralWeight + } + + if tmc.TrackingWindow != time.Duration(0) { + config.TrackingWindow = tmc.TrackingWindow + } + + if tmc.IntervalLen != time.Duration(0) { + config.IntervalLen = tmc.IntervalLen + } + return config +} + +// Calculates the derivative component func (tm *TrustMetric) derivativeValue() float64 { return tm.proportionalValue() - tm.historyValue } -// strengthens the derivative component +// Strengthens the derivative component when the change is negative func (tm *TrustMetric) weightedDerivative() float64 { var weight float64 d := tm.derivativeValue() + if d < 0 { weight = 1.0 } - return weight * d } -func (tm *TrustMetric) fadedMemoryValue(interval int) float64 { - if interval == 0 { - // base case - return tm.history[0] - } - - index := int(math.Floor(math.Log(float64(interval)) / math.Log(2))) - // map the interval value down to an actual history index - return tm.history[index] +// Map the interval value down to an actual history index +func intervalToHistoryIndex(interval int) int { + return int(math.Floor(math.Log(float64(interval)) / math.Log(2))) } +// Retrieves the actual history data value that represents the requested time interval +func (tm *TrustMetric) fadedMemoryValue(interval int) float64 { + if interval == 0 { + // Base case + return tm.history[0] + } + return tm.history[intervalToHistoryIndex(interval)] +} + +// Performs the update for our Faded Memories process, which allows the +// trust metric tracking window to be large while maintaining a small +// number of history data values func (tm *TrustMetric) updateFadedMemory() { if tm.historySize < 2 { return } - // keep the last history element + // Keep the most recent history element faded := tm.history[:1] for i := 1; i < tm.historySize; i++ { + // The older the data is, the more we spread it out x := math.Pow(2, float64(i)) - + // Two history data values are merged into a single value ftv := ((tm.history[i] * (x - 1)) + tm.history[i-1]) / x - faded = append(faded, ftv) } tm.history = faded } -// calculates the integral (history) component of the trust value +// Calculates the integral (history) component of the trust value func (tm *TrustMetric) calcHistoryValue() float64 { var wk []float64 - // create the weights + // Create the weights. hlen := tm.numIntervals for i := 0; i < hlen; i++ { - x := math.Pow(.8, float64(i+1)) // optimistic wk + x := math.Pow(.8, float64(i+1)) // Optimistic weight wk = append(wk, x) } var wsum float64 - // calculate the sum of the weights + // Calculate the sum of the weights for _, v := range wk { wsum += v } var hv float64 - // calculate the history value + // Calculate the history value for i := 0; i < hlen; i++ { weight := wk[i] / wsum hv += tm.fadedMemoryValue(i) * weight @@ -255,10 +452,10 @@ func (tm *TrustMetric) calcHistoryValue() float64 { return hv } -// calculates the current score for good experiences +// Calculates the current score for good/bad experiences func (tm *TrustMetric) proportionalValue() float64 { value := 1.0 - // bad events are worth more + // Bad events are worth more in the calculation of our score total := tm.good + math.Pow(tm.bad, 2) if tm.bad > 0 || tm.good > 0 { @@ -267,37 +464,49 @@ func (tm *TrustMetric) proportionalValue() float64 { return value } +// Calculates the trust value for the request processing func (tm *TrustMetric) calcTrustValue() float64 { weightedP := tm.proportionalWeight * tm.proportionalValue() weightedI := tm.integralWeight * tm.historyValue weightedD := tm.weightedDerivative() tv := weightedP + weightedI + weightedD + // Do not return a negative value. if tv < 0 { tv = 0 } return tv } +// This method is for a goroutine that handles all requests on the metric func (tm *TrustMetric) processRequests() { - t := time.NewTicker(tm.intervalLen) - defer t.Stop() + var t *time.Ticker + loop: for { select { case bg := <-tm.update: + // Check if this is the first experience with + // what we are tracking since being started or paused + if t == nil { + t = time.NewTicker(tm.intervalLen) + tm.good = 0 + tm.bad = 0 + } + if bg.IsBad { tm.bad += float64(bg.Add) } else { tm.good += float64(bg.Add) } case rtv := <-tm.trustValue: - // send the calculated trust value back rtv.Resp <- tm.calcTrustValue() case <-t.C: + // Add the current trust value to the history data newHist := tm.calcTrustValue() tm.history = append([]float64{newHist}, tm.history...) + // Update history and interval counters if tm.historySize < tm.historyMaxSize { tm.historySize++ } else { @@ -308,87 +517,26 @@ loop: tm.numIntervals++ } + // Update the history data using Faded Memories tm.updateFadedMemory() + // Calculate the history value for the upcoming time interval tm.historyValue = tm.calcHistoryValue() tm.good = 0 tm.bad = 0 - case <-tm.stop: - break loop + case stop := <-tm.stop: + if stop { + // Stop all further tracking for this metric + break loop + } + // Pause the metric for now by stopping the ticker + if t != nil { + t.Stop() + t = nil + } } } -} -func (tm *TrustMetric) Stop() { - tm.stop <- 1 -} - -// indicate that an undesirable event took place -func (tm *TrustMetric) IncBad() { - tm.update <- &updateBadGood{IsBad: true, Add: 1} -} - -// multiple undesirable events need to be acknowledged -func (tm *TrustMetric) AddBad(num int) { - tm.update <- &updateBadGood{IsBad: true, Add: num} -} - -// positive events need to be recorded as well -func (tm *TrustMetric) IncGood() { - tm.update <- &updateBadGood{IsBad: false, Add: 1} -} - -// multiple positive can be indicated in a single call -func (tm *TrustMetric) AddGood(num int) { - tm.update <- &updateBadGood{IsBad: false, Add: num} -} - -// get the dependable trust value; a score that takes a long history into account -func (tm *TrustMetric) TrustValue() float64 { - resp := make(chan float64, 1) - - tm.trustValue <- &reqTrustValue{Resp: resp} - return <-resp -} - -func NewMetric() *TrustMetric { - return NewMetricWithConfig(defaultConfig()) -} - -func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric { - tm := new(TrustMetric) - dc := defaultConfig() - - if tmc.ProportionalWeight != 0 { - tm.proportionalWeight = tmc.ProportionalWeight - } else { - tm.proportionalWeight = dc.ProportionalWeight + if t != nil { + t.Stop() } - - if tmc.IntegralWeight != 0 { - tm.integralWeight = tmc.IntegralWeight - } else { - tm.integralWeight = dc.IntegralWeight - } - - if tmc.HistoryMaxSize != 0 { - tm.historyMaxSize = tmc.HistoryMaxSize - } else { - tm.historyMaxSize = dc.HistoryMaxSize - } - - if tmc.IntervalLen != time.Duration(0) { - tm.intervalLen = tmc.IntervalLen - } else { - tm.intervalLen = dc.IntervalLen - } - - // this gives our metric a tracking window of days - tm.maxIntervals = int(math.Pow(2, float64(tm.historyMaxSize))) - tm.historyValue = 1.0 - tm.update = make(chan *updateBadGood, 10) - tm.trustValue = make(chan *reqTrustValue, 10) - tm.stop = make(chan int, 1) - - go tm.processRequests() - return tm }