ApiServerSource: add optional selection criteria to ceOverride.extensions based on fields in the cloudevent #8001
Comments
/assign |
This feature might be similar to, or combined with, #7704. If we allow defining a lightweight transformation on a JSON-represented event using JSONPath, I think it would solve both:
|
Thanks a bunch for accepting this request. I just wanna check if my understanding is correct. These cloudevent extensions added by jsonTransform would be able to be used with the ApiServerSource filter (#7791), right? |
Yes, the idea is that when we get an event we would apply the |
@muskan2622 since you have expressed interest in contributing, feel free to ask any questions. A few parts need to change, as it's a relatively large feature, but I'm happy to help/chat/etc. Here is a very high-level idea for getting the field out of the event and then setting it as an extension:
import (
	"encoding/json"
	"fmt"
	"regexp"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	cetest "github.com/cloudevents/sdk-go/v2/test"
	"github.com/cloudevents/sdk-go/v2/types"
	"k8s.io/client-go/util/jsonpath"
)

type JSONTransform struct {
	From string // JSON path
	To   string // CE extension
	Type string // CE Extension type
}

func ExampleJsonTransform() {
	exampleTransform := JSONTransform{
		From: ".data.status.failed",
		To:   "failed",
		Type: "integer", // One of https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type-system
	}
	c := cetest.FullEvent()
	err := c.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
		"status": map[string]interface{}{
			"failed": 1,
		},
	})
	if err != nil {
		panic(err)
	}
	// Core implementation
	// 1. Parse From
	// 2. Find From in JSON-serialized event
	// 3. Set extension based on Type
	expr, err := relaxedJSONPathExpression(exampleTransform.From)
	if err != nil {
		panic(err)
	}
	jp := jsonpath.New("Parser")
	if err := jp.Parse(expr); err != nil {
		panic(err)
	}
	b, err := c.MarshalJSON()
	if err != nil {
		panic(err)
	}
	var data map[string]interface{}
	err = json.Unmarshal(b, &data)
	if err != nil {
		panic(err)
	}
	results, err := jp.FindResults(data)
	if err != nil {
		panic(err)
	}
	// Use || so that results[0] is only indexed when exactly one result set exists.
	if len(results) != 1 || len(results[0]) != 1 {
		panic("expected 1 result")
	}
	// TODO: properly handle results and JSONTransform.Type with "results[0][0].Elem().CanInt()", "results[0][0].Elem().CanFloat()" etc
	// JSON numbers decode to float64, hence Float() followed by the integer conversion.
	c.SetExtension(exampleTransform.To, int64(results[0][0].Elem().Float()))
	b, err = c.MarshalJSON()
	if err != nil {
		panic(err)
	}
	f, err := types.ToInteger(c.Extensions()[exampleTransform.To])
	if err != nil {
		panic(err)
	}
	fmt.Println(f)
	// Output: 1
}

var jsonRegexp = regexp.MustCompile(`^\{\.?([^{}]+)}$|^\.?([^{}]+)$`)

// relaxedJSONPathExpression attempts to be flexible with JSONPath expressions, it accepts:
//   - metadata.name (no leading '.' or curly braces '{...}')
//   - {metadata.name} (no leading '.')
//   - .metadata.name (no curly braces '{...}')
//   - {.metadata.name} (complete expression)
//
// And transforms them all into a valid jsonpath expression:
//
//	{.metadata.name}
//
// Copied from https://github.com/kubernetes/kubectl/blob/a70106d6a8b4fc24633f7020b9fdc416648e7f22/pkg/cmd/get/customcolumn.go#L38-L67
// Copyright 2014 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
func relaxedJSONPathExpression(pathExpression string) (string, error) {
	if len(pathExpression) == 0 {
		return pathExpression, nil
	}
	submatches := jsonRegexp.FindStringSubmatch(pathExpression)
	if submatches == nil {
		return "", fmt.Errorf("unexpected path string, expected a 'name1.name2' or '.name1.name2' or '{name1.name2}' or '{.name1.name2}'")
	}
	if len(submatches) != 3 {
		return "", fmt.Errorf("unexpected submatch list: %v", submatches)
	}
	var fieldSpec string
	if len(submatches[1]) != 0 {
		fieldSpec = submatches[1]
	} else {
		fieldSpec = submatches[2]
	}
	return fmt.Sprintf("{.%s}", fieldSpec), nil
} |
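One possible way to approach the TODO above (mapping the extracted value onto the declared Type) is sketched below. This is only an illustration using the standard library: convertForType is a hypothetical helper, not part of the CloudEvents SDK or Knative, and it assumes the value came from json.Unmarshal into interface{} (so numbers arrive as float64) and that only the "integer", "boolean" and "string" type names need handling.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// convertForType is a hypothetical helper: it converts a value decoded from
// JSON into a Go value matching the requested CloudEvents type name.
// Only "integer", "boolean" and "string" are handled in this sketch.
func convertForType(v interface{}, ceType string) (interface{}, error) {
	switch ceType {
	case "integer":
		f, ok := v.(float64) // encoding/json decodes every number as float64
		if !ok {
			return nil, fmt.Errorf("expected a JSON number, got %T", v)
		}
		// CloudEvents integers are 32-bit signed; reject fractions and overflow.
		if f != math.Trunc(f) || f < math.MinInt32 || f > math.MaxInt32 {
			return nil, fmt.Errorf("%v is not a valid 32-bit integer", f)
		}
		return int32(f), nil
	case "boolean":
		b, ok := v.(bool)
		if !ok {
			return nil, fmt.Errorf("expected a JSON boolean, got %T", v)
		}
		return b, nil
	case "string":
		switch s := v.(type) {
		case string:
			return s, nil
		case float64:
			return strconv.FormatFloat(s, 'f', -1, 64), nil
		case bool:
			return strconv.FormatBool(s), nil
		}
		return nil, fmt.Errorf("cannot render %T as a string", v)
	default:
		return nil, fmt.Errorf("unsupported extension type %q", ceType)
	}
}

func main() {
	v, err := convertForType(float64(1), "integer")
	fmt.Println(v, err)

	_, err = convertForType(1.5, "integer")
	fmt.Println(err)
}
```

Returning an error instead of panicking would let the source decide whether to skip the extension or drop the event when a field has an unexpected shape.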
yes sure @pierDipi . |
@muskan2622 do you have any updates on this issue? |
Problem
Our use case involves performing different actions based on the state of different resources on our k8s cluster.
Example: send a notification for Jobs that have failed (i.e., where status.failed > 0).
We're using ApiServerSource to receive cloudevents about different types of resources on the cluster. We would like the ability to subscribe to cloudevents for resources that are in a specific state. I think this could be accomplished by adding a field to ApiServerSource that contains criteria for when a specific spec.ceOverride.extensions should be applied. This would allow us, when combined with spec.mode: Resource, to use either new trigger filters or maybe even filters on ApiServerSource itself to have our sinks only receive cloudevents where an action is required.
Persona:
Which persona is this feature for?
Exit Criteria
Events meeting the specified criteria have the appropriate custom attribute applied.
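To make the request concrete, here is a minimal sketch of what "apply an extension only when a field meets a criterion" could look like. Everything in it is an assumption for illustration: the Criterion type, its Path and GreaterThan fields, and the extensionsFor function are hypothetical and do not exist in ApiServerSource today. It uses only the Go standard library and a plain dotted path rather than JSONPath.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Criterion is a hypothetical selection rule: the numeric field at Path
// must be strictly greater than GreaterThan for the rule to match.
type Criterion struct {
	Path        string // dotted path into the resource, e.g. "status.failed"
	GreaterThan float64
}

// lookup walks a dotted path through nested JSON objects.
func lookup(obj map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = obj
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// Matches reports whether the resource satisfies the criterion.
// Missing or non-numeric fields never match.
func (c Criterion) Matches(obj map[string]interface{}) bool {
	v, ok := lookup(obj, c.Path)
	if !ok {
		return false
	}
	f, ok := v.(float64)
	return ok && f > c.GreaterThan
}

// extensionsFor returns the extensions to attach to the event: the given
// ones when the criterion matches, otherwise an empty map.
func extensionsFor(obj map[string]interface{}, c Criterion, ext map[string]string) map[string]string {
	if c.Matches(obj) {
		return ext
	}
	return map[string]string{}
}

func main() {
	var job map[string]interface{}
	raw := []byte(`{"kind":"Job","status":{"failed":1}}`)
	if err := json.Unmarshal(raw, &job); err != nil {
		panic(err)
	}
	failed := Criterion{Path: "status.failed", GreaterThan: 0}
	fmt.Println(extensionsFor(job, failed, map[string]string{"jobfailed": "true"}))
}
```

A trigger filter on the hypothetical jobfailed attribute would then deliver only the events for failed Jobs to the sink.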