diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index 13294930..73707ce4 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -65,7 +65,7 @@ func (metric *EventMetric) fillMetric() *EventMetric { // event. type EventCallbackFunc func(em *EventMetric, data interface{}) -// EventUnmarshalFunc is a closure to get the eventType and data out of the raw +// EventUnmarshalFunc is a closure to get the query and data out of the raw // JSON received over the RPC WebSocket. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) @@ -81,7 +81,7 @@ type EventMeter struct { wsc *client.WSClient mtx sync.Mutex - events map[string]*EventMetric + queries map[string]*EventMetric unmarshalEvent EventUnmarshalFunc latencyCallback LatencyCallbackFunc @@ -96,7 +96,7 @@ type EventMeter struct { func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { return &EventMeter{ wsc: client.NewWSClient(addr, "/websocket", client.PingPeriod(1*time.Second)), - events: make(map[string]*EventMetric), + queries: make(map[string]*EventMetric), unmarshalEvent: unmarshalEvent, logger: log.NewNopLogger(), } @@ -140,16 +140,13 @@ func (em *EventMeter) Stop() { } } -// Subscribe for the given event type. Callback function will be called upon +// Subscribe for the given query. Callback function will be called upon // receiving an event. -func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { +func (em *EventMeter) Subscribe(query string, cb EventCallbackFunc) error { em.mtx.Lock() defer em.mtx.Unlock() - if _, ok := em.events[eventType]; ok { - return fmt.Errorf("subscribtion already exists") - } - if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { + if err := em.wsc.Subscribe(context.TODO(), query); err != nil { return err } @@ -157,29 +154,28 @@ func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { meter: metrics.NewMeter(), callback: cb, } - em.events[eventType] = metric + em.queries[query] = metric return nil } -// Unsubscribe from the given event type. -func (em *EventMeter) Unsubscribe(eventType string) error { +// Unsubscribe from the given query. +func (em *EventMeter) Unsubscribe(query string) error { em.mtx.Lock() defer em.mtx.Unlock() - if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil { + if err := em.wsc.Unsubscribe(context.TODO(), query); err != nil { return err } - // XXX: should we persist or save this info first? - delete(em.events, eventType) + return nil } -// GetMetric fills in the latest data for an event and return a copy. -func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) { +// GetMetric fills in the latest data for an query and return a copy. +func (em *EventMeter) GetMetric(query string) (*EventMetric, error) { em.mtx.Lock() defer em.mtx.Unlock() - metric, ok := em.events[eventType] + metric, ok := em.queries[query] if !ok { - return nil, fmt.Errorf("unknown event: %s", eventType) + return nil, fmt.Errorf("unknown query: %s", query) } return metric.fillMetric().Copy(), nil } @@ -202,8 +198,8 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { // Private func (em *EventMeter) subscribe() error { - for eventType, _ := range em.events { - if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { + for query, _ := range em.queries { + if err := em.wsc.Subscribe(context.TODO(), query); err != nil { return err } } @@ -219,13 +215,13 @@ func (em *EventMeter) receiveRoutine() { em.logger.Error("expected some event, got error", "err", resp.Error.Error()) continue } - eventType, data, err := em.unmarshalEvent(resp.Result) + query, data, err := em.unmarshalEvent(resp.Result) if err != nil { em.logger.Error("failed to unmarshal event", "err", err) continue } - if eventType != "" { // FIXME how can it be an empty string? - em.updateMetric(eventType, data) + if query != "" { // FIXME how can it be an empty string? + em.updateMetric(query, data) } case <-latencyTicker.C: if em.wsc.IsActive() { @@ -259,13 +255,13 @@ func (em *EventMeter) disconnectRoutine() { } } -func (em *EventMeter) updateMetric(eventType string, data events.EventData) { +func (em *EventMeter) updateMetric(query string, data events.EventData) { em.mtx.Lock() defer em.mtx.Unlock() - metric, ok := em.events[eventType] + metric, ok := em.queries[query] if !ok { - // we already unsubscribed, or got an unexpected event + // we already unsubscribed, or got an unexpected query return } diff --git a/tm-monitor/mock/eventmeter.go b/tm-monitor/mock/eventmeter.go index 949afe99..bab88e16 100644 --- a/tm-monitor/mock/eventmeter.go +++ b/tm-monitor/mock/eventmeter.go @@ -21,11 +21,11 @@ func (e *EventMeter) RegisterLatencyCallback(cb em.LatencyCallbackFunc) { e.late func (e *EventMeter) RegisterDisconnectCallback(cb em.DisconnectCallbackFunc) { e.disconnectCallback = cb } -func (e *EventMeter) Subscribe(eventID string, cb em.EventCallbackFunc) error { +func (e *EventMeter) Subscribe(query string, cb em.EventCallbackFunc) error { e.eventCallback = cb return nil } -func (e *EventMeter) Unsubscribe(eventID string) error { +func (e *EventMeter) Unsubscribe(query string) error { e.eventCallback = nil return nil }