Fixed the TrustMetric race condition reported in issue #881

This commit is contained in:
caffix
2017-11-20 16:45:59 -05:00
parent f9bc22ec6a
commit 4087326f45
2 changed files with 49 additions and 43 deletions

View File

@@ -144,6 +144,7 @@ func (tms *TrustMetricStore) loadFromDB() bool {
for key, p := range peers { for key, p := range peers {
tm := NewMetricWithConfig(tms.config) tm := NewMetricWithConfig(tms.config)
tm.mtx.Lock()
// Restore the number of time intervals we have previously tracked // Restore the number of time intervals we have previously tracked
if p.NumIntervals > tm.maxIntervals { if p.NumIntervals > tm.maxIntervals {
p.NumIntervals = tm.maxIntervals p.NumIntervals = tm.maxIntervals
@@ -168,6 +169,8 @@ func (tms *TrustMetricStore) loadFromDB() bool {
} }
// Calculate the history value based on the loaded history data // Calculate the history value based on the loaded history data
tm.historyValue = tm.calcHistoryValue() tm.historyValue = tm.calcHistoryValue()
tm.mtx.Unlock()
// Load the peer trust metric into the store // Load the peer trust metric into the store
tms.peerMetrics[key] = tm tms.peerMetrics[key] = tm
} }
@@ -181,11 +184,13 @@ func (tms *TrustMetricStore) saveToDB() {
peers := make(map[string]peerHistoryJSON, 0) peers := make(map[string]peerHistoryJSON, 0)
for key, tm := range tms.peerMetrics { for key, tm := range tms.peerMetrics {
tm.mtx.Lock()
// Add an entry for the peer identified by key // Add an entry for the peer identified by key
peers[key] = peerHistoryJSON{ peers[key] = peerHistoryJSON{
NumIntervals: tm.numIntervals, NumIntervals: tm.numIntervals,
History: tm.history, History: tm.history,
} }
tm.mtx.Unlock()
} }
// Write all the data back to the DB // Write all the data back to the DB
@@ -236,6 +241,9 @@ const (
// TrustMetric - keeps track of peer reliability // TrustMetric - keeps track of peer reliability
// See tendermint/docs/architecture/adr-006-trust-metric.md for details // See tendermint/docs/architecture/adr-006-trust-metric.md for details
type TrustMetric struct { type TrustMetric struct {
// Mutex that protects the metric from concurrent access
mtx sync.Mutex
// Determines the percentage given to current behavior // Determines the percentage given to current behavior
proportionalWeight float64 proportionalWeight float64
@@ -277,24 +285,6 @@ type TrustMetric struct {
// Sending true on this channel stops tracking, while false pauses tracking // Sending true on this channel stops tracking, while false pauses tracking
stop chan bool stop chan bool
// For sending information about new good/bad events to be recorded
update chan *updateBadGood
// The channel to request a newly calculated trust value
trustValue chan *reqTrustValue
}
// For the TrustMetric update channel
type updateBadGood struct {
IsBad bool
Add int
}
// For the TrustMetric trustValue channel
type reqTrustValue struct {
// The requested trust value is sent back on this channel
Resp chan float64
} }
// Pause tells the metric to pause recording data over time intervals. // Pause tells the metric to pause recording data over time intervals.
@@ -310,20 +300,44 @@ func (tm *TrustMetric) Stop() {
// BadEvents indicates that an undesirable event(s) took place // BadEvents indicates that an undesirable event(s) took place
func (tm *TrustMetric) BadEvents(num int) { func (tm *TrustMetric) BadEvents(num int) {
tm.update <- &updateBadGood{IsBad: true, Add: num} tm.mtx.Lock()
defer tm.mtx.Unlock()
// Check if this is the first experience with
// what we are tracking since being paused
if tm.paused {
tm.good = 0
tm.bad = 0
// New events cause us to unpause the metric
tm.paused = false
}
tm.bad += float64(num)
} }
// GoodEvents indicates that a desirable event(s) took place // GoodEvents indicates that a desirable event(s) took place
func (tm *TrustMetric) GoodEvents(num int) { func (tm *TrustMetric) GoodEvents(num int) {
tm.update <- &updateBadGood{IsBad: false, Add: num} tm.mtx.Lock()
defer tm.mtx.Unlock()
// Check if this is the first experience with
// what we are tracking since being paused
if tm.paused {
tm.good = 0
tm.bad = 0
// New events cause us to unpause the metric
tm.paused = false
}
tm.good += float64(num)
} }
// TrustValue gets the dependable trust value; always between 0 and 1 // TrustValue gets the dependable trust value; always between 0 and 1
func (tm *TrustMetric) TrustValue() float64 { func (tm *TrustMetric) TrustValue() float64 {
resp := make(chan float64, 1) tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.trustValue <- &reqTrustValue{Resp: resp} return tm.calcTrustValue()
return <-resp
} }
// TrustScore gets a score based on the trust value always between 0 and 100 // TrustScore gets a score based on the trust value always between 0 and 100
@@ -381,9 +395,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric {
tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1
// This metric has a perfect history so far // This metric has a perfect history so far
tm.historyValue = 1.0 tm.historyValue = 1.0
// Setup the channels // Setup the stop channel
tm.update = make(chan *updateBadGood, defaultUpdateChanCapacity)
tm.trustValue = make(chan *reqTrustValue, defaultRequestChanCapacity)
tm.stop = make(chan bool, 1) tm.stop = make(chan bool, 1)
go tm.processRequests() go tm.processRequests()
@@ -516,24 +528,8 @@ func (tm *TrustMetric) processRequests() {
loop: loop:
for { for {
select { select {
case bg := <-tm.update:
// Check if this is the first experience with
// what we are tracking since being paused
if tm.paused {
tm.good = 0
tm.bad = 0
// New events cause us to unpause the metric
tm.paused = false
}
if bg.IsBad {
tm.bad += float64(bg.Add)
} else {
tm.good += float64(bg.Add)
}
case rtv := <-tm.trustValue:
rtv.Resp <- tm.calcTrustValue()
case <-t.C: case <-t.C:
tm.mtx.Lock()
if !tm.paused { if !tm.paused {
// Add the current trust value to the history data // Add the current trust value to the history data
newHist := tm.calcTrustValue() newHist := tm.calcTrustValue()
@@ -563,13 +559,17 @@ loop:
tm.good = 0 tm.good = 0
tm.bad = 0 tm.bad = 0
} }
tm.mtx.Unlock()
case stop := <-tm.stop: case stop := <-tm.stop:
tm.mtx.Lock()
if stop { if stop {
// Stop all further tracking for this metric // Stop all further tracking for this metric
tm.mtx.Unlock()
break loop break loop
} }
// Pause the metric for now // Pause the metric for now
tm.paused = true tm.paused = true
tm.mtx.Unlock()
} }
} }
} }

View File

@@ -210,7 +210,10 @@ func TestTrustMetricStopPause(t *testing.T) {
// Give the pause some time to take place // Give the pause some time to take place
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
tm.mtx.Lock()
first := tm.numIntervals first := tm.numIntervals
tm.mtx.Unlock()
// Allow more time to pass and check the intervals are unchanged // Allow more time to pass and check the intervals are unchanged
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
assert.Equal(t, first, tm.numIntervals) assert.Equal(t, first, tm.numIntervals)
@@ -223,7 +226,10 @@ func TestTrustMetricStopPause(t *testing.T) {
// Give the stop some time to take place // Give the stop some time to take place
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
tm.mtx.Lock()
second := tm.numIntervals second := tm.numIntervals
tm.mtx.Unlock()
// Allow more time to pass and check the intervals are unchanged // Allow more time to pass and check the intervals are unchanged
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
assert.Equal(t, second, tm.numIntervals) assert.Equal(t, second, tm.numIntervals)