diff --git a/.gitignore b/.gitignore
index 62f28681..e0a06eaf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
+*.swp
 vendor
 .glide
+
+pubsub/query/fuzz_test/output
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e6783601..bf39e544 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,28 @@
 # Changelog
 
+## 0.3.0 (September 22, 2017)
+
+BREAKING CHANGES:
+
+- [log] logger functions no longer return an error
+- [common] NewBaseService takes the new logger
+- [cli] RunCaptureWithArgs now captures stderr and stdout
+  - +func RunCaptureWithArgs(cmd Executable, args []string, env map[string]string) (stdout, stderr string, err error)
+  - -func RunCaptureWithArgs(cmd Executable, args []string, env map[string]string) (output string, err error)
+
+FEATURES:
+
+- [common] various common HTTP functionality
+- [common] Date range parsing from string (ex. "2015-12-31:2017-12-31")
+- [common] ProtocolAndAddress function
+- [pubsub] New package for publish-subscribe with more advanced filtering
+
+BUG FIXES:
+
+- [common] fix atomicity of WriteFileAtomic by calling fsync
+- [db] fix memDb iteration index out of range
+- [autofile] fix Flush by calling fsync
+
 ## 0.2.2 (June 16, 2017)
 
 FEATURES:
@@ -10,13 +33,12 @@ FEATURES:
 IMPROVEMENTS:
 
 - [cli] Error handling for tests
-- [cli] Support dashes in ENV variables 
+- [cli] Support dashes in ENV variables
 
 BUG FIXES:
 
 - [flowrate] Fix non-deterministic test failures
 
-
 ## 0.2.1 (June 2, 2017)
 
 FEATURES:
diff --git a/autofile/group.go b/autofile/group.go
index 39f274e0..ce3e3000 100644
--- a/autofile/group.go
+++ b/autofile/group.go
@@ -153,7 +153,11 @@ func (g *Group) WriteLine(line string) error {
 func (g *Group) Flush() error {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
-	return g.headBuf.Flush()
+	err := g.headBuf.Flush()
+	if err == nil {
+		err = g.Head.Sync()
+	}
+	return err
 }
 
 func (g *Group) processTicks() {
diff --git a/cli/helper.go b/cli/helper.go
index 79654bc3..845c17db 100644
--- a/cli/helper.go
+++ b/cli/helper.go
@@ -54,31 +54,40 @@ func RunWithArgs(cmd Executable, args []string, env map[string]string) error {
 	return cmd.Execute()
 }
 
-// RunCaptureWithArgs executes the given command with the specified command line args
-// and environmental variables set. It returns whatever was writen to
-// stdout along with any error returned from cmd.Execute()
-func RunCaptureWithArgs(cmd Executable, args []string, env map[string]string) (output string, err error) {
-	old := os.Stdout // keep backup of the real stdout
-	r, w, _ := os.Pipe()
-	os.Stdout = w
+// RunCaptureWithArgs executes the given command with the specified command
+// line args and environmental variables set.
It returns string fields +// representing output written to stdout and stderr, additionally any error +// from cmd.Execute() is also returned +func RunCaptureWithArgs(cmd Executable, args []string, env map[string]string) (stdout, stderr string, err error) { + oldout, olderr := os.Stdout, os.Stderr // keep backup of the real stdout + rOut, wOut, _ := os.Pipe() + rErr, wErr, _ := os.Pipe() + os.Stdout, os.Stderr = wOut, wErr defer func() { - os.Stdout = old // restoring the real stdout + os.Stdout, os.Stderr = oldout, olderr // restoring the real stdout }() - outC := make(chan string) // copy the output in a separate goroutine so printing can't block indefinitely - go func() { - var buf bytes.Buffer - // io.Copy will end when we call w.Close() below - io.Copy(&buf, r) - outC <- buf.String() - }() + copyStd := func(reader *os.File) *(chan string) { + stdC := make(chan string) + go func() { + var buf bytes.Buffer + // io.Copy will end when we call reader.Close() below + io.Copy(&buf, reader) + stdC <- buf.String() + }() + return &stdC + } + outC := copyStd(rOut) + errC := copyStd(rErr) // now run the command err = RunWithArgs(cmd, args, env) // and grab the stdout to return - w.Close() - output = <-outC - return output, err + wOut.Close() + wErr.Close() + stdout = <-*outC + stderr = <-*errC + return stdout, stderr, err } diff --git a/cli/setup.go b/cli/setup.go index 35362ed8..78151015 100644 --- a/cli/setup.go +++ b/cli/setup.go @@ -97,9 +97,9 @@ func (e Executor) Execute() error { err := e.Command.Execute() if err != nil { if viper.GetBool(TraceFlag) { - fmt.Printf("ERROR: %+v\n", err) + fmt.Fprintf(os.Stderr, "ERROR: %+v\n", err) } else { - fmt.Println("ERROR:", err.Error()) + fmt.Fprintf(os.Stderr, "ERROR: %v\n", err) } // return error code 1 by default, can override it with a special error type diff --git a/cli/setup_test.go b/cli/setup_test.go index 36cbbcc9..4e606ac7 100644 --- a/cli/setup_test.go +++ b/cli/setup_test.go @@ -223,9 +223,11 @@ func TestSetupTrace(t *testing.T) { viper.Reset() args := append([]string{cmd.Use}, tc.args...) - out, err := RunCaptureWithArgs(cmd, args, tc.env) + stdout, stderr, err := RunCaptureWithArgs(cmd, args, tc.env) require.NotNil(err, i) - msg := strings.Split(out, "\n") + require.Equal("", stdout, i) + require.NotEqual("", stderr, i) + msg := strings.Split(stderr, "\n") desired := fmt.Sprintf("ERROR: %s", tc.expected) assert.Equal(desired, msg[0], i) if tc.long && assert.True(len(msg) > 2, i) { diff --git a/common/byteslice.go b/common/byteslice.go index be828f06..ceaf06bd 100644 --- a/common/byteslice.go +++ b/common/byteslice.go @@ -4,6 +4,9 @@ import ( "bytes" ) +// Fingerprint returns the first 6 bytes of a byte slice. +// If the slice is less than 6 bytes, the fingerprint +// contains trailing zeroes. func Fingerprint(slice []byte) []byte { fingerprint := make([]byte, 6) copy(fingerprint, slice) diff --git a/common/date.go b/common/date.go new file mode 100644 index 00000000..e017a4b4 --- /dev/null +++ b/common/date.go @@ -0,0 +1,43 @@ +package common + +import ( + "strings" + "time" + + "github.com/pkg/errors" +) + +// TimeLayout helps to parse a date string of the format YYYY-MM-DD +// Intended to be used with the following function: +// time.Parse(TimeLayout, date) +var TimeLayout = "2006-01-02" //this represents YYYY-MM-DD + +// ParseDateRange parses a date range string of the format start:end +// where the start and end date are of the format YYYY-MM-DD. 
+// The parsed dates are time.Time and will return the zero time for +// unbounded dates, ex: +// unbounded start: :2000-12-31 +// unbounded end: 2000-12-31: +func ParseDateRange(dateRange string) (startDate, endDate time.Time, err error) { + dates := strings.Split(dateRange, ":") + if len(dates) != 2 { + err = errors.New("bad date range, must be in format date:date") + return + } + parseDate := func(date string) (out time.Time, err error) { + if len(date) == 0 { + return + } + out, err = time.Parse(TimeLayout, date) + return + } + startDate, err = parseDate(dates[0]) + if err != nil { + return + } + endDate, err = parseDate(dates[1]) + if err != nil { + return + } + return +} diff --git a/common/date_test.go b/common/date_test.go new file mode 100644 index 00000000..2c063247 --- /dev/null +++ b/common/date_test.go @@ -0,0 +1,46 @@ +package common + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var ( + date = time.Date(2015, time.Month(12), 31, 0, 0, 0, 0, time.UTC) + date2 = time.Date(2016, time.Month(12), 31, 0, 0, 0, 0, time.UTC) + zero time.Time +) + +func TestParseDateRange(t *testing.T) { + assert := assert.New(t) + + var testDates = []struct { + dateStr string + start time.Time + end time.Time + errNil bool + }{ + {"2015-12-31:2016-12-31", date, date2, true}, + {"2015-12-31:", date, zero, true}, + {":2016-12-31", zero, date2, true}, + {"2016-12-31", zero, zero, false}, + {"2016-31-12:", zero, zero, false}, + {":2016-31-12", zero, zero, false}, + } + + for _, test := range testDates { + start, end, err := ParseDateRange(test.dateStr) + if test.errNil { + assert.Nil(err) + testPtr := func(want, have time.Time) { + assert.True(have.Equal(want)) + } + testPtr(test.start, start) + testPtr(test.end, end) + } else { + assert.NotNil(err) + } + } +} diff --git a/common/http.go b/common/http.go new file mode 100644 index 00000000..56b5b6c6 --- /dev/null +++ b/common/http.go @@ -0,0 +1,153 @@ +package common + +import ( + "encoding/json" + "io" + "net/http" + + "gopkg.in/go-playground/validator.v9" + + "github.com/pkg/errors" +) + +type ErrorResponse struct { + Success bool `json:"success,omitempty"` + + // Err is the error message if Success is false + Err string `json:"error,omitempty"` + + // Code is set if Success is false + Code int `json:"code,omitempty"` +} + +// ErrorWithCode makes an ErrorResponse with the +// provided err's Error() content, and status code. +// It panics if err is nil. +func ErrorWithCode(err error, code int) *ErrorResponse { + return &ErrorResponse{ + Err: err.Error(), + Code: code, + } +} + +// Ensure that ErrorResponse implements error +var _ error = (*ErrorResponse)(nil) + +func (er *ErrorResponse) Error() string { + return er.Err +} + +// Ensure that ErrorResponse implements httpCoder +var _ httpCoder = (*ErrorResponse)(nil) + +func (er *ErrorResponse) HTTPCode() int { + return er.Code +} + +var errNilBody = errors.Errorf("expecting a non-nil body") + +// FparseJSON unmarshals into save, the body of the provided reader. +// Since it uses json.Unmarshal, save must be of a pointer type +// or compatible with json.Unmarshal. +func FparseJSON(r io.Reader, save interface{}) error { + if r == nil { + return errors.Wrap(errNilBody, "Reader") + } + + dec := json.NewDecoder(r) + if err := dec.Decode(save); err != nil { + return errors.Wrap(err, "Decode/Unmarshal") + } + return nil +} + +// ParseRequestJSON unmarshals into save, the body of the +// request. It closes the body of the request after parsing. 
+// Since it uses json.Unmarshal, save must be of a pointer type
+// or compatible with json.Unmarshal.
+func ParseRequestJSON(r *http.Request, save interface{}) error {
+	if r == nil || r.Body == nil {
+		return errNilBody
+	}
+	defer r.Body.Close()
+
+	return FparseJSON(r.Body, save)
+}
+
+// ParseRequestAndValidateJSON unmarshals into save, the body of the
+// request and invokes a validator on the saved content. To ensure
+// validation, make sure to set tags "validate" on your struct as
+// per https://godoc.org/gopkg.in/go-playground/validator.v9.
+// It closes the body of the request after parsing.
+// Since it uses json.Unmarshal, save must be of a pointer type
+// or compatible with json.Unmarshal.
+func ParseRequestAndValidateJSON(r *http.Request, save interface{}) error {
+	if r == nil || r.Body == nil {
+		return errNilBody
+	}
+	defer r.Body.Close()
+
+	return FparseAndValidateJSON(r.Body, save)
+}
+
+// FparseAndValidateJSON, like FparseJSON, unmarshals into save
+// the body of the provided reader. However, it invokes the validator
+// to check the set validators on your struct fields as per
+// https://godoc.org/gopkg.in/go-playground/validator.v9.
+// Since it uses json.Unmarshal, save must be of a pointer type
+// or compatible with json.Unmarshal.
+func FparseAndValidateJSON(r io.Reader, save interface{}) error {
+	if err := FparseJSON(r, save); err != nil {
+		return err
+	}
+	return validate(save)
+}
+
+var theValidator = validator.New()
+
+func validate(obj interface{}) error {
+	return errors.Wrap(theValidator.Struct(obj), "Validate")
+}
+
+// WriteSuccess JSON marshals the content provided, to an HTTP
+// response, setting status code 200 and setting header
+// "Content-Type" to "application/json".
+func WriteSuccess(w http.ResponseWriter, data interface{}) {
+	WriteCode(w, data, 200)
+}
+
+// WriteCode JSON marshals content, to an HTTP response,
+// setting the provided status code, and setting header
+// "Content-Type" to "application/json". If JSON marshalling fails
+// with an error, WriteCode instead writes out the error by invoking
+// WriteError.
+func WriteCode(w http.ResponseWriter, out interface{}, code int) { + blob, err := json.MarshalIndent(out, "", " ") + if err != nil { + WriteError(w, err) + } else { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + w.Write(blob) + } +} + +type httpCoder interface { + HTTPCode() int +} + +// WriteError is a convenience function to write out an +// error to an http.ResponseWriter, to send out an error +// that's structured as JSON i.e the form +// {"error": sss, "code": ddd} +// If err implements the interface HTTPCode() int, +// it will use that status code otherwise, it will +// set code to be http.StatusBadRequest +func WriteError(w http.ResponseWriter, err error) { + code := http.StatusBadRequest + if httpC, ok := err.(httpCoder); ok { + code = httpC.HTTPCode() + } + + WriteCode(w, ErrorWithCode(err, code), code) +} diff --git a/common/http_test.go b/common/http_test.go new file mode 100644 index 00000000..73761fb1 --- /dev/null +++ b/common/http_test.go @@ -0,0 +1,250 @@ +package common_test + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/common" +) + +func TestWriteSuccess(t *testing.T) { + w := httptest.NewRecorder() + common.WriteSuccess(w, "foo") + assert.Equal(t, w.Code, 200, "should get a 200") +} + +var blankErrResponse = new(common.ErrorResponse) + +func TestWriteError(t *testing.T) { + tests := [...]struct { + msg string + code int + }{ + 0: { + msg: "this is a message", + code: 419, + }, + } + + for i, tt := range tests { + w := httptest.NewRecorder() + msg := tt.msg + + // First check without a defined code, should send back a 400 + common.WriteError(w, errors.New(msg)) + assert.Equal(t, w.Code, http.StatusBadRequest, "#%d: should get a 400", i) + blob, err := ioutil.ReadAll(w.Body) + if err != nil { + assert.Fail(t, "expecting a successful ioutil.ReadAll", "#%d", i) + continue + } + + recv := new(common.ErrorResponse) + if err := json.Unmarshal(blob, recv); err != nil { + assert.Fail(t, "expecting a successful json.Unmarshal", "#%d", i) + continue + } + + assert.Equal(t, reflect.DeepEqual(recv, blankErrResponse), false, "expecting a non-blank error response") + + // Now test with an error that's .HTTPCode() int conforming + + // Reset w + w = httptest.NewRecorder() + + common.WriteError(w, common.ErrorWithCode(errors.New("foo"), tt.code)) + assert.Equal(t, w.Code, tt.code, "case #%d", i) + } +} + +type marshalFailer struct{} + +var errFooFailed = errors.New("foo failed here") + +func (mf *marshalFailer) MarshalJSON() ([]byte, error) { + return nil, errFooFailed +} + +func TestWriteCode(t *testing.T) { + codes := [...]int{ + 0: http.StatusOK, + 1: http.StatusBadRequest, + 2: http.StatusUnauthorized, + 3: http.StatusInternalServerError, + } + + for i, code := range codes { + w := httptest.NewRecorder() + common.WriteCode(w, "foo", code) + assert.Equal(t, w.Code, code, "#%d", i) + + // Then for the failed JSON marshaling + w = httptest.NewRecorder() + common.WriteCode(w, &marshalFailer{}, code) + wantCode := http.StatusBadRequest + assert.Equal(t, w.Code, wantCode, "#%d", i) + assert.True(t, strings.Contains(string(w.Body.Bytes()), errFooFailed.Error()), + "#%d: expected %q in the error message", i, errFooFailed) + } +} + +type saver struct { + Foo int `json:"foo" validate:"min=10"` + Bar string `json:"bar"` +} + +type rcloser struct { 
+ closeOnce sync.Once + body *bytes.Buffer + closeChan chan bool +} + +var errAlreadyClosed = errors.New("already closed") + +func (rc *rcloser) Close() error { + var err = errAlreadyClosed + rc.closeOnce.Do(func() { + err = nil + rc.closeChan <- true + close(rc.closeChan) + }) + return err +} + +func (rc *rcloser) Read(b []byte) (int, error) { + return rc.body.Read(b) +} + +var _ io.ReadCloser = (*rcloser)(nil) + +func makeReq(strBody string) (*http.Request, <-chan bool) { + closeChan := make(chan bool, 1) + buf := new(bytes.Buffer) + buf.Write([]byte(strBody)) + req := &http.Request{ + Header: make(http.Header), + Body: &rcloser{body: buf, closeChan: closeChan}, + } + return req, closeChan +} + +func TestParseRequestJSON(t *testing.T) { + tests := [...]struct { + body string + wantErr bool + useNil bool + }{ + 0: {wantErr: true, body: ``}, + 1: {body: `{}`}, + 2: {body: `{"foo": 2}`}, // Not that the validate tags don't matter here since we are just parsing + 3: {body: `{"foo": "abcd"}`, wantErr: true}, + 4: {useNil: true, wantErr: true}, + } + + for i, tt := range tests { + req, closeChan := makeReq(tt.body) + if tt.useNil { + req.Body = nil + } + sav := new(saver) + err := common.ParseRequestJSON(req, sav) + if tt.wantErr { + assert.NotEqual(t, err, nil, "#%d: want non-nil error", i) + continue + } + assert.Equal(t, err, nil, "#%d: want nil error", i) + wasClosed := <-closeChan + assert.Equal(t, wasClosed, true, "#%d: should have invoked close", i) + } +} + +func TestFparseJSON(t *testing.T) { + r1 := strings.NewReader(`{"foo": 1}`) + sav := new(saver) + require.Equal(t, common.FparseJSON(r1, sav), nil, "expecting successful parsing") + r2 := strings.NewReader(`{"bar": "blockchain"}`) + require.Equal(t, common.FparseJSON(r2, sav), nil, "expecting successful parsing") + require.Equal(t, reflect.DeepEqual(sav, &saver{Foo: 1, Bar: "blockchain"}), true, "should have parsed both") + + // Now with a nil body + require.NotEqual(t, nil, common.FparseJSON(nil, sav), "expecting a nil error report") +} + +func TestFparseAndValidateJSON(t *testing.T) { + r1 := strings.NewReader(`{"foo": 1}`) + sav := new(saver) + require.NotEqual(t, common.FparseAndValidateJSON(r1, sav), nil, "expecting validation to fail") + r1 = strings.NewReader(`{"foo": 100}`) + require.Equal(t, common.FparseJSON(r1, sav), nil, "expecting successful parsing") + r2 := strings.NewReader(`{"bar": "blockchain"}`) + require.Equal(t, common.FparseAndValidateJSON(r2, sav), nil, "expecting successful parsing") + require.Equal(t, reflect.DeepEqual(sav, &saver{Foo: 100, Bar: "blockchain"}), true, "should have parsed both") + + // Now with a nil body + require.NotEqual(t, nil, common.FparseJSON(nil, sav), "expecting a nil error report") +} + +var blankSaver = new(saver) + +func TestParseAndValidateRequestJSON(t *testing.T) { + tests := [...]struct { + body string + wantErr bool + useNil bool + }{ + 0: {wantErr: true, body: ``}, + 1: {body: `{}`, wantErr: true}, // Here it should fail since Foo doesn't meet the minimum value + 2: {body: `{"foo": 2}`, wantErr: true}, // Here validation should fail + 3: {body: `{"foo": "abcd"}`, wantErr: true}, + 4: {useNil: true, wantErr: true}, + 5: {body: `{"foo": 100}`}, // Must succeed + } + + for i, tt := range tests { + req, closeChan := makeReq(tt.body) + if tt.useNil { + req.Body = nil + } + sav := new(saver) + err := common.ParseRequestAndValidateJSON(req, sav) + if tt.wantErr { + assert.NotEqual(t, err, nil, "#%d: want non-nil error", i) + continue + } + + assert.Equal(t, err, nil, "#%d: want nil 
error", i) + assert.False(t, reflect.DeepEqual(blankSaver, sav), "#%d: expecting a set saver", i) + + wasClosed := <-closeChan + assert.Equal(t, wasClosed, true, "#%d: should have invoked close", i) + } +} + +func TestErrorWithCode(t *testing.T) { + tests := [...]struct { + code int + err error + }{ + 0: {code: 500, err: errors.New("funky")}, + 1: {code: 406, err: errors.New("purist")}, + } + + for i, tt := range tests { + errRes := common.ErrorWithCode(tt.err, tt.code) + assert.Equal(t, errRes.Error(), tt.err.Error(), "#%d: expecting the error values to be equal", i) + assert.Equal(t, errRes.Code, tt.code, "expecting the same status code", i) + assert.Equal(t, errRes.HTTPCode(), tt.code, "expecting the same status code", i) + } +} diff --git a/common/net.go b/common/net.go index 2f9c9c8c..bdbe38f7 100644 --- a/common/net.go +++ b/common/net.go @@ -5,10 +5,22 @@ import ( "strings" ) -// protoAddr: e.g. "tcp://127.0.0.1:8080" or "unix:///tmp/test.sock" +// Connect dials the given address and returns a net.Conn. The protoAddr argument should be prefixed with the protocol, +// eg. "tcp://127.0.0.1:8080" or "unix:///tmp/test.sock" func Connect(protoAddr string) (net.Conn, error) { - parts := strings.SplitN(protoAddr, "://", 2) - proto, address := parts[0], parts[1] + proto, address := ProtocolAndAddress(protoAddr) conn, err := net.Dial(proto, address) return conn, err } + +// ProtocolAndAddress splits an address into the protocol and address components. +// For instance, "tcp://127.0.0.1:8080" will be split into "tcp" and "127.0.0.1:8080". +// If the address has no protocol prefix, the default is "tcp". +func ProtocolAndAddress(listenAddr string) (string, string) { + protocol, address := "tcp", listenAddr + parts := strings.SplitN(address, "://", 2) + if len(parts) == 2 { + protocol, address = parts[0], parts[1] + } + return protocol, address +} diff --git a/common/net_test.go b/common/net_test.go new file mode 100644 index 00000000..38d2ae82 --- /dev/null +++ b/common/net_test.go @@ -0,0 +1,38 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProtocolAndAddress(t *testing.T) { + + cases := []struct { + fullAddr string + proto string + addr string + }{ + { + "tcp://mydomain:80", + "tcp", + "mydomain:80", + }, + { + "mydomain:80", + "tcp", + "mydomain:80", + }, + { + "unix://mydomain:80", + "unix", + "mydomain:80", + }, + } + + for _, c := range cases { + proto, addr := ProtocolAndAddress(c.fullAddr) + assert.Equal(t, proto, c.proto) + assert.Equal(t, addr, c.addr) + } +} diff --git a/common/os.go b/common/os.go index 9dc81c57..9c2bda50 100644 --- a/common/os.go +++ b/common/os.go @@ -48,7 +48,12 @@ func EnsureDir(dir string, mode os.FileMode) error { func IsDirEmpty(name string) (bool, error) { f, err := os.Open(name) if err != nil { - return true, err //folder is non-existent + if os.IsNotExist(err) { + return true, err + } + // Otherwise perhaps a permission + // error or some other error. + return false, err } defer f.Close() @@ -93,28 +98,30 @@ func MustWriteFile(filePath string, contents []byte, mode os.FileMode) { } } -// Writes to newBytes to filePath. -// Guaranteed not to lose *both* oldBytes and newBytes, -// (assuming that the OS is perfect) +// WriteFileAtomic writes newBytes to temp and atomically moves to filePath +// when everything else succeeds. 
func WriteFileAtomic(filePath string, newBytes []byte, mode os.FileMode) error { - // If a file already exists there, copy to filePath+".bak" (overwrite anything) - if _, err := os.Stat(filePath); !os.IsNotExist(err) { - fileBytes, err := ioutil.ReadFile(filePath) - if err != nil { - return fmt.Errorf("Could not read file %v. %v", filePath, err) - } - err = ioutil.WriteFile(filePath+".bak", fileBytes, mode) - if err != nil { - return fmt.Errorf("Could not write file %v. %v", filePath+".bak", err) - } - } - // Write newBytes to filePath.new - err := ioutil.WriteFile(filePath+".new", newBytes, mode) + f, err := ioutil.TempFile("", "") if err != nil { - return fmt.Errorf("Could not write file %v. %v", filePath+".new", err) + return err + } + _, err = f.Write(newBytes) + if err == nil { + err = f.Sync() + } + if closeErr := f.Close(); err == nil { + err = closeErr + } + if permErr := os.Chmod(f.Name(), mode); err == nil { + err = permErr + } + if err == nil { + err = os.Rename(f.Name(), filePath) + } + // any err should result in full cleanup + if err != nil { + os.Remove(f.Name()) } - // Move filePath.new to filePath - err = os.Rename(filePath+".new", filePath) return err } diff --git a/common/os_test.go b/common/os_test.go new file mode 100644 index 00000000..05359e36 --- /dev/null +++ b/common/os_test.go @@ -0,0 +1,29 @@ +package common + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "testing" + "time" +) + +func TestWriteFileAtomic(t *testing.T) { + data := []byte("Becatron") + fname := fmt.Sprintf("/tmp/write-file-atomic-test-%v.txt", time.Now().UnixNano()) + err := WriteFileAtomic(fname, data, 0664) + if err != nil { + t.Fatal(err) + } + rData, err := ioutil.ReadFile(fname) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(data, rData) { + t.Fatalf("data mismatch: %v != %v", data, rData) + } + if err := os.Remove(fname); err != nil { + t.Fatal(err) + } +} diff --git a/db/mem_db.go b/db/mem_db.go index 28662429..db40227e 100644 --- a/db/mem_db.go +++ b/db/mem_db.go @@ -82,7 +82,7 @@ func newMemDBIterator() *memDBIterator { } func (it *memDBIterator) Next() bool { - if it.last >= len(it.keys) { + if it.last >= len(it.keys)-1 { return false } it.last++ diff --git a/db/mem_db_test.go b/db/mem_db_test.go new file mode 100644 index 00000000..a76e10dc --- /dev/null +++ b/db/mem_db_test.go @@ -0,0 +1,28 @@ +package db + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMemDbIterator(t *testing.T) { + db := NewMemDB() + keys := make([][]byte, 100) + for i := 0; i < 100; i++ { + keys[i] = []byte{byte(i)} + } + + value := []byte{5} + for _, k := range keys { + db.Set(k, value) + } + + iter := db.Iterator() + i := 0 + for iter.Next() { + assert.Equal(t, db.Get(iter.Key()), iter.Value(), "values dont match for key") + i += 1 + } + assert.Equal(t, i, len(db.db), "iterator didnt cover whole db") +} diff --git a/glide.lock b/glide.lock index b30f538a..b0b3ff3c 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 69359a39dbb6957c9f09167520317ad72d4bfa75f37a614b347e2510768c8a42 -updated: 2017-05-05T17:46:34.975369143Z +hash: 6efda1f3891a7211fc3dc1499c0079267868ced9739b781928af8e225420f867 +updated: 2017-08-11T20:28:34.550901198Z imports: - name: github.com/fsnotify/fsnotify version: 4da3e2cfbabc9f751898f250b49f2439785783a1 @@ -11,6 +11,12 @@ imports: - log/term - name: github.com/go-logfmt/logfmt version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +- name: github.com/go-playground/locales + version: 1e5f1161c6416a5ff48840eb8724a394e48cc534 + subpackages: + - 
currency +- name: github.com/go-playground/universal-translator + version: 71201497bace774495daed26a3874fd339e0b538 - name: github.com/go-stack/stack version: 7a2f19628aabfe68f0766b59e74d6315f8347d22 - name: github.com/golang/snappy @@ -97,11 +103,13 @@ imports: subpackages: - transform - unicode/norm +- name: gopkg.in/go-playground/validator.v9 + version: d529ee1b0f30352444f507cc6cdac96bfd12decc - name: gopkg.in/yaml.v2 version: cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b testImports: - name: github.com/davecgh/go-spew - version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 + version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 subpackages: - spew - name: github.com/pmezard/go-difflib @@ -109,7 +117,7 @@ testImports: subpackages: - difflib - name: github.com/stretchr/testify - version: 4d4bfba8f1d1027c4fdbe371823030df51419987 + version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 subpackages: - assert - require diff --git a/glide.yaml b/glide.yaml index d8bdd587..22825a27 100644 --- a/glide.yaml +++ b/glide.yaml @@ -23,6 +23,7 @@ import: - package: golang.org/x/crypto subpackages: - ripemd160 +- package: gopkg.in/go-playground/validator.v9 testImport: - package: github.com/stretchr/testify version: ^1.1.4 diff --git a/log/filter.go b/log/filter.go index 45108883..768c09b8 100644 --- a/log/filter.go +++ b/log/filter.go @@ -2,6 +2,25 @@ package log import "fmt" +type level byte + +const ( + levelDebug level = 1 << iota + levelInfo + levelError +) + +type filter struct { + next Logger + allowed level // XOR'd levels for default case + allowedKeyvals map[keyval]level // When key-value match, use this level +} + +type keyval struct { + key interface{} + value interface{} +} + // NewFilter wraps next and implements filtering. See the commentary on the // Option functions for a detailed description of how to configure levels. If // no options are provided, all leveled log events created with Debug, Info or @@ -17,57 +36,28 @@ func NewFilter(next Logger, options ...Option) Logger { return l } -// AllowLevel returns an option for the given level or error if no option exist -// for such level. -func AllowLevel(lvl string) (Option, error) { - switch lvl { - case "debug": - return AllowDebug(), nil - case "info": - return AllowInfo(), nil - case "error": - return AllowError(), nil - case "none": - return AllowNone(), nil - default: - return nil, fmt.Errorf("Expected either \"info\", \"debug\", \"error\" or \"none\" level, given %s", lvl) - } -} - -type filter struct { - next Logger - allowed level - allowedKeyvals map[keyval]level - errNotAllowed error -} - -type keyval struct { - key interface{} - value interface{} -} - -func (l *filter) Info(msg string, keyvals ...interface{}) error { +func (l *filter) Info(msg string, keyvals ...interface{}) { levelAllowed := l.allowed&levelInfo != 0 if !levelAllowed { - return l.errNotAllowed + return } - return l.next.Info(msg, keyvals...) + l.next.Info(msg, keyvals...) } -func (l *filter) Debug(msg string, keyvals ...interface{}) error { +func (l *filter) Debug(msg string, keyvals ...interface{}) { levelAllowed := l.allowed&levelDebug != 0 if !levelAllowed { - return l.errNotAllowed + return } - return l.next.Debug(msg, keyvals...) + l.next.Debug(msg, keyvals...) } -func (l *filter) Error(msg string, keyvals ...interface{}) error { +func (l *filter) Error(msg string, keyvals ...interface{}) { levelAllowed := l.allowed&levelError != 0 if !levelAllowed { - return l.errNotAllowed + return } - return l.next.Error(msg, keyvals...) + l.next.Error(msg, keyvals...) 
} // With implements Logger by constructing a new filter with a keyvals appended @@ -89,16 +79,35 @@ func (l *filter) With(keyvals ...interface{}) Logger { for i := len(keyvals) - 2; i >= 0; i -= 2 { for kv, allowed := range l.allowedKeyvals { if keyvals[i] == kv.key && keyvals[i+1] == kv.value { - return &filter{next: l.next.With(keyvals...), allowed: allowed, errNotAllowed: l.errNotAllowed, allowedKeyvals: l.allowedKeyvals} + return &filter{next: l.next.With(keyvals...), allowed: allowed, allowedKeyvals: l.allowedKeyvals} } } } - return &filter{next: l.next.With(keyvals...), allowed: l.allowed, errNotAllowed: l.errNotAllowed, allowedKeyvals: l.allowedKeyvals} + return &filter{next: l.next.With(keyvals...), allowed: l.allowed, allowedKeyvals: l.allowedKeyvals} } +//-------------------------------------------------------------------------------- + // Option sets a parameter for the filter. type Option func(*filter) +// AllowLevel returns an option for the given level or error if no option exist +// for such level. +func AllowLevel(lvl string) (Option, error) { + switch lvl { + case "debug": + return AllowDebug(), nil + case "info": + return AllowInfo(), nil + case "error": + return AllowError(), nil + case "none": + return AllowNone(), nil + default: + return nil, fmt.Errorf("Expected either \"info\", \"debug\", \"error\" or \"none\" level, given %s", lvl) + } +} + // AllowAll is an alias for AllowDebug. func AllowAll() Option { return AllowDebug() @@ -128,14 +137,6 @@ func allowed(allowed level) Option { return func(l *filter) { l.allowed = allowed } } -// ErrNotAllowed sets the error to return from Log when it squelches a log -// event disallowed by the configured Allow[Level] option. By default, -// ErrNotAllowed is nil; in this case the log event is squelched with no -// error. -func ErrNotAllowed(err error) Option { - return func(l *filter) { l.errNotAllowed = err } -} - // AllowDebugWith allows error, info and debug level log events to pass for a specific key value pair. func AllowDebugWith(key interface{}, value interface{}) Option { return func(l *filter) { l.allowedKeyvals[keyval{key, value}] = levelError | levelInfo | levelDebug } @@ -155,11 +156,3 @@ func AllowErrorWith(key interface{}, value interface{}) Option { func AllowNoneWith(key interface{}, value interface{}) Option { return func(l *filter) { l.allowedKeyvals[keyval{key, value}] = 0 } } - -type level byte - -const ( - levelDebug level = 1 << iota - levelInfo - levelError -) diff --git a/log/filter_test.go b/log/filter_test.go index 4665db3d..fafafacb 100644 --- a/log/filter_test.go +++ b/log/filter_test.go @@ -2,7 +2,6 @@ package log_test import ( "bytes" - "errors" "strings" "testing" @@ -71,23 +70,6 @@ func TestVariousLevels(t *testing.T) { } } -func TestErrNotAllowed(t *testing.T) { - myError := errors.New("squelched!") - opts := []log.Option{ - log.AllowError(), - log.ErrNotAllowed(myError), - } - logger := log.NewFilter(log.NewNopLogger(), opts...) - - if want, have := myError, logger.Info("foo", "bar", "baz"); want != have { - t.Errorf("want %#+v, have %#+v", want, have) - } - - if want, have := error(nil), logger.Error("foo", "bar", "baz"); want != have { - t.Errorf("want %#+v, have %#+v", want, have) - } -} - func TestLevelContext(t *testing.T) { var buf bytes.Buffer diff --git a/log/logger.go b/log/logger.go index be273f48..ddb187bc 100644 --- a/log/logger.go +++ b/log/logger.go @@ -8,9 +8,9 @@ import ( // Logger is what any Tendermint library should take. 
type Logger interface { - Debug(msg string, keyvals ...interface{}) error - Info(msg string, keyvals ...interface{}) error - Error(msg string, keyvals ...interface{}) error + Debug(msg string, keyvals ...interface{}) + Info(msg string, keyvals ...interface{}) + Error(msg string, keyvals ...interface{}) With(keyvals ...interface{}) Logger } diff --git a/log/nop_logger.go b/log/nop_logger.go index 306a8405..12d75abe 100644 --- a/log/nop_logger.go +++ b/log/nop_logger.go @@ -8,17 +8,9 @@ var _ Logger = (*nopLogger)(nil) // NewNopLogger returns a logger that doesn't do anything. func NewNopLogger() Logger { return &nopLogger{} } -func (nopLogger) Info(string, ...interface{}) error { - return nil -} - -func (nopLogger) Debug(string, ...interface{}) error { - return nil -} - -func (nopLogger) Error(string, ...interface{}) error { - return nil -} +func (nopLogger) Info(string, ...interface{}) {} +func (nopLogger) Debug(string, ...interface{}) {} +func (nopLogger) Error(string, ...interface{}) {} func (l *nopLogger) With(...interface{}) Logger { return l diff --git a/log/nop_logger_test.go b/log/nop_logger_test.go deleted file mode 100644 index d2009fdf..00000000 --- a/log/nop_logger_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package log_test - -import ( - "testing" - - "github.com/tendermint/tmlibs/log" -) - -func TestNopLogger(t *testing.T) { - t.Parallel() - logger := log.NewNopLogger() - if err := logger.Info("Hello", "abc", 123); err != nil { - t.Error(err) - } - if err := logger.With("def", "ghi").Debug(""); err != nil { - t.Error(err) - } -} diff --git a/log/tm_logger.go b/log/tm_logger.go index a903dbe8..dc6932dd 100644 --- a/log/tm_logger.go +++ b/log/tm_logger.go @@ -50,21 +50,21 @@ func NewTMLoggerWithColorFn(w io.Writer, colorFn func(keyvals ...interface{}) te } // Info logs a message at level Info. -func (l *tmLogger) Info(msg string, keyvals ...interface{}) error { +func (l *tmLogger) Info(msg string, keyvals ...interface{}) { lWithLevel := kitlevel.Info(l.srcLogger) - return kitlog.With(lWithLevel, msgKey, msg).Log(keyvals...) + kitlog.With(lWithLevel, msgKey, msg).Log(keyvals...) } // Debug logs a message at level Debug. -func (l *tmLogger) Debug(msg string, keyvals ...interface{}) error { +func (l *tmLogger) Debug(msg string, keyvals ...interface{}) { lWithLevel := kitlevel.Debug(l.srcLogger) - return kitlog.With(lWithLevel, msgKey, msg).Log(keyvals...) + kitlog.With(lWithLevel, msgKey, msg).Log(keyvals...) } // Error logs a message at level Error. -func (l *tmLogger) Error(msg string, keyvals ...interface{}) error { +func (l *tmLogger) Error(msg string, keyvals ...interface{}) { lWithLevel := kitlevel.Error(l.srcLogger) - return kitlog.With(lWithLevel, msgKey, msg).Log(keyvals...) + kitlog.With(lWithLevel, msgKey, msg).Log(keyvals...) 
 }
 
 // With returns a new contextual logger with keyvals prepended to those passed
diff --git a/log/tm_logger_test.go b/log/tm_logger_test.go
index 15c940ce..8cd2f827 100644
--- a/log/tm_logger_test.go
+++ b/log/tm_logger_test.go
@@ -7,17 +7,6 @@ import (
 	"github.com/tendermint/tmlibs/log"
 )
 
-func TestTMLogger(t *testing.T) {
-	t.Parallel()
-	logger := log.NewTMLogger(ioutil.Discard)
-	if err := logger.Info("Hello", "abc", 123); err != nil {
-		t.Error(err)
-	}
-	if err := logger.With("def", "ghi").Debug(""); err != nil {
-		t.Error(err)
-	}
-}
-
 func BenchmarkTMLoggerSimple(b *testing.B) {
 	benchmarkRunner(b, log.NewTMLogger(ioutil.Discard), baseInfoMessage)
 }
diff --git a/log/tracing_logger.go b/log/tracing_logger.go
index 794bdaeb..d2a6ff44 100644
--- a/log/tracing_logger.go
+++ b/log/tracing_logger.go
@@ -28,16 +28,16 @@ type tracingLogger struct {
 	next Logger
 }
 
-func (l *tracingLogger) Info(msg string, keyvals ...interface{}) error {
-	return l.next.Info(msg, formatErrors(keyvals)...)
+func (l *tracingLogger) Info(msg string, keyvals ...interface{}) {
+	l.next.Info(msg, formatErrors(keyvals)...)
 }
 
-func (l *tracingLogger) Debug(msg string, keyvals ...interface{}) error {
-	return l.next.Debug(msg, formatErrors(keyvals)...)
+func (l *tracingLogger) Debug(msg string, keyvals ...interface{}) {
+	l.next.Debug(msg, formatErrors(keyvals)...)
 }
 
-func (l *tracingLogger) Error(msg string, keyvals ...interface{}) error {
-	return l.next.Error(msg, formatErrors(keyvals)...)
+func (l *tracingLogger) Error(msg string, keyvals ...interface{}) {
+	l.next.Error(msg, formatErrors(keyvals)...)
 }
 
 func (l *tracingLogger) With(keyvals ...interface{}) Logger {
diff --git a/pubsub/example_test.go b/pubsub/example_test.go
new file mode 100644
index 00000000..3eda7d32
--- /dev/null
+++ b/pubsub/example_test.go
@@ -0,0 +1,27 @@
+package pubsub_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/tendermint/tmlibs/log"
+	"github.com/tendermint/tmlibs/pubsub"
+	"github.com/tendermint/tmlibs/pubsub/query"
+)
+
+func TestExample(t *testing.T) {
+	s := pubsub.NewServer()
+	s.SetLogger(log.TestingLogger())
+	s.Start()
+	defer s.Stop()
+
+	ctx := context.Background()
+	ch := make(chan interface{}, 1)
+	err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
+	require.NoError(t, err)
+	err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"})
+	require.NoError(t, err)
+	assertReceive(t, "Tombstone", ch)
+}
diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go
new file mode 100644
index 00000000..52b8361f
--- /dev/null
+++ b/pubsub/pubsub.go
@@ -0,0 +1,253 @@
+// Package pubsub implements a pub-sub model with a single publisher (Server)
+// and multiple subscribers (clients).
+//
+// You can, however, have multiple publishers by sharing a pointer to a server
+// or by giving the same channel to each publisher and publishing messages
+// from that channel (fan-in).
+//
+// Clients subscribe for messages, which could be of any type, using a query.
+// When some message is published, we match it with all queries. If there is a
+// match, this message will be pushed to all clients subscribed to that query.
+// See the query subpackage for our implementation.
+package pubsub + +import ( + "context" + + cmn "github.com/tendermint/tmlibs/common" +) + +type operation int + +const ( + sub operation = iota + pub + unsub + shutdown +) + +type cmd struct { + op operation + query Query + ch chan<- interface{} + clientID string + msg interface{} + tags map[string]interface{} +} + +// Query defines an interface for a query to be used for subscribing. +type Query interface { + Matches(tags map[string]interface{}) bool +} + +// Server allows clients to subscribe/unsubscribe for messages, publishing +// messages with or without tags, and manages internal state. +type Server struct { + cmn.BaseService + + cmds chan cmd + cmdsCap int +} + +// Option sets a parameter for the server. +type Option func(*Server) + +// NewServer returns a new server. See the commentary on the Option functions +// for a detailed description of how to configure buffering. If no options are +// provided, the resulting server's queue is unbuffered. +func NewServer(options ...Option) *Server { + s := &Server{} + s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) + + for _, option := range options { + option(s) + } + + // if BufferCapacity option was not set, the channel is unbuffered + s.cmds = make(chan cmd, s.cmdsCap) + + return s +} + +// BufferCapacity allows you to specify capacity for the internal server's +// queue. Since the server, given Y subscribers, could only process X messages, +// this option could be used to survive spikes (e.g. high amount of +// transactions during peak hours). +func BufferCapacity(cap int) Option { + return func(s *Server) { + if cap > 0 { + s.cmdsCap = cap + } + } +} + +// BufferCapacity returns capacity of the internal server's queue. +func (s Server) BufferCapacity() int { + return s.cmdsCap +} + +// Subscribe creates a subscription for the given client. It accepts a channel +// on which messages matching the given query can be received. If the +// subscription already exists, the old channel will be closed. An error will +// be returned to the caller if the context is canceled. +func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { + select { + case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Unsubscribe removes the subscription on the given query. An error will be +// returned to the caller if the context is canceled. +func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { + select { + case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// UnsubscribeAll removes all client subscriptions. An error will be returned +// to the caller if the context is canceled. +func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { + select { + case s.cmds <- cmd{op: unsub, clientID: clientID}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Publish publishes the given message. An error will be returned to the caller +// if the context is canceled. +func (s *Server) Publish(ctx context.Context, msg interface{}) error { + return s.PublishWithTags(ctx, msg, make(map[string]interface{})) +} + +// PublishWithTags publishes the given message with the set of tags. The set is +// matched with clients queries. If there is a match, the message is sent to +// the client. 
+func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { + select { + case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// OnStop implements Service.OnStop by shutting down the server. +func (s *Server) OnStop() { + s.cmds <- cmd{op: shutdown} +} + +// NOTE: not goroutine safe +type state struct { + // query -> client -> ch + queries map[Query]map[string]chan<- interface{} + // client -> query -> struct{} + clients map[string]map[Query]struct{} +} + +// OnStart implements Service.OnStart by starting the server. +func (s *Server) OnStart() error { + go s.loop(state{ + queries: make(map[Query]map[string]chan<- interface{}), + clients: make(map[string]map[Query]struct{}), + }) + return nil +} + +func (s *Server) loop(state state) { +loop: + for cmd := range s.cmds { + switch cmd.op { + case unsub: + if cmd.query != nil { + state.remove(cmd.clientID, cmd.query) + } else { + state.removeAll(cmd.clientID) + } + case shutdown: + for clientID := range state.clients { + state.removeAll(clientID) + } + break loop + case sub: + state.add(cmd.clientID, cmd.query, cmd.ch) + case pub: + state.send(cmd.msg, cmd.tags) + } + } +} + +func (state *state) add(clientID string, q Query, ch chan<- interface{}) { + // add query if needed + if clientToChannelMap, ok := state.queries[q]; !ok { + state.queries[q] = make(map[string]chan<- interface{}) + } else { + // check if already subscribed + if oldCh, ok := clientToChannelMap[clientID]; ok { + close(oldCh) + } + } + + // create subscription + state.queries[q][clientID] = ch + + // add client if needed + if _, ok := state.clients[clientID]; !ok { + state.clients[clientID] = make(map[Query]struct{}) + } + state.clients[clientID][q] = struct{}{} +} + +func (state *state) remove(clientID string, q Query) { + clientToChannelMap, ok := state.queries[q] + if !ok { + return + } + + ch, ok := clientToChannelMap[clientID] + if ok { + close(ch) + + delete(state.clients[clientID], q) + + // if it not subscribed to anything else, remove the client + if len(state.clients[clientID]) == 0 { + delete(state.clients, clientID) + } + + delete(state.queries[q], clientID) + } +} + +func (state *state) removeAll(clientID string) { + queryMap, ok := state.clients[clientID] + if !ok { + return + } + + for q := range queryMap { + ch := state.queries[q][clientID] + close(ch) + + delete(state.queries[q], clientID) + } + + delete(state.clients, clientID) +} + +func (state *state) send(msg interface{}, tags map[string]interface{}) { + for q, clientToChannelMap := range state.queries { + if q.Matches(tags) { + for _, ch := range clientToChannelMap { + ch <- msg + } + } + } +} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 00000000..7bf7b41f --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,234 @@ +package pubsub_test + +import ( + "context" + "fmt" + "runtime/debug" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tmlibs/pubsub" + "github.com/tendermint/tmlibs/pubsub/query" +) + +const ( + clientID = "test-client" +) + +func TestSubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch := make(chan interface{}, 1) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Publish(ctx, "Ka-Zar") + 
require.NoError(t, err) + assertReceive(t, "Ka-Zar", ch) + + err = s.Publish(ctx, "Quicksilver") + require.NoError(t, err) + assertReceive(t, "Quicksilver", ch) +} + +func TestDifferentClients(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch1 := make(chan interface{}, 1) + err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Iceman", ch1) + + ch2 := make(chan interface{}, 1) + err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) + require.NoError(t, err) + assertReceive(t, "Ultimo", ch1) + assertReceive(t, "Ultimo", ch2) + + ch3 := make(chan interface{}, 1) + err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) + require.NoError(t, err) + assert.Zero(t, len(ch3)) +} + +func TestClientSubscribesTwice(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + q := query.MustParse("tm.events.type='NewBlock'") + + ch1 := make(chan interface{}, 1) + err := s.Subscribe(ctx, clientID, q, ch1) + require.NoError(t, err) + err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Goblin Queen", ch1) + + ch2 := make(chan interface{}, 1) + err = s.Subscribe(ctx, clientID, q, ch2) + require.NoError(t, err) + + _, ok := <-ch1 + assert.False(t, ok) + + err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) + require.NoError(t, err) + assertReceive(t, "Spider-Man", ch2) +} + +func TestUnsubscribe(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch := make(chan interface{}) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch) + require.NoError(t, err) + err = s.Unsubscribe(ctx, clientID, query.Empty{}) + require.NoError(t, err) + + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") + + _, ok := <-ch + assert.False(t, ok) +} + +func TestUnsubscribeAll(t *testing.T) { + s := pubsub.NewServer() + s.SetLogger(log.TestingLogger()) + s.Start() + defer s.Stop() + + ctx := context.Background() + ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) + err := s.Subscribe(ctx, clientID, query.Empty{}, ch1) + require.NoError(t, err) + err = s.Subscribe(ctx, clientID, query.Empty{}, ch2) + require.NoError(t, err) + + err = s.UnsubscribeAll(ctx, clientID) + require.NoError(t, err) + + err = s.Publish(ctx, "Nick Fury") + require.NoError(t, err) + assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") + assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") + + _, ok := <-ch1 + assert.False(t, ok) + _, ok = <-ch2 + assert.False(t, ok) +} + +func TestBufferCapacity(t 
*testing.T) { + s := pubsub.NewServer(pubsub.BufferCapacity(2)) + s.SetLogger(log.TestingLogger()) + + assert.Equal(t, 2, s.BufferCapacity()) + + ctx := context.Background() + err := s.Publish(ctx, "Nighthawk") + require.NoError(t, err) + err = s.Publish(ctx, "Sage") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + err = s.Publish(ctx, "Ironclad") + if assert.Error(t, err) { + assert.Equal(t, context.DeadlineExceeded, err) + } +} + +func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } +func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } +func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } + +func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } +func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } +func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } + +func benchmarkNClients(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + ctx := context.Background() + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) + } +} + +func benchmarkNClientsOneQuery(n int, b *testing.B) { + s := pubsub.NewServer() + s.Start() + defer s.Stop() + + ctx := context.Background() + q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") + for i := 0; i < n; i++ { + ch := make(chan interface{}) + go func() { + for range ch { + } + }() + s.Subscribe(ctx, clientID, q, ch) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) + } +} + +/////////////////////////////////////////////////////////////////////////////// +/// HELPERS +/////////////////////////////////////////////////////////////////////////////// + +func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { + select { + case actual := <-ch: + if actual != nil { + assert.Equal(t, expected, actual, msgAndArgs...) + } + case <-time.After(1 * time.Second): + t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) + debug.PrintStack() + } +} diff --git a/pubsub/query/Makefile b/pubsub/query/Makefile new file mode 100644 index 00000000..ca3ff5b5 --- /dev/null +++ b/pubsub/query/Makefile @@ -0,0 +1,11 @@ +gen_query_parser: + @go get github.com/pointlander/peg + peg -inline -switch query.peg + +fuzzy_test: + @go get github.com/dvyukov/go-fuzz/go-fuzz + @go get github.com/dvyukov/go-fuzz/go-fuzz-build + go-fuzz-build github.com/tendermint/tmlibs/pubsub/query/fuzz_test + go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output + +.PHONY: gen_query_parser fuzzy_test diff --git a/pubsub/query/empty.go b/pubsub/query/empty.go new file mode 100644 index 00000000..2d60a892 --- /dev/null +++ b/pubsub/query/empty.go @@ -0,0 +1,14 @@ +package query + +// Empty query matches any set of tags. +type Empty struct { +} + +// Matches always returns true. 
+func (Empty) Matches(tags map[string]interface{}) bool { + return true +} + +func (Empty) String() string { + return "empty" +} diff --git a/pubsub/query/empty_test.go b/pubsub/query/empty_test.go new file mode 100644 index 00000000..663acb19 --- /dev/null +++ b/pubsub/query/empty_test.go @@ -0,0 +1,16 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestEmptyQueryMatchesAnything(t *testing.T) { + q := query.Empty{} + assert.True(t, q.Matches(map[string]interface{}{})) + assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"})) + assert.True(t, q.Matches(map[string]interface{}{"Route": 66})) + assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"})) +} diff --git a/pubsub/query/fuzz_test/main.go b/pubsub/query/fuzz_test/main.go new file mode 100644 index 00000000..3b0ef147 --- /dev/null +++ b/pubsub/query/fuzz_test/main.go @@ -0,0 +1,30 @@ +package fuzz_test + +import ( + "fmt" + + "github.com/tendermint/tmlibs/pubsub/query" +) + +func Fuzz(data []byte) int { + sdata := string(data) + q0, err := query.New(sdata) + if err != nil { + return 0 + } + + sdata1 := q0.String() + q1, err := query.New(sdata1) + if err != nil { + panic(err) + } + + sdata2 := q1.String() + if sdata1 != sdata2 { + fmt.Printf("q0: %q\n", sdata1) + fmt.Printf("q1: %q\n", sdata2) + panic("query changed") + } + + return 1 +} diff --git a/pubsub/query/parser_test.go b/pubsub/query/parser_test.go new file mode 100644 index 00000000..165ddda7 --- /dev/null +++ b/pubsub/query/parser_test.go @@ -0,0 +1,91 @@ +package query_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tmlibs/pubsub/query" +) + +// TODO: fuzzy testing? 
+func TestParser(t *testing.T) { + cases := []struct { + query string + valid bool + }{ + {"tm.events.type='NewBlock'", true}, + {"tm.events.type = 'NewBlock'", true}, + {"tm.events.name = ''", true}, + {"tm.events.type='TIME'", true}, + {"tm.events.type='DATE'", true}, + {"tm.events.type='='", true}, + {"tm.events.type='TIME", false}, + {"tm.events.type=TIME'", false}, + {"tm.events.type==", false}, + {"tm.events.type=NewBlock", false}, + {">==", false}, + {"tm.events.type 'NewBlock' =", false}, + {"tm.events.type>'NewBlock'", false}, + {"", false}, + {"=", false}, + {"='NewBlock'", false}, + {"tm.events.type=", false}, + + {"tm.events.typeNewBlock", false}, + {"tm.events.type'NewBlock'", false}, + {"'NewBlock'", false}, + {"NewBlock", false}, + {"", false}, + + {"tm.events.type='NewBlock' AND abci.account.name='Igor'", true}, + {"tm.events.type='NewBlock' AND", false}, + {"tm.events.type='NewBlock' AN", false}, + {"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false}, + {"AND tm.events.type='NewBlock' ", false}, + + {"abci.account.name CONTAINS 'Igor'", true}, + + {"tx.date > DATE 2013-05-03", true}, + {"tx.date < DATE 2013-05-03", true}, + {"tx.date <= DATE 2013-05-03", true}, + {"tx.date >= DATE 2013-05-03", true}, + {"tx.date >= DAT 2013-05-03", false}, + {"tx.date <= DATE2013-05-03", false}, + {"tx.date <= DATE -05-03", false}, + {"tx.date >= DATE 20130503", false}, + {"tx.date >= DATE 2013+01-03", false}, + // incorrect year, month, day + {"tx.date >= DATE 0013-01-03", false}, + {"tx.date >= DATE 2013-31-03", false}, + {"tx.date >= DATE 2013-01-83", false}, + + {"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, + {"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, + {"tx.date <= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME 2013-05-03T14:45:00Z", true}, + {"tx.date >= TIME2013-05-03T14:45:00Z", false}, + {"tx.date = IME 2013-05-03T14:45:00Z", false}, + {"tx.date = TIME 2013-05-:45:00Z", false}, + {"tx.date >= TIME 2013-05-03T14:45:00", false}, + {"tx.date >= TIME 0013-00-00T14:45:00Z", false}, + {"tx.date >= TIME 2013+05=03T14:45:00Z", false}, + + {"account.balance=100", true}, + {"account.balance >= 200", true}, + {"account.balance >= -300", false}, + {"account.balance >>= 400", false}, + {"account.balance=33.22.1", false}, + + {"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true}, + {"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false}, + } + + for _, c := range cases { + _, err := query.New(c.query) + if c.valid { + assert.NoError(t, err, "Query was '%s'", c.query) + } else { + assert.Error(t, err, "Query was '%s'", c.query) + } + } +} diff --git a/pubsub/query/query.go b/pubsub/query/query.go new file mode 100644 index 00000000..fdfb87d7 --- /dev/null +++ b/pubsub/query/query.go @@ -0,0 +1,261 @@ +// Package query provides a parser for a custom query format: +// +// abci.invoice.number=22 AND abci.invoice.owner=Ivan +// +// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. +// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics +// +// It has a support for numbers (integer and floating point), dates and times. +package query + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "time" +) + +// Query holds the query string and the query parser. +type Query struct { + str string + parser *QueryParser +} + +// New parses the given string and returns a query or error if the string is +// invalid. 
+func New(s string) (*Query, error) { + p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} + p.Init() + if err := p.Parse(); err != nil { + return nil, err + } + return &Query{str: s, parser: p}, nil +} + +// MustParse turns the given string into a query or panics; for tests or others +// cases where you know the string is valid. +func MustParse(s string) *Query { + q, err := New(s) + if err != nil { + panic(fmt.Sprintf("failed to parse %s: %v", s, err)) + } + return q +} + +// String returns the original string. +func (q *Query) String() string { + return q.str +} + +type operator uint8 + +const ( + opLessEqual operator = iota + opGreaterEqual + opLess + opGreater + opEqual + opContains +) + +// Matches returns true if the query matches the given set of tags, false otherwise. +// +// For example, query "name=John" matches tags = {"name": "John"}. More +// examples could be found in parser_test.go and query_test.go. +func (q *Query) Matches(tags map[string]interface{}) bool { + if len(tags) == 0 { + return false + } + + buffer, begin, end := q.parser.Buffer, 0, 0 + + var tag string + var op operator + + // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") + for _, token := range q.parser.Tokens() { + switch token.pegRule { + + case rulePegText: + begin, end = int(token.begin), int(token.end) + case ruletag: + tag = buffer[begin:end] + case rulele: + op = opLessEqual + case rulege: + op = opGreaterEqual + case rulel: + op = opLess + case ruleg: + op = opGreater + case ruleequal: + op = opEqual + case rulecontains: + op = opContains + case rulevalue: + // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") + valueWithoutSingleQuotes := buffer[begin+1 : end-1] + + // see if the triplet (tag, operator, operand) matches any tag + // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } + if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) { + return false + } + case rulenumber: + number := buffer[begin:end] + if strings.Contains(number, ".") { // if it looks like a floating-point number + value, err := strconv.ParseFloat(number, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } else { + value, err := strconv.ParseInt(number, 10, 64) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + case ruletime: + value, err := time.Parse(time.RFC3339, buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + case ruledate: + value, err := time.Parse("2006-01-02", buffer[begin:end]) + if err != nil { + panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) + } + if !match(tag, op, reflect.ValueOf(value), tags) { + return false + } + } + } + + return true +} + +// match returns true if the given triplet (tag, operator, operand) matches any tag. +// +// First, it looks up the tag in tags and if it finds one, tries to compare the +// value from it to the operand using the operator. 
+// +// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } +func match(tag string, op operator, operand reflect.Value, tags map[string]interface{}) bool { + // look up the tag from the query in tags + value, ok := tags[tag] + if !ok { + return false + } + switch operand.Kind() { + case reflect.Struct: // time + operandAsTime := operand.Interface().(time.Time) + v, ok := value.(time.Time) + if !ok { // if value from tags is not time.Time + return false + } + switch op { + case opLessEqual: + return v.Before(operandAsTime) || v.Equal(operandAsTime) + case opGreaterEqual: + return v.Equal(operandAsTime) || v.After(operandAsTime) + case opLess: + return v.Before(operandAsTime) + case opGreater: + return v.After(operandAsTime) + case opEqual: + return v.Equal(operandAsTime) + } + case reflect.Float64: + operandFloat64 := operand.Interface().(float64) + var v float64 + // try our best to convert value from tags to float64 + switch vt := value.(type) { + case float64: + v = vt + case float32: + v = float64(vt) + case int: + v = float64(vt) + case int8: + v = float64(vt) + case int16: + v = float64(vt) + case int32: + v = float64(vt) + case int64: + v = float64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) + } + switch op { + case opLessEqual: + return v <= operandFloat64 + case opGreaterEqual: + return v >= operandFloat64 + case opLess: + return v < operandFloat64 + case opGreater: + return v > operandFloat64 + case opEqual: + return v == operandFloat64 + } + case reflect.Int64: + operandInt := operand.Interface().(int64) + var v int64 + // try our best to convert value from tags to int64 + switch vt := value.(type) { + case int64: + v = vt + case int8: + v = int64(vt) + case int16: + v = int64(vt) + case int32: + v = int64(vt) + case int: + v = int64(vt) + case float64: + v = int64(vt) + case float32: + v = int64(vt) + default: // fail for all other types + panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) + } + switch op { + case opLessEqual: + return v <= operandInt + case opGreaterEqual: + return v >= operandInt + case opLess: + return v < operandInt + case opGreater: + return v > operandInt + case opEqual: + return v == operandInt + } + case reflect.String: + v, ok := value.(string) + if !ok { // if value from tags is not string + return false + } + switch op { + case opEqual: + return v == operand.String() + case opContains: + return strings.Contains(v, operand.String()) + } + default: + panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) + } + + return false +} diff --git a/pubsub/query/query.peg b/pubsub/query/query.peg new file mode 100644 index 00000000..739892e4 --- /dev/null +++ b/pubsub/query/query.peg @@ -0,0 +1,33 @@ +package query + +type QueryParser Peg { +} + +e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. + +condition <- tag ' '* (le ' '* (number / time / date) + / ge ' '* (number / time / date) + / l ' '* (number / time / date) + / g ' '* (number / time / date) + / equal ' '* (number / time / date / value) + / contains ' '* value + ) + +tag <- < (![ \t\n\r\\()"'=><] .)+ > +value <- < '\'' (!["'] .)* '\''> +number <- < ('0' + / [1-9] digit* ('.' digit*)?) 
> +digit <- [0-9] +time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > +date <- "DATE " < year '-' month '-' day > +year <- ('1' / '2') digit digit digit +month <- ('0' / '1') digit +day <- ('0' / '1' / '2' / '3') digit +and <- "AND" + +equal <- "=" +contains <- "CONTAINS" +le <- "<=" +ge <- ">=" +l <- "<" +g <- ">" diff --git a/pubsub/query/query.peg.go b/pubsub/query/query.peg.go new file mode 100644 index 00000000..37ce75cd --- /dev/null +++ b/pubsub/query/query.peg.go @@ -0,0 +1,1552 @@ +package query + +import ( + "fmt" + "math" + "sort" + "strconv" +) + +const endSymbol rune = 1114112 + +/* The rule types inferred from the grammar are below. */ +type pegRule uint8 + +const ( + ruleUnknown pegRule = iota + rulee + rulecondition + ruletag + rulevalue + rulenumber + ruledigit + ruletime + ruledate + ruleyear + rulemonth + ruleday + ruleand + ruleequal + rulecontains + rulele + rulege + rulel + ruleg + rulePegText +) + +var rul3s = [...]string{ + "Unknown", + "e", + "condition", + "tag", + "value", + "number", + "digit", + "time", + "date", + "year", + "month", + "day", + "and", + "equal", + "contains", + "le", + "ge", + "l", + "g", + "PegText", +} + +type token32 struct { + pegRule + begin, end uint32 +} + +func (t *token32) String() string { + return fmt.Sprintf("\x1B[34m%v\x1B[m %v %v", rul3s[t.pegRule], t.begin, t.end) +} + +type node32 struct { + token32 + up, next *node32 +} + +func (node *node32) print(pretty bool, buffer string) { + var print func(node *node32, depth int) + print = func(node *node32, depth int) { + for node != nil { + for c := 0; c < depth; c++ { + fmt.Printf(" ") + } + rule := rul3s[node.pegRule] + quote := strconv.Quote(string(([]rune(buffer)[node.begin:node.end]))) + if !pretty { + fmt.Printf("%v %v\n", rule, quote) + } else { + fmt.Printf("\x1B[34m%v\x1B[m %v\n", rule, quote) + } + if node.up != nil { + print(node.up, depth+1) + } + node = node.next + } + } + print(node, 0) +} + +func (node *node32) Print(buffer string) { + node.print(false, buffer) +} + +func (node *node32) PrettyPrint(buffer string) { + node.print(true, buffer) +} + +type tokens32 struct { + tree []token32 +} + +func (t *tokens32) Trim(length uint32) { + t.tree = t.tree[:length] +} + +func (t *tokens32) Print() { + for _, token := range t.tree { + fmt.Println(token.String()) + } +} + +func (t *tokens32) AST() *node32 { + type element struct { + node *node32 + down *element + } + tokens := t.Tokens() + var stack *element + for _, token := range tokens { + if token.begin == token.end { + continue + } + node := &node32{token32: token} + for stack != nil && stack.node.begin >= token.begin && stack.node.end <= token.end { + stack.node.next = node.up + node.up = stack.node + stack = stack.down + } + stack = &element{node: node, down: stack} + } + if stack != nil { + return stack.node + } + return nil +} + +func (t *tokens32) PrintSyntaxTree(buffer string) { + t.AST().Print(buffer) +} + +func (t *tokens32) PrettyPrintSyntaxTree(buffer string) { + t.AST().PrettyPrint(buffer) +} + +func (t *tokens32) Add(rule pegRule, begin, end, index uint32) { + if tree := t.tree; int(index) >= len(tree) { + expanded := make([]token32, 2*len(tree)) + copy(expanded, tree) + t.tree = expanded + } + t.tree[index] = token32{ + pegRule: rule, + begin: begin, + end: end, + } +} + +func (t *tokens32) Tokens() []token32 { + return t.tree +} + +type QueryParser struct { + Buffer string + buffer []rune + rules [20]func() bool + parse 
func(rule ...int) error + reset func() + Pretty bool + tokens32 +} + +func (p *QueryParser) Parse(rule ...int) error { + return p.parse(rule...) +} + +func (p *QueryParser) Reset() { + p.reset() +} + +type textPosition struct { + line, symbol int +} + +type textPositionMap map[int]textPosition + +func translatePositions(buffer []rune, positions []int) textPositionMap { + length, translations, j, line, symbol := len(positions), make(textPositionMap, len(positions)), 0, 1, 0 + sort.Ints(positions) + +search: + for i, c := range buffer { + if c == '\n' { + line, symbol = line+1, 0 + } else { + symbol++ + } + if i == positions[j] { + translations[positions[j]] = textPosition{line, symbol} + for j++; j < length; j++ { + if i != positions[j] { + continue search + } + } + break search + } + } + + return translations +} + +type parseError struct { + p *QueryParser + max token32 +} + +func (e *parseError) Error() string { + tokens, error := []token32{e.max}, "\n" + positions, p := make([]int, 2*len(tokens)), 0 + for _, token := range tokens { + positions[p], p = int(token.begin), p+1 + positions[p], p = int(token.end), p+1 + } + translations := translatePositions(e.p.buffer, positions) + format := "parse error near %v (line %v symbol %v - line %v symbol %v):\n%v\n" + if e.p.Pretty { + format = "parse error near \x1B[34m%v\x1B[m (line %v symbol %v - line %v symbol %v):\n%v\n" + } + for _, token := range tokens { + begin, end := int(token.begin), int(token.end) + error += fmt.Sprintf(format, + rul3s[token.pegRule], + translations[begin].line, translations[begin].symbol, + translations[end].line, translations[end].symbol, + strconv.Quote(string(e.p.buffer[begin:end]))) + } + + return error +} + +func (p *QueryParser) PrintSyntaxTree() { + if p.Pretty { + p.tokens32.PrettyPrintSyntaxTree(p.Buffer) + } else { + p.tokens32.PrintSyntaxTree(p.Buffer) + } +} + +func (p *QueryParser) Init() { + var ( + max token32 + position, tokenIndex uint32 + buffer []rune + ) + p.reset = func() { + max = token32{} + position, tokenIndex = 0, 0 + + p.buffer = []rune(p.Buffer) + if len(p.buffer) == 0 || p.buffer[len(p.buffer)-1] != endSymbol { + p.buffer = append(p.buffer, endSymbol) + } + buffer = p.buffer + } + p.reset() + + _rules := p.rules + tree := tokens32{tree: make([]token32, math.MaxInt16)} + p.parse = func(rule ...int) error { + r := 1 + if len(rule) > 0 { + r = rule[0] + } + matches := p.rules[r]() + p.tokens32 = tree + if matches { + p.Trim(tokenIndex) + return nil + } + return &parseError{p, max} + } + + add := func(rule pegRule, begin uint32) { + tree.Add(rule, begin, position, tokenIndex) + tokenIndex++ + if begin != position && position > max.end { + max = token32{rule, begin, position} + } + } + + matchDot := func() bool { + if buffer[position] != endSymbol { + position++ + return true + } + return false + } + + /*matchChar := func(c byte) bool { + if buffer[position] == c { + position++ + return true + } + return false + }*/ + + /*matchRange := func(lower byte, upper byte) bool { + if c := buffer[position]; c >= lower && c <= upper { + position++ + return true + } + return false + }*/ + + _rules = [...]func() bool{ + nil, + /* 0 e <- <('"' condition (' '+ and ' '+ condition)* '"' !.)> */ + func() bool { + position0, tokenIndex0 := position, tokenIndex + { + position1 := position + if buffer[position] != rune('"') { + goto l0 + } + position++ + if !_rules[rulecondition]() { + goto l0 + } + l2: + { + position3, tokenIndex3 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l3 + } + 
position++ + l4: + { + position5, tokenIndex5 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l5 + } + position++ + goto l4 + l5: + position, tokenIndex = position5, tokenIndex5 + } + { + position6 := position + { + position7, tokenIndex7 := position, tokenIndex + if buffer[position] != rune('a') { + goto l8 + } + position++ + goto l7 + l8: + position, tokenIndex = position7, tokenIndex7 + if buffer[position] != rune('A') { + goto l3 + } + position++ + } + l7: + { + position9, tokenIndex9 := position, tokenIndex + if buffer[position] != rune('n') { + goto l10 + } + position++ + goto l9 + l10: + position, tokenIndex = position9, tokenIndex9 + if buffer[position] != rune('N') { + goto l3 + } + position++ + } + l9: + { + position11, tokenIndex11 := position, tokenIndex + if buffer[position] != rune('d') { + goto l12 + } + position++ + goto l11 + l12: + position, tokenIndex = position11, tokenIndex11 + if buffer[position] != rune('D') { + goto l3 + } + position++ + } + l11: + add(ruleand, position6) + } + if buffer[position] != rune(' ') { + goto l3 + } + position++ + l13: + { + position14, tokenIndex14 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l14 + } + position++ + goto l13 + l14: + position, tokenIndex = position14, tokenIndex14 + } + if !_rules[rulecondition]() { + goto l3 + } + goto l2 + l3: + position, tokenIndex = position3, tokenIndex3 + } + if buffer[position] != rune('"') { + goto l0 + } + position++ + { + position15, tokenIndex15 := position, tokenIndex + if !matchDot() { + goto l15 + } + goto l0 + l15: + position, tokenIndex = position15, tokenIndex15 + } + add(rulee, position1) + } + return true + l0: + position, tokenIndex = position0, tokenIndex0 + return false + }, + /* 1 condition <- <(tag ' '* ((le ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / (ge ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number))) / ((&('=') (equal ' '* ((&('\'') value) | (&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('>') (g ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('<') (l ' '* ((&('D' | 'd') date) | (&('T' | 't') time) | (&('0' | '1' | '2' | '3' | '4' | '5' | '6' | '7' | '8' | '9') number)))) | (&('C' | 'c') (contains ' '* value)))))> */ + func() bool { + position16, tokenIndex16 := position, tokenIndex + { + position17 := position + { + position18 := position + { + position19 := position + { + position22, tokenIndex22 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l22 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l22 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l22 + } + position++ + break + case '\'': + if buffer[position] != rune('\'') { + goto l22 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l22 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l22 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l22 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l22 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l22 + } + position++ + break + case '\n': + if 
buffer[position] != rune('\n') { + goto l22 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l22 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l22 + } + position++ + break + } + } + + goto l16 + l22: + position, tokenIndex = position22, tokenIndex22 + } + if !matchDot() { + goto l16 + } + l20: + { + position21, tokenIndex21 := position, tokenIndex + { + position24, tokenIndex24 := position, tokenIndex + { + switch buffer[position] { + case '<': + if buffer[position] != rune('<') { + goto l24 + } + position++ + break + case '>': + if buffer[position] != rune('>') { + goto l24 + } + position++ + break + case '=': + if buffer[position] != rune('=') { + goto l24 + } + position++ + break + case '\'': + if buffer[position] != rune('\'') { + goto l24 + } + position++ + break + case '"': + if buffer[position] != rune('"') { + goto l24 + } + position++ + break + case ')': + if buffer[position] != rune(')') { + goto l24 + } + position++ + break + case '(': + if buffer[position] != rune('(') { + goto l24 + } + position++ + break + case '\\': + if buffer[position] != rune('\\') { + goto l24 + } + position++ + break + case '\r': + if buffer[position] != rune('\r') { + goto l24 + } + position++ + break + case '\n': + if buffer[position] != rune('\n') { + goto l24 + } + position++ + break + case '\t': + if buffer[position] != rune('\t') { + goto l24 + } + position++ + break + default: + if buffer[position] != rune(' ') { + goto l24 + } + position++ + break + } + } + + goto l21 + l24: + position, tokenIndex = position24, tokenIndex24 + } + if !matchDot() { + goto l21 + } + goto l20 + l21: + position, tokenIndex = position21, tokenIndex21 + } + add(rulePegText, position19) + } + add(ruletag, position18) + } + l26: + { + position27, tokenIndex27 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l27 + } + position++ + goto l26 + l27: + position, tokenIndex = position27, tokenIndex27 + } + { + position28, tokenIndex28 := position, tokenIndex + { + position30 := position + if buffer[position] != rune('<') { + goto l29 + } + position++ + if buffer[position] != rune('=') { + goto l29 + } + position++ + add(rulele, position30) + } + l31: + { + position32, tokenIndex32 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l32 + } + position++ + goto l31 + l32: + position, tokenIndex = position32, tokenIndex32 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l29 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l29 + } + break + default: + if !_rules[rulenumber]() { + goto l29 + } + break + } + } + + goto l28 + l29: + position, tokenIndex = position28, tokenIndex28 + { + position35 := position + if buffer[position] != rune('>') { + goto l34 + } + position++ + if buffer[position] != rune('=') { + goto l34 + } + position++ + add(rulege, position35) + } + l36: + { + position37, tokenIndex37 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l37 + } + position++ + goto l36 + l37: + position, tokenIndex = position37, tokenIndex37 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l34 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l34 + } + break + default: + if !_rules[rulenumber]() { + goto l34 + } + break + } + } + + goto l28 + l34: + position, tokenIndex = position28, tokenIndex28 + { + switch buffer[position] { + case '=': + { + position40 := position + if buffer[position] != rune('=') { + goto l16 + 
} + position++ + add(ruleequal, position40) + } + l41: + { + position42, tokenIndex42 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l42 + } + position++ + goto l41 + l42: + position, tokenIndex = position42, tokenIndex42 + } + { + switch buffer[position] { + case '\'': + if !_rules[rulevalue]() { + goto l16 + } + break + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + case '>': + { + position44 := position + if buffer[position] != rune('>') { + goto l16 + } + position++ + add(ruleg, position44) + } + l45: + { + position46, tokenIndex46 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l46 + } + position++ + goto l45 + l46: + position, tokenIndex = position46, tokenIndex46 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + case '<': + { + position48 := position + if buffer[position] != rune('<') { + goto l16 + } + position++ + add(rulel, position48) + } + l49: + { + position50, tokenIndex50 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l50 + } + position++ + goto l49 + l50: + position, tokenIndex = position50, tokenIndex50 + } + { + switch buffer[position] { + case 'D', 'd': + if !_rules[ruledate]() { + goto l16 + } + break + case 'T', 't': + if !_rules[ruletime]() { + goto l16 + } + break + default: + if !_rules[rulenumber]() { + goto l16 + } + break + } + } + + break + default: + { + position52 := position + { + position53, tokenIndex53 := position, tokenIndex + if buffer[position] != rune('c') { + goto l54 + } + position++ + goto l53 + l54: + position, tokenIndex = position53, tokenIndex53 + if buffer[position] != rune('C') { + goto l16 + } + position++ + } + l53: + { + position55, tokenIndex55 := position, tokenIndex + if buffer[position] != rune('o') { + goto l56 + } + position++ + goto l55 + l56: + position, tokenIndex = position55, tokenIndex55 + if buffer[position] != rune('O') { + goto l16 + } + position++ + } + l55: + { + position57, tokenIndex57 := position, tokenIndex + if buffer[position] != rune('n') { + goto l58 + } + position++ + goto l57 + l58: + position, tokenIndex = position57, tokenIndex57 + if buffer[position] != rune('N') { + goto l16 + } + position++ + } + l57: + { + position59, tokenIndex59 := position, tokenIndex + if buffer[position] != rune('t') { + goto l60 + } + position++ + goto l59 + l60: + position, tokenIndex = position59, tokenIndex59 + if buffer[position] != rune('T') { + goto l16 + } + position++ + } + l59: + { + position61, tokenIndex61 := position, tokenIndex + if buffer[position] != rune('a') { + goto l62 + } + position++ + goto l61 + l62: + position, tokenIndex = position61, tokenIndex61 + if buffer[position] != rune('A') { + goto l16 + } + position++ + } + l61: + { + position63, tokenIndex63 := position, tokenIndex + if buffer[position] != rune('i') { + goto l64 + } + position++ + goto l63 + l64: + position, tokenIndex = position63, tokenIndex63 + if buffer[position] != rune('I') { + goto l16 + } + position++ + } + l63: + { + position65, tokenIndex65 := position, tokenIndex + if buffer[position] != rune('n') { + goto l66 + } + position++ + goto l65 + l66: + position, tokenIndex = position65, tokenIndex65 + if buffer[position] 
!= rune('N') { + goto l16 + } + position++ + } + l65: + { + position67, tokenIndex67 := position, tokenIndex + if buffer[position] != rune('s') { + goto l68 + } + position++ + goto l67 + l68: + position, tokenIndex = position67, tokenIndex67 + if buffer[position] != rune('S') { + goto l16 + } + position++ + } + l67: + add(rulecontains, position52) + } + l69: + { + position70, tokenIndex70 := position, tokenIndex + if buffer[position] != rune(' ') { + goto l70 + } + position++ + goto l69 + l70: + position, tokenIndex = position70, tokenIndex70 + } + if !_rules[rulevalue]() { + goto l16 + } + break + } + } + + } + l28: + add(rulecondition, position17) + } + return true + l16: + position, tokenIndex = position16, tokenIndex16 + return false + }, + /* 2 tag <- <<(!((&('<') '<') | (&('>') '>') | (&('=') '=') | (&('\'') '\'') | (&('"') '"') | (&(')') ')') | (&('(') '(') | (&('\\') '\\') | (&('\r') '\r') | (&('\n') '\n') | (&('\t') '\t') | (&(' ') ' ')) .)+>> */ + nil, + /* 3 value <- <<('\'' (!('"' / '\'') .)* '\'')>> */ + func() bool { + position72, tokenIndex72 := position, tokenIndex + { + position73 := position + { + position74 := position + if buffer[position] != rune('\'') { + goto l72 + } + position++ + l75: + { + position76, tokenIndex76 := position, tokenIndex + { + position77, tokenIndex77 := position, tokenIndex + { + position78, tokenIndex78 := position, tokenIndex + if buffer[position] != rune('"') { + goto l79 + } + position++ + goto l78 + l79: + position, tokenIndex = position78, tokenIndex78 + if buffer[position] != rune('\'') { + goto l77 + } + position++ + } + l78: + goto l76 + l77: + position, tokenIndex = position77, tokenIndex77 + } + if !matchDot() { + goto l76 + } + goto l75 + l76: + position, tokenIndex = position76, tokenIndex76 + } + if buffer[position] != rune('\'') { + goto l72 + } + position++ + add(rulePegText, position74) + } + add(rulevalue, position73) + } + return true + l72: + position, tokenIndex = position72, tokenIndex72 + return false + }, + /* 4 number <- <<('0' / ([1-9] digit* ('.' 
digit*)?))>> */ + func() bool { + position80, tokenIndex80 := position, tokenIndex + { + position81 := position + { + position82 := position + { + position83, tokenIndex83 := position, tokenIndex + if buffer[position] != rune('0') { + goto l84 + } + position++ + goto l83 + l84: + position, tokenIndex = position83, tokenIndex83 + if c := buffer[position]; c < rune('1') || c > rune('9') { + goto l80 + } + position++ + l85: + { + position86, tokenIndex86 := position, tokenIndex + if !_rules[ruledigit]() { + goto l86 + } + goto l85 + l86: + position, tokenIndex = position86, tokenIndex86 + } + { + position87, tokenIndex87 := position, tokenIndex + if buffer[position] != rune('.') { + goto l87 + } + position++ + l89: + { + position90, tokenIndex90 := position, tokenIndex + if !_rules[ruledigit]() { + goto l90 + } + goto l89 + l90: + position, tokenIndex = position90, tokenIndex90 + } + goto l88 + l87: + position, tokenIndex = position87, tokenIndex87 + } + l88: + } + l83: + add(rulePegText, position82) + } + add(rulenumber, position81) + } + return true + l80: + position, tokenIndex = position80, tokenIndex80 + return false + }, + /* 5 digit <- <[0-9]> */ + func() bool { + position91, tokenIndex91 := position, tokenIndex + { + position92 := position + if c := buffer[position]; c < rune('0') || c > rune('9') { + goto l91 + } + position++ + add(ruledigit, position92) + } + return true + l91: + position, tokenIndex = position91, tokenIndex91 + return false + }, + /* 6 time <- <(('t' / 'T') ('i' / 'I') ('m' / 'M') ('e' / 'E') ' ' <(year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit ((('-' / '+') digit digit ':' digit digit) / 'Z'))>)> */ + func() bool { + position93, tokenIndex93 := position, tokenIndex + { + position94 := position + { + position95, tokenIndex95 := position, tokenIndex + if buffer[position] != rune('t') { + goto l96 + } + position++ + goto l95 + l96: + position, tokenIndex = position95, tokenIndex95 + if buffer[position] != rune('T') { + goto l93 + } + position++ + } + l95: + { + position97, tokenIndex97 := position, tokenIndex + if buffer[position] != rune('i') { + goto l98 + } + position++ + goto l97 + l98: + position, tokenIndex = position97, tokenIndex97 + if buffer[position] != rune('I') { + goto l93 + } + position++ + } + l97: + { + position99, tokenIndex99 := position, tokenIndex + if buffer[position] != rune('m') { + goto l100 + } + position++ + goto l99 + l100: + position, tokenIndex = position99, tokenIndex99 + if buffer[position] != rune('M') { + goto l93 + } + position++ + } + l99: + { + position101, tokenIndex101 := position, tokenIndex + if buffer[position] != rune('e') { + goto l102 + } + position++ + goto l101 + l102: + position, tokenIndex = position101, tokenIndex101 + if buffer[position] != rune('E') { + goto l93 + } + position++ + } + l101: + if buffer[position] != rune(' ') { + goto l93 + } + position++ + { + position103 := position + if !_rules[ruleyear]() { + goto l93 + } + if buffer[position] != rune('-') { + goto l93 + } + position++ + if !_rules[rulemonth]() { + goto l93 + } + if buffer[position] != rune('-') { + goto l93 + } + position++ + if !_rules[ruleday]() { + goto l93 + } + if buffer[position] != rune('T') { + goto l93 + } + position++ + if !_rules[ruledigit]() { + goto l93 + } + if !_rules[ruledigit]() { + goto l93 + } + if buffer[position] != rune(':') { + goto l93 + } + position++ + if !_rules[ruledigit]() { + goto l93 + } + if !_rules[ruledigit]() { + goto l93 + } + if buffer[position] != rune(':') { + goto l93 + } + 
position++ + if !_rules[ruledigit]() { + goto l93 + } + if !_rules[ruledigit]() { + goto l93 + } + { + position104, tokenIndex104 := position, tokenIndex + { + position106, tokenIndex106 := position, tokenIndex + if buffer[position] != rune('-') { + goto l107 + } + position++ + goto l106 + l107: + position, tokenIndex = position106, tokenIndex106 + if buffer[position] != rune('+') { + goto l105 + } + position++ + } + l106: + if !_rules[ruledigit]() { + goto l105 + } + if !_rules[ruledigit]() { + goto l105 + } + if buffer[position] != rune(':') { + goto l105 + } + position++ + if !_rules[ruledigit]() { + goto l105 + } + if !_rules[ruledigit]() { + goto l105 + } + goto l104 + l105: + position, tokenIndex = position104, tokenIndex104 + if buffer[position] != rune('Z') { + goto l93 + } + position++ + } + l104: + add(rulePegText, position103) + } + add(ruletime, position94) + } + return true + l93: + position, tokenIndex = position93, tokenIndex93 + return false + }, + /* 7 date <- <(('d' / 'D') ('a' / 'A') ('t' / 'T') ('e' / 'E') ' ' <(year '-' month '-' day)>)> */ + func() bool { + position108, tokenIndex108 := position, tokenIndex + { + position109 := position + { + position110, tokenIndex110 := position, tokenIndex + if buffer[position] != rune('d') { + goto l111 + } + position++ + goto l110 + l111: + position, tokenIndex = position110, tokenIndex110 + if buffer[position] != rune('D') { + goto l108 + } + position++ + } + l110: + { + position112, tokenIndex112 := position, tokenIndex + if buffer[position] != rune('a') { + goto l113 + } + position++ + goto l112 + l113: + position, tokenIndex = position112, tokenIndex112 + if buffer[position] != rune('A') { + goto l108 + } + position++ + } + l112: + { + position114, tokenIndex114 := position, tokenIndex + if buffer[position] != rune('t') { + goto l115 + } + position++ + goto l114 + l115: + position, tokenIndex = position114, tokenIndex114 + if buffer[position] != rune('T') { + goto l108 + } + position++ + } + l114: + { + position116, tokenIndex116 := position, tokenIndex + if buffer[position] != rune('e') { + goto l117 + } + position++ + goto l116 + l117: + position, tokenIndex = position116, tokenIndex116 + if buffer[position] != rune('E') { + goto l108 + } + position++ + } + l116: + if buffer[position] != rune(' ') { + goto l108 + } + position++ + { + position118 := position + if !_rules[ruleyear]() { + goto l108 + } + if buffer[position] != rune('-') { + goto l108 + } + position++ + if !_rules[rulemonth]() { + goto l108 + } + if buffer[position] != rune('-') { + goto l108 + } + position++ + if !_rules[ruleday]() { + goto l108 + } + add(rulePegText, position118) + } + add(ruledate, position109) + } + return true + l108: + position, tokenIndex = position108, tokenIndex108 + return false + }, + /* 8 year <- <(('1' / '2') digit digit digit)> */ + func() bool { + position119, tokenIndex119 := position, tokenIndex + { + position120 := position + { + position121, tokenIndex121 := position, tokenIndex + if buffer[position] != rune('1') { + goto l122 + } + position++ + goto l121 + l122: + position, tokenIndex = position121, tokenIndex121 + if buffer[position] != rune('2') { + goto l119 + } + position++ + } + l121: + if !_rules[ruledigit]() { + goto l119 + } + if !_rules[ruledigit]() { + goto l119 + } + if !_rules[ruledigit]() { + goto l119 + } + add(ruleyear, position120) + } + return true + l119: + position, tokenIndex = position119, tokenIndex119 + return false + }, + /* 9 month <- <(('0' / '1') digit)> */ + func() bool { + position123, 
tokenIndex123 := position, tokenIndex + { + position124 := position + { + position125, tokenIndex125 := position, tokenIndex + if buffer[position] != rune('0') { + goto l126 + } + position++ + goto l125 + l126: + position, tokenIndex = position125, tokenIndex125 + if buffer[position] != rune('1') { + goto l123 + } + position++ + } + l125: + if !_rules[ruledigit]() { + goto l123 + } + add(rulemonth, position124) + } + return true + l123: + position, tokenIndex = position123, tokenIndex123 + return false + }, + /* 10 day <- <(((&('3') '3') | (&('2') '2') | (&('1') '1') | (&('0') '0')) digit)> */ + func() bool { + position127, tokenIndex127 := position, tokenIndex + { + position128 := position + { + switch buffer[position] { + case '3': + if buffer[position] != rune('3') { + goto l127 + } + position++ + break + case '2': + if buffer[position] != rune('2') { + goto l127 + } + position++ + break + case '1': + if buffer[position] != rune('1') { + goto l127 + } + position++ + break + default: + if buffer[position] != rune('0') { + goto l127 + } + position++ + break + } + } + + if !_rules[ruledigit]() { + goto l127 + } + add(ruleday, position128) + } + return true + l127: + position, tokenIndex = position127, tokenIndex127 + return false + }, + /* 11 and <- <(('a' / 'A') ('n' / 'N') ('d' / 'D'))> */ + nil, + /* 12 equal <- <'='> */ + nil, + /* 13 contains <- <(('c' / 'C') ('o' / 'O') ('n' / 'N') ('t' / 'T') ('a' / 'A') ('i' / 'I') ('n' / 'N') ('s' / 'S'))> */ + nil, + /* 14 le <- <('<' '=')> */ + nil, + /* 15 ge <- <('>' '=')> */ + nil, + /* 16 l <- <'<'> */ + nil, + /* 17 g <- <'>'> */ + nil, + nil, + } + p.rules = _rules +} diff --git a/pubsub/query/query_test.go b/pubsub/query/query_test.go new file mode 100644 index 00000000..431ae1fe --- /dev/null +++ b/pubsub/query/query_test.go @@ -0,0 +1,64 @@ +package query_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tmlibs/pubsub/query" +) + +func TestMatches(t *testing.T) { + const shortForm = "2006-Jan-02" + txDate, err := time.Parse(shortForm, "2017-Jan-01") + require.NoError(t, err) + txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") + require.NoError(t, err) + + testCases := []struct { + s string + tags map[string]interface{} + err bool + matches bool + }{ + {"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, + + {"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, + {"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, + {"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, + {"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, + {"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, + {"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, + {"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, + {"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, + + {"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, + {"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, + {"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, + + {"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, + {"tx.time = TIME 
2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, + + {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, + {"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, + } + + for _, tc := range testCases { + query, err := query.New(tc.s) + if !tc.err { + require.Nil(t, err) + } + + if tc.matches { + assert.True(t, query.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) + } else { + assert.False(t, query.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) + } + } +} + +func TestMustParse(t *testing.T) { + assert.Panics(t, func() { query.MustParse("=") }) + assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) +} diff --git a/version/version.go b/version/version.go index 42af8ff7..ee59a7ca 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -const Version = "0.2.2" +const Version = "0.3.0"
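
Usage sketch (not part of the diff above): a minimal example of how the new pubsub/query package might be used, based only on the query.New, query.MustParse, and (*Query).Matches signatures added in this change. The tag names ("tm.events.type", "tx.gas") and values are illustrative, not taken from any real event stream.

package main

import (
	"fmt"

	"github.com/tendermint/tmlibs/pubsub/query"
)

func main() {
	// New parses the query string and returns an error on invalid syntax.
	q, err := query.New("tm.events.type='NewBlock' AND tx.gas > 7")
	if err != nil {
		panic(err)
	}

	// Matches reports whether the given tag set satisfies every condition
	// in the query (string equality for the type, int64 comparison for gas).
	tags := map[string]interface{}{
		"tm.events.type": "NewBlock",
		"tx.gas":         8,
	}
	fmt.Println(q.Matches(tags)) // true

	// MustParse panics on invalid input; convenient in tests.
	_ = query.MustParse("abci.owner.name CONTAINS 'Igor'")
}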