diff --git a/doc/10-Channels.md b/doc/10-Channels.md index 506ca637..e65ad569 100644 --- a/doc/10-Channels.md +++ b/doc/10-Channels.md @@ -256,16 +256,6 @@ or if the channel is missing required configuration values. "tags": { "host": "dummy-816", "service": "random fortune" - }, - "extra_tags": { - "hostgroup/app-mobile": "", - "hostgroup/department-dev": "", - "hostgroup/env-prod": "", - "hostgroup/location-rome": "", - "servicegroup/app-storage": "", - "servicegroup/department-ps": "", - "servicegroup/env-prod": "", - "servicegroup/location-rome": "" } }, "incident": { diff --git a/doc/20-HTTP-API.md b/doc/20-HTTP-API.md index f482fa64..8bacf76c 100644 --- a/doc/20-HTTP-API.md +++ b/doc/20-HTTP-API.md @@ -19,14 +19,13 @@ The authentication is performed via HTTP Basic Authentication using the source's When upgrading a setup from an earlier version, these usernames are still valid, but can be changed in Icinga Notifications Web. Events sent to Icinga Notifications are expected to match rules that describe further event escalations. -These rules can be created in the web interface. -Next to an array of `rule_ids`, a `rules_version` must be provided to ensure that the source has no outdated state. +These rules can be configured in Icinga Notifications Web and should be designed to match the `relations` of the +submitted events. When submitting an event without the expected relations to evaluate the rules, Icinga Notifications +will reject the request with a `422 Unprocessable Entity` status code and a message describing the missing relations +when the `X-Icinga-Enable-Attributes-Negotiation` header is set to `true`. Otherwise, the request will be accepted +nonetheless, but some or even all the configured rules might not match, and thus the event will not be escalated. -When the submitted `rules_version` is either outdated or empty, the `/process-event` endpoint returns an HTTP 412 response. -The response's body is a JSON-encoded version of the -[`RulesInfo`](https://github.com/Icinga/icinga-go-library/blob/main/notifications/source/client.go), -containing the latest `rules_version` together with all rules for this source. -After reevaluating these rules, one can resubmit the event with the updated `rules_version`. +An example request to submit an event looks like this: ``` curl -v -u 'source-2:insecureinsecure' -d '@-' 'http://localhost:5680/process-event' < 255 { - return fmt.Errorf( - "invalid event: extra tag %q is too long, at most 255 chars allowed, %d given", tag, len(tag), - ) - } - } - if e.SourceId == 0 { return fmt.Errorf("invalid event: source ID must not be empty") } @@ -120,6 +123,90 @@ func (e *Event) Sync(ctx context.Context, tx *sqlx.Tx, db *database.DB, objectId return err } +// ExtractMissingRelations determines which of the given filter columns are missing in the Relations field of this event. +// +// It evaluates the filter columns as JSONPath expressions against the Relations field and returns +// a list of filter columns that do not have any matching nodes in the Relations field and are not +// part of the CompleteRelations field. For filter columns that do have matching nodes, it caches +// the evaluated nodes for potential later use during rules evaluation. +func (e *Event) ExtractMissingRelations(filterColumns ...string) []string { + if e.evaluatedRelations == nil { + e.evaluatedRelations = make(map[string]jsonpath.NodeList) + } + + jpp := pool.GetJSONPathParser() + defer pool.PutJSONPathParser(jpp) + + var missing []string + for _, filterColumn := range filterColumns { + if _, cached := e.evaluatedRelations[filterColumn]; cached { + continue + } + // This should never panic, as the filter columns have already been validated when loading the rules. + path := jpp.MustParse(utils.PrefixWithJSONPathRootSelector(filterColumn)) + if nodes := path.Select(e.Relations); len(nodes) == 0 { + isComplete := slices.ContainsFunc(e.CompleteRelations, func(relation string) bool { + return strings.HasPrefix(filterColumn, relation) + }) + if !isComplete { + missing = append(missing, filterColumn) + } + } else { + // Cache the evaluated nodes for this filter column for potentially later use during rules evaluation. + e.evaluatedRelations[filterColumn] = nodes + } + } + return missing +} + +func (e *Event) EvalEqual(key string, value string) (bool, error) { + return slices.ContainsFunc(e.retrieveValuesFor(key), func(v any) bool { return fmt.Sprint(v) == value }), nil +} + +func (e *Event) EvalLess(key string, value string) (bool, error) { + return slices.ContainsFunc(e.retrieveValuesFor(key), func(v any) bool { return fmt.Sprint(v) < value }), nil +} + +func (e *Event) EvalLike(key string, value string) (bool, error) { + var builder strings.Builder + builder.WriteRune('^') + for segment := range strings.SplitSeq(value, "*") { + if segment == "" { + builder.WriteString(".*") + } + builder.WriteString(regexp.QuoteMeta(segment)) + } + builder.WriteRune('$') + + rgx := regexp.MustCompile(builder.String()) + + return slices.ContainsFunc(e.retrieveValuesFor(key), func(v any) bool { return rgx.MatchString(fmt.Sprint(v)) }), nil +} + +func (e *Event) EvalLessOrEqual(key string, value string) (bool, error) { + return slices.ContainsFunc(e.retrieveValuesFor(key), func(v any) bool { return fmt.Sprint(v) <= value }), nil +} + +func (e *Event) EvalExists(key string) bool { return len(e.retrieveValuesFor(key)) > 0 } + +// retrieveValuesFor retrieves the values for the given key from the Relations field of this event. +func (e *Event) retrieveValuesFor(key string) jsonpath.NodeList { + if e.evaluatedRelations == nil { + e.evaluatedRelations = make(map[string]jsonpath.NodeList) + } + + nodes, cached := e.evaluatedRelations[key] + if !cached { + jpp := pool.GetJSONPathParser() + defer pool.PutJSONPathParser(jpp) + + path := jpp.MustParse(utils.PrefixWithJSONPathRootSelector(key)) + nodes = path.Select(e.Relations) + e.evaluatedRelations[key] = nodes + } + return nodes +} + // EventRow represents a single event database row and isn't an in-memory representation of an event. type EventRow struct { ID int64 `db:"id"` diff --git a/internal/event/event_test.go b/internal/event/event_test.go new file mode 100644 index 00000000..2aa53792 --- /dev/null +++ b/internal/event/event_test.go @@ -0,0 +1,101 @@ +package event + +import ( + "encoding/json" + "testing" + + baseEv "github.com/icinga/icinga-go-library/notifications/event" + "github.com/icinga/icinga-notifications/internal/filter" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFilter(t *testing.T) { + t.Parallel() + + ev := &Event{ + Event: baseEv.Event{ + Relations: map[string]any{ + "host": map[string]any{ + "name": "test-host", + "vars": map[string]any{ + "dict": map[string]any{ + "key": "value", + "key_int": 42, + "key_array": []any{ + "value1", + "value2", + map[string]any{ + "dict_in_array": "dict_in_array1", + }, + }, + }, + "array": []any{ + "value1-from-array", + map[string]any{ + "dict_in_array": "dict_in_array2", + }, + map[string]any{ + "dict_in_array": "dict_in_array3", + }, + }, + }, + }, + }, + }, + } + + filterData := []struct { + Expr []byte + Expected bool + }{ + // ... expected positive matches + {Expr: makeJsonFilterExpr(t, "host.name", "=", "test-host"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.name", "~", "test*"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key", "=", "value"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key", "!~", "something*"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_int", ">=", 42), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_int", "=", 42.0), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[0]", "!=", "value2"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[1]", "!=", "value1"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[2].dict_in_array", "=", "dict_in_array1"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.array[0]", "~", "value1-from*"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.array[*].dict_in_array", "~", "dict_in_array*"), Expected: true}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[2].dict_in_array", "=", "dict_in_array1"), Expected: true}, + + // ... expected negative matches + {Expr: makeJsonFilterExpr(t, "host.name", "=", "wrong-host"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.name", "!=", "test-host"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key", "=", "wrong-value"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.missing", "=", "foo"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.missing", "=", "foo"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[3]", "=", "value"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[2].missing", "=", "foo"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.array[1].dict_in_array", "=", "wrong-value"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_int", "=", 043), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_int", "<=", 30), Expected: false}, + {Expr: makeJsonFilterExpr(t, "service.name", "~", "whatever"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.dict.key_array[2]", "=", "dict_in_array1"), Expected: false}, + {Expr: makeJsonFilterExpr(t, "host.vars.array", "~", "value1-from*"), Expected: false}, + } + + for _, data := range filterData { + f, err := filter.UnmarshalJSON(data.Expr) + if assert.NoErrorf(t, err, "parsing %q should not fail", data.Expr) { + matched, err := f.Eval(ev) + assert.NoErrorf(t, err, "evaluating %q should not fail", data.Expr) + assert.Equal(t, data.Expected, matched, "unexpected result for %q", data.Expr) + } + } +} + +// makeJsonFilterExpr is a helper function to create a JSON filter expression for testing purposes. +func makeJsonFilterExpr(t *testing.T, jsonPath, operator, value any) []byte { + data, err := json.Marshal(map[string]any{ + "column": jsonPath, + "op": operator, + "value": value, + }) + require.NoError(t, err) + return data +} diff --git a/internal/filter/types.go b/internal/filter/types.go index 4c1104d0..406259c0 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -1,7 +1,12 @@ package filter import ( + "encoding/json" "fmt" + + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/pool" + "github.com/icinga/icinga-notifications/internal/utils" ) // LogicalOp is a type used for grouping the logical operators of a filter string. @@ -209,3 +214,97 @@ var ( _ Filter = (*Exists)(nil) _ Filter = (*Condition)(nil) ) + +// UnmarshalJSON is a helper function to unmarshal a JSON representation of a filter into a [Filter] interface. +// +// It recursively parses the JSON data to deduce the filter type ([Chain] or [Condition]) based on the `op` field +// and constructs the appropriate filter structure. +// +// Returns nil if JSON null value is provided, and an error if the JSON is invalid or if required fields are missing. +func UnmarshalJSON(data []byte) (Filter, error) { + if string(data) == "null" { + return nil, nil + } + + message := map[string]json.RawMessage{} + if err := types.UnmarshalJSON(data, &message); err != nil { + return nil, err + } + + opBytes, opExists := message["op"] + if !opExists { + return nil, fmt.Errorf("missing required field: op") + } + + var op string + if err := types.UnmarshalJSON(opBytes, &op); err != nil { + return nil, err + } + + if isLogicalOp(op) { + rulesBytes, exists := message["rules"] + if !exists { + return nil, fmt.Errorf("missing required field: rules") + } + + var rules []json.RawMessage + if err := json.Unmarshal(rulesBytes, &rules); err != nil { + return nil, err + } + chain := &Chain{op: LogicalOp(op)} + for _, rawRule := range rules { + filter, err := UnmarshalJSON(rawRule) + if err != nil { + return nil, err + } + chain.rules = append(chain.rules, filter) + } + return chain, nil + } + + if isCompOperator(op) { + condition := &Condition{op: CompOperator(op)} + if column, exists := message["column"]; !exists { + return nil, fmt.Errorf("missing required filter condition field: column") + } else if err := types.UnmarshalJSON(column, &condition.column); err != nil { + return nil, err + } + jpp := pool.GetJSONPathParser() + defer pool.PutJSONPathParser(jpp) + + // Validate the column as a JSONPath expression to ensure it's valid for evaluation later on. + if _, err := jpp.Parse(utils.PrefixWithJSONPathRootSelector(condition.Column())); err != nil { + return nil, err + } + + var value any // The JSON value might represent any type, so we can't directly unmarshal it into a string. + if rawValue, exists := message["value"]; !exists { + return nil, fmt.Errorf("missing required filter condition field: value") + } else if err := types.UnmarshalJSON(rawValue, &value); err != nil { + return nil, err + } + condition.value = fmt.Sprint(value) // Convert the value to a string representation for consistent evaluation. + return condition, nil + } + return nil, fmt.Errorf("unknown filter operator: %s", op) +} + +// isLogicalOp checks if the provided operator is a valid logical operator. +func isLogicalOp(op string) bool { + switch LogicalOp(op) { + case All, Any, None: + return true + default: + return false + } +} + +// isCompOperator checks if the provided operator is a valid comparison operator. +func isCompOperator(op string) bool { + switch CompOperator(op) { + case Equal, UnEqual, Like, UnLike, GreaterThan, LessThan, GreaterThanEqual, LessThanEqual: + return true + default: + return false + } +} diff --git a/internal/filter/types_test.go b/internal/filter/types_test.go new file mode 100644 index 00000000..c05307dd --- /dev/null +++ b/internal/filter/types_test.go @@ -0,0 +1,159 @@ +package filter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnmarshalJSON(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + json string + expectErr bool + errString string + verify func(t *testing.T, f Filter) + }{ + { + name: "Null JSON", + json: "null", + expectErr: false, + verify: func(t *testing.T, f Filter) { assert.Nil(t, f) }, + }, + { + name: "Filter Condition", + json: `{"op":"=","column":"foo","value":"bar"}`, + expectErr: false, + verify: func(t *testing.T, f Filter) { + c, ok := f.(*Condition) + require.Truef(t, ok, "expected Condition, got %T", f) + assert.Equal(t, Equal, c.op) + assert.Equal(t, "foo", c.column) + assert.Equal(t, "bar", c.value) + }, + }, + { + name: "Simple Filter Chain", + json: `{"op":"&","rules":[{"op":"=","column":"a","value":"1"},{"op":"!=","column":"b","value":2}]}`, + expectErr: false, + verify: func(t *testing.T, f Filter) { + ch, ok := f.(*Chain) + require.Truef(t, ok, "expected Chain, got %T", f) + assert.Equal(t, All, ch.op) + assert.Len(t, ch.rules, 2) + + c1, ok := ch.rules[0].(*Condition) + require.Truef(t, ok, "expected Condition, got %T", ch.rules[0]) + assert.Equal(t, Equal, c1.op) + assert.Equal(t, "a", c1.column) + assert.Equal(t, "1", c1.value) + + c2, ok := ch.rules[1].(*Condition) + require.Truef(t, ok, "expected Condition, got %T", ch.rules[1]) + assert.Equal(t, UnEqual, c2.op) + assert.Equal(t, "b", c2.column) + assert.Equal(t, "2", c2.value) + }, + }, + { + name: "Nested Filter Chain", + json: `{"op":"&","rules":[{"op":"~","column":"x","value":"v*"},{"op":"|","rules":[{"op":"!~","column":"y","value":"some*"},{"op":"!=","column":"z","value":"2"}]}]}`, + expectErr: false, + verify: func(t *testing.T, f Filter) { + ch, ok := f.(*Chain) + require.Truef(t, ok, "expected Chain, got %T", f) + assert.Equal(t, All, ch.op) + assert.Len(t, ch.rules, 2) + + condition, ok := ch.rules[0].(*Condition) + require.Truef(t, ok, "expected Condition, got %T", ch.rules[0]) + assert.Equal(t, Like, condition.op) + assert.Equal(t, "x", condition.column) + assert.Equal(t, "v*", condition.value) + + ch2, ok := ch.rules[1].(*Chain) + require.Truef(t, ok, "expected Condition, got %T", ch.rules[1]) + assert.Equal(t, Any, ch2.op) + assert.Len(t, ch2.rules, 2) + + c1, ok := ch2.rules[0].(*Condition) + require.Truef(t, ok, "expected Condition, got %T", ch2.rules[0]) + assert.Equal(t, UnLike, c1.op) + assert.Equal(t, "y", c1.column) + assert.Equal(t, "some*", c1.value) + + c2, ok := ch2.rules[1].(*Condition) + require.Truef(t, ok, "expected Condition, got %T", ch2.rules[1]) + assert.Equal(t, UnEqual, c2.op) + assert.Equal(t, "z", c2.column) + assert.Equal(t, "2", c2.value) + }, + }, + { + name: "Missing Operator", + json: `{"column":"a","value":"1"}`, + expectErr: true, + errString: "missing required field: op", + }, + { + name: "Unknown Operator", + json: `{"op":"?","column":"a","value":"1"}`, + expectErr: true, + errString: "unknown filter operator", + }, + { + name: "Missing Chain Rules", + json: `{"op":"&"}`, + expectErr: true, + errString: "missing required field: rules", + }, + { + name: "Missing Filter Column", + json: `{"op":"=","value":"1"}`, + expectErr: true, + errString: "missing required filter condition field: column", + }, + { + name: "Rules Not an Array", + json: `{"op":"!","rules":"notarray"}`, + expectErr: true, + // error message from json.Unmarshal when trying to unmarshal a string into a []json.RawMessage + errString: "cannot unmarshal string into Go value of type", + }, + { + name: "Invalid JSON", + json: `not a json`, + expectErr: true, + errString: "invalid character", + }, + { + name: "Invalid JSONPath Column", + json: `{"op":"=","column":"invalid[","value":"1"}`, + expectErr: true, + errString: "unexpected eof", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f, err := UnmarshalJSON([]byte(tc.json)) + if tc.expectErr { + assert.Errorf(t, err, "expected error but got nil; filter=%#v", f) + assert.Nil(t, f) + if tc.errString != "" { + assert.ErrorContainsf(t, err, tc.errString, "error mismatch: want contains %q, got %q", tc.errString, err.Error()) + } + return + } + require.NoErrorf(t, err, "unexpected error: %v", err) + if tc.verify != nil { + tc.verify(t, f) + } + }) + } +} diff --git a/internal/incident/incident.go b/internal/incident/incident.go index d674ba9b..1fc05809 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -16,7 +16,6 @@ import ( "github.com/icinga/icinga-notifications/internal/rule" "github.com/jmoiron/sqlx" "go.uber.org/zap" - "strconv" "sync" "time" ) @@ -429,33 +428,33 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even i.Rules = make(map[int64]struct{}) } - for _, ruleId := range ev.RuleIds { - ruleIdInt, err := strconv.ParseInt(ruleId, 10, 64) - if err != nil { - i.logger.Errorw("Event rule is not an integer", zap.String("rule_id", ruleId), zap.Error(err)) - return fmt.Errorf("cannot convert rule id %q to an int: %w", ruleId, err) - } + src, ok := i.runtimeConfig.Sources[ev.SourceId] + if !ok { + i.logger.Warnw("Received event from unknown source, might got deleted", zap.Int64("source_id", ev.SourceId)) + return nil + } - r, ok := i.runtimeConfig.Rules[ruleIdInt] - if !ok { - i.logger.Errorw("Event refers to non-existing event rule, might got deleted", zap.Int64("rule_id", ruleIdInt)) - return fmt.Errorf("cannot apply unknown rule %d", ruleIdInt) - } + for _, id := range src.RuleIDs() { + if _, ok := i.Rules[id]; !ok { + r, ok := i.runtimeConfig.Rules[id] + if !ok { + i.logger.Errorw("BUG: source references unknown event rule", zap.Object("source", src)) + continue + } - if r.SourceID != ev.SourceId { - i.logger.Errorw("Rule source ID does not match event source ID", - zap.Int64("event_source_id", ev.SourceId), - zap.Int64("rule_source_id", r.SourceID), - zap.Int64("rule_id", ruleIdInt)) - return fmt.Errorf("rule %d source ID %d does not match event source %d", ruleIdInt, r.SourceID, ev.SourceId) - } + matched, err := r.Eval(ev) + if err != nil { + i.logger.Errorw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) + } + + if err != nil || !matched { + continue + } - if _, ok := i.Rules[r.ID]; !ok { i.Rules[r.ID] = struct{}{} i.logger.Infow("Rule matches", zap.Object("rule", r)) - err := i.AddRuleMatched(ctx, tx, r) - if err != nil { + if err := i.AddRuleMatched(ctx, tx, r); err != nil { i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) return err } diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index 627141f9..cc83b1f1 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -156,10 +156,6 @@ func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID i "host": testutils.MakeRandomString(t), "service": testutils.MakeRandomString(t), }, - ExtraTags: map[string]string{ - "hostgroup/database-server": "", - "servicegroup/webserver": "", - }, }, } diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 794d8466..7996b463 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -8,8 +8,8 @@ import ( "fmt" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/notifications" baseEv "github.com/icinga/icinga-go-library/notifications/event" - baseSource "github.com/icinga/icinga-go-library/notifications/source" "github.com/icinga/icinga-notifications/internal" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" @@ -17,7 +17,6 @@ import ( "github.com/icinga/icinga-notifications/internal/incident" "go.uber.org/zap" "net/http" - "strconv" "time" ) @@ -138,19 +137,6 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { return } - // If the client uses an outdated rules version, reject the request but also send the current rules version - // and rules for this source back to the client, so it can retry the request with the updated rules. - if latestRuleVersion := l.runtimeConfig.GetRulesVersionFor(src.ID); ev.RulesVersion != latestRuleVersion { - w.WriteHeader(http.StatusPreconditionFailed) - l.writeSourceRulesInfo(w, src) - - l.logger.Debugw("Abort event processing due to outdated rules version", - zap.String("current_version", latestRuleVersion), - zap.String("provided_version", ev.RulesVersion), - zap.String("source", src.Name)) - return - } - ev.CompleteURL(daemon.Config().Icingaweb2URL) ev.Time = time.Now() ev.SourceId = src.ID @@ -167,6 +153,14 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { return } + if SupportsAttrsNegotiation(r) { + missingRelations := ev.ExtractMissingRelations(l.runtimeConfig.GetRulesFilterColumnsForSource(src)...) + if len(missingRelations) > 0 { + l.sendMissingAttrsError(w, &ev, missingRelations) + return + } + } + l.logger.Infow("Processing event", zap.String("event", ev.String())) err := incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) { @@ -347,35 +341,32 @@ func (l *Listener) DumpRules(w http.ResponseWriter, r *http.Request) { _ = enc.Encode(l.runtimeConfig.Rules) } -// writeSourceRulesInfo writes the rules information for a specific source to the response writer. -// -// Internally, it converts the data to [baseSource.RulesInfo], being serialized JSON-encoded. -func (l *Listener) writeSourceRulesInfo(w http.ResponseWriter, source *config.Source) { - rulesInfo := baseSource.RulesInfo{ - Version: config.NoRulesVersion, - } - - func() { // Use a function to ensure that the RLock and RUnlock are called before writing the response. - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() +// sendMissingAttrsError sends a response with status code 422 Unprocessable Entity to the client. +func (l *Listener) sendMissingAttrsError(w http.ResponseWriter, ev *event.Event, missingAttrs []string) { + l.logger.Infow( + "Event is missing attributes required for rule evaluation", + zap.Stringer("event", ev), + zap.Strings("missing_attributes", missingAttrs), + ) - if sourceInfo, ok := l.runtimeConfig.RulesBySource[source.ID]; ok { - rulesInfo.Version = sourceInfo.Version.String() - rulesInfo.Rules = make(map[string]string) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusUnprocessableEntity) - for _, rID := range sourceInfo.RuleIDs { - id := strconv.FormatInt(rID, 10) - filterExpr := "" - if l.runtimeConfig.Rules[rID].ObjectFilterExpr.Valid { - filterExpr = l.runtimeConfig.Rules[rID].ObjectFilterExpr.String - } + resp := map[string]any{ + "type": "attrs_negotiation", + "attributes": missingAttrs, + } - rulesInfo.Rules[id] = filterExpr - } - } - }() + if err := json.NewEncoder(w).Encode(resp); err != nil { + l.logger.Errorw("Failed to send missing attributes required for rule evaluation", zap.Error(err)) + return + } +} - enc := json.NewEncoder(w) - enc.SetIndent("", " ") - _ = enc.Encode(rulesInfo) +// SupportsAttrsNegotiation checks whether the source of the request supports attributes negotiation. +// +// Returns true if the request contains the header [notifications.XIcingaEnableAttributesNegotiation] +// set to "true", false otherwise. +func SupportsAttrsNegotiation(r *http.Request) bool { + return r.Header.Get(notifications.XIcingaEnableAttributesNegotiation) == "true" } diff --git a/internal/object/db_types.go b/internal/object/db_types.go index 70ad1ffb..fb048104 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -2,24 +2,13 @@ package object import "github.com/icinga/icinga-go-library/types" -// TagRow is a base type for IdTagRow and ExtraTagRow -type TagRow struct { +// IdTagRow represents a single database object id tag. +type IdTagRow struct { ObjectId types.Binary `db:"object_id"` Tag string `db:"tag"` Value string `db:"value"` } -// ExtraTagRow represents a single database object extra tag like `hostgroup/foo: null`. -type ExtraTagRow TagRow - -// TableName implements the contracts.TableNamer interface. -func (e *ExtraTagRow) TableName() string { - return "object_extra_tag" -} - -// IdTagRow represents a single database object id tag. -type IdTagRow TagRow - // TableName implements the contracts.TableNamer interface. func (e *IdTagRow) TableName() string { return "object_id_tag" diff --git a/internal/object/object.go b/internal/object/object.go index a4b30951..054559fb 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -22,8 +22,7 @@ type Object struct { URL types.String `db:"url"` MuteReason types.String `db:"mute_reason"` - Tags map[string]string `db:"-"` - ExtraTags map[string]string `db:"-"` + Tags map[string]string `db:"-"` db *database.DB } @@ -31,12 +30,11 @@ type Object struct { // New creates a new object from the given event. func New(db *database.DB, ev *event.Event) *Object { obj := &Object{ - SourceID: ev.SourceId, - Name: ev.Name, - db: db, - URL: types.MakeString(ev.URL, types.TransformEmptyStringToNull), - Tags: ev.Tags, - ExtraTags: ev.ExtraTags, + SourceID: ev.SourceId, + Name: ev.Name, + db: db, + URL: types.MakeString(ev.URL, types.TransformEmptyStringToNull), + Tags: ev.Tags, } if ev.Mute.Valid && ev.Mute.Bool { obj.MuteReason = types.String{NullString: sql.NullString{String: ev.MuteReason, Valid: true}} @@ -80,7 +78,6 @@ func FromEvent(ctx context.Context, db *database.DB, ev *event.Event) (*Object, } else { *newObject = *object - newObject.ExtraTags = ev.ExtraTags newObject.Name = ev.Name newObject.URL = types.MakeString(ev.URL, types.TransformEmptyStringToNull) if ev.Mute.Valid { @@ -106,25 +103,11 @@ func FromEvent(ctx context.Context, db *database.DB, ev *event.Event) (*Object, } stmt, _ = db.BuildUpsertStmt(&IdTagRow{}) - _, err = tx.NamedExecContext(ctx, stmt, mapToTagRows(newObject.ID, ev.Tags)) + _, err = tx.NamedExecContext(ctx, stmt, mapToIdTagRows(newObject.ID, ev.Tags)) if err != nil { return nil, fmt.Errorf("failed to upsert object id tags: %w", err) } - extraTag := &ExtraTagRow{ObjectId: newObject.ID} - _, err = tx.NamedExecContext(ctx, `DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id`, extraTag) - if err != nil { - return nil, fmt.Errorf("failed to delete object extra tags: %w", err) - } - - if len(ev.ExtraTags) > 0 { - stmt, _ := db.BuildInsertStmt(extraTag) - _, err = tx.NamedExecContext(ctx, stmt, mapToTagRows(newObject.ID, ev.ExtraTags)) - if err != nil { - return nil, fmt.Errorf("failed to insert object extra tags: %w", err) - } - } - if err = tx.Commit(); err != nil { return nil, fmt.Errorf("cannot commit object database transaction: %w", err) } @@ -171,15 +154,6 @@ func (o *Object) String() string { _, _ = fmt.Fprintf(&b, " Source %d:\n", o.SourceID) _, _ = fmt.Fprintf(&b, " Name: %q\n", o.Name) _, _ = fmt.Fprintf(&b, " URL: %q\n", o.URL.String) - _, _ = fmt.Fprintf(&b, " Extra Tags:\n") - - for tag, value := range o.ExtraTags { - _, _ = fmt.Fprintf(&b, " %q", tag) - if value != "" { - _, _ = fmt.Fprintf(&b, " = %q", value) - } - _, _ = fmt.Fprintf(&b, "\n") - } return b.String() } @@ -218,11 +192,11 @@ func ID(source int64, tags map[string]string) types.Binary { return h.Sum(nil) } -// mapToTagRows transforms the object (extra) tags map to a slice of TagRow struct. -func mapToTagRows(objectId types.Binary, extraTags map[string]string) []*TagRow { - var tagRows []*TagRow - for key, val := range extraTags { - tagRows = append(tagRows, &TagRow{ +// mapToIdTagRows transforms the object tags map to a slice of TagRow struct. +func mapToIdTagRows(objectId types.Binary, tags map[string]string) []*IdTagRow { + var tagRows []*IdTagRow + for key, val := range tags { + tagRows = append(tagRows, &IdTagRow{ ObjectId: objectId, Tag: key, Value: val, diff --git a/internal/object/objects.go b/internal/object/objects.go index fbfa82c0..ceb494ed 100644 --- a/internal/object/objects.go +++ b/internal/object/objects.go @@ -61,7 +61,6 @@ func restoreObjectsFromQuery(ctx context.Context, db *database.DB, query string, err := utils.ExecAndApply[Object](ctx, db, query, args, func(o *Object) { o.db = db o.Tags = map[string]string{} - o.ExtraTags = map[string]string{} select { case objects <- o: @@ -100,14 +99,6 @@ func restoreObjectsFromQuery(ctx context.Context, db *database.DB, query string, return errors.Wrap(err, "cannot restore objects ID tags") } - // Restore object extra tags matching the given object ids - err = utils.ForEachRow[ExtraTagRow](ctx, db, "object_id", ids, func(et *ExtraTagRow) { - objectsMap[et.ObjectId.String()].ExtraTags[et.Tag] = et.Value - }) - if err != nil { - return errors.Wrap(err, "cannot restore objects extra tags") - } - cacheMu.Lock() defer cacheMu.Unlock() diff --git a/internal/object/objects_test.go b/internal/object/objects_test.go index 335c53a0..9a325639 100644 --- a/internal/object/objects_test.go +++ b/internal/object/objects_test.go @@ -69,16 +69,12 @@ func TestRestoreMutedObjects(t *testing.T) { assert.Equal(t, o.URL, objFromCache.URL, "objects url should match") assert.Equal(t, o.Tags, objFromCache.Tags, "objects tags should match") - assert.Equal(t, o.ExtraTags, objFromCache.ExtraTags, "objects tags should match") } // Purge all newly created objects and their relations not mes up local database tests. _, err = db.NamedExecContext(ctx, `DELETE FROM object_id_tag WHERE object_id = :id`, o) assert.NoError(t, err, "deleting object id tags should not fail") - _, err = db.NamedExecContext(ctx, `DELETE FROM object_extra_tag WHERE object_id = :id`, o) - assert.NoError(t, err, "deleting object extra tags should not fail") - _, err = db.NamedExecContext(ctx, `DELETE FROM object WHERE id = :id`, o) assert.NoError(t, err, "deleting object should not fail") } @@ -96,10 +92,6 @@ func makeObject(ctx context.Context, db *database.DB, t *testing.T, sourceID int "host": testutils.MakeRandomString(t), "service": testutils.MakeRandomString(t), }, - ExtraTags: map[string]string{ - "hostgroup/database-server": "", - "servicegroup/webserver": "", - }, }, } diff --git a/internal/pool/jsonpath.go b/internal/pool/jsonpath.go new file mode 100644 index 00000000..dad413be --- /dev/null +++ b/internal/pool/jsonpath.go @@ -0,0 +1,34 @@ +package pool + +import ( + "sync" + + "github.com/theory/jsonpath" +) + +// jsonPathParserPool is a pool of JSONPath parsers to avoid the overhead of creating new parsers for each evaluation. +// +// JSONPath parsers are used to evaluate JSONPath expressions of event rule filters against the events. +// Since the parser doesn't cache any state specific to any given expressions, it can be safely reused +// across multiple evaluations, and thus we can use a pool to reduce the overhead of creating new parsers +// for each evaluation. +var jsonPathParserPool = sync.Pool{ + New: func() any { + return jsonpath.NewParser() + }, +} + +// GetJSONPathParser retrieves a JSONPath parser from the pool. +// +// The caller is responsible for returning the parser to the pool after use by calling [PutJSONPathParser]. +func GetJSONPathParser() *jsonpath.Parser { + return jsonPathParserPool.Get().(*jsonpath.Parser) //nolint:forcetypeassert +} + +// PutJSONPathParser returns a JSONPath parser to the pool. +// +// The caller should call this function after using a parser retrieved from the pool to allow +// it to be reused for future evaluations. +func PutJSONPathParser(parser *jsonpath.Parser) { + jsonPathParserPool.Put(parser) +} diff --git a/internal/rule/rule.go b/internal/rule/rule.go index a4fa1a49..7889e4d6 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -3,6 +3,7 @@ package rule import ( "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config/baseconf" + "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/timeperiod" "go.uber.org/zap/zapcore" @@ -16,12 +17,33 @@ type Rule struct { TimePeriod *timeperiod.TimePeriod `db:"-"` TimePeriodID types.Int `db:"timeperiod_id"` SourceID int64 `db:"source_id"` + ObjectFilter filter.Filter `db:"-"` ObjectFilterExpr types.String `db:"object_filter"` Escalations map[int64]*Escalation `db:"-"` + + // FilterColumns is a set of all filter columns used in the rule's ObjectFilter. + // + // This is computed from the ObjectFilter once and can be used by sources to determine which + // columns they need to provide for the events to be able to evaluate the rule. + FilterColumns map[string]struct{} `db:"-"` } // IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. func (r *Rule) IncrementalInitAndValidate() error { + if r.ObjectFilterExpr.Valid { + f, err := filter.UnmarshalJSON([]byte(r.ObjectFilterExpr.String)) + if err != nil { + return err + } + + r.ObjectFilter = f + if f != nil { + r.FilterColumns = make(map[string]struct{}) + for _, condition := range f.ExtractConditions() { + r.FilterColumns[condition.Column()] = struct{}{} + } + } + } return nil } @@ -41,6 +63,16 @@ func (r *Rule) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +// Eval evaluates the configured object filter for the provided filterable. +// +// Returns always true if the current rule doesn't have a configured object filter. +func (r *Rule) Eval(filterable filter.Filterable) (bool, error) { + if r.ObjectFilter == nil { + return true, nil + } + return r.ObjectFilter.Eval(filterable) +} + // ContactChannels stores a set of channel IDs for each set of individual contacts. type ContactChannels map[*recipient.Contact]map[int64]bool diff --git a/internal/utils/utils.go b/internal/utils/utils.go index e5938cb1..16a9ce3f 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -6,6 +6,7 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/jmoiron/sqlx" "github.com/pkg/errors" + "strings" ) // ExecAndApply applies the provided restoreFunc callback for each successfully retrieved row of the specified type. @@ -53,3 +54,14 @@ func ForEachRow[Row, Id any](ctx context.Context, db *database.DB, idColumn stri return ExecAndApply(ctx, db, stmt, args, restoreFunc) } + +// PrefixWithJSONPathRootSelector ensures that the provided JSONPath expression starts with the root selector "$.". +// +// If the provided path already starts with "$.", it is returned unchanged. +// Otherwise, the root selector is prefixed to the path. +func PrefixWithJSONPathRootSelector(path string) string { + if !strings.HasPrefix(path, "$.") { + return "$." + path + } + return path +} diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 8960e81e..f2ccde7f 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -255,15 +255,6 @@ CREATE TABLE object_id_tag ( CONSTRAINT fk_object_id_tag_object FOREIGN KEY (object_id) REFERENCES object(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE object_extra_tag ( - object_id binary(32) NOT NULL, - tag varchar(255) NOT NULL, - value text NOT NULL, - - CONSTRAINT pk_object_extra_tag PRIMARY KEY (object_id, tag), - CONSTRAINT fk_object_extra_tag_object FOREIGN KEY (object_id) REFERENCES object(id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - CREATE TABLE event ( id bigint NOT NULL AUTO_INCREMENT, time bigint NOT NULL, diff --git a/schema/mysql/upgrades/001.sql b/schema/mysql/upgrades/001.sql new file mode 100644 index 00000000..e7b1ddbd --- /dev/null +++ b/schema/mysql/upgrades/001.sql @@ -0,0 +1 @@ +DROP TABLE object_extra_tag; diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 791b5702..fba441c2 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -288,15 +288,6 @@ CREATE TABLE object_id_tag ( CONSTRAINT fk_object_id_tag_object FOREIGN KEY (object_id) REFERENCES object(id) ); -CREATE TABLE object_extra_tag ( - object_id bytea NOT NULL, - tag varchar(255) NOT NULL, - value text NOT NULL, - - CONSTRAINT pk_object_extra_tag PRIMARY KEY (object_id, tag), - CONSTRAINT fk_object_extra_tag_object FOREIGN KEY (object_id) REFERENCES object(id) -); - CREATE TYPE event_type AS ENUM ( 'acknowledgement-cleared', 'acknowledgement-set', diff --git a/schema/pgsql/upgrades/001.sql b/schema/pgsql/upgrades/001.sql new file mode 100644 index 00000000..e7b1ddbd --- /dev/null +++ b/schema/pgsql/upgrades/001.sql @@ -0,0 +1 @@ +DROP TABLE object_extra_tag;