Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ApiServerSource: add optional selection criteria to ceOverride.extensions based on fields in the cloudevent #8001

Open
skoved opened this issue Jun 13, 2024 · 7 comments
Assignees
Labels
kind/feature-request triage/accepted Issues which should be fixed (post-triage)

Comments

@skoved
Copy link

skoved commented Jun 13, 2024

Problem
Our use case involves performing different actions based on the state of different resources on our k8s cluster.

Example: send a notification for Jobs that have failed (ie: where status.failed > 0).

We're using ApiServerSource to receive cloudevents about different types of resources on the cluster. We would like the ability to subscribe to cloudevents for resources that are in a specific state. I think this could be accomplished by adding a field to ApiServerSource that contains criteria for when a specific spec.ceOverride.extensions should be applied. This would allow us, when combined with spec.mode: Resource, to use either new trigger filters or maybe even filters on ApiServerSource itself to have our sinks only receive cloudevents where an action is required.

Persona:
Which persona is this feature for?

  • Event Consumer
  • Event Producer

Exit Criteria
events meeting the specified criteria have the approriate custom attribute applied

@muskan2622
Copy link

/assign

@pierDipi
Copy link
Member

pierDipi commented Jun 17, 2024

This feature might be similar to or combined with #7704

if we allow you defining lightweight transformation on a JSON represented event using JSON path, I think, it would solve both:

ceOverrides:
  jsonTransform:
    - from: .data.status.failed
      to: jobfailedstatus
    - from: .data.status.conditions[...].type
      to: completed

@skoved
Copy link
Author

skoved commented Jun 17, 2024

Thanks a bunch for accepting this request. I just wanna check if my understanding is correct. These cloudevent extensions added by jsonTransform would be able to be used with the ApiServerSource filter (#7791), right?

@pierDipi
Copy link
Member

Yes, the idea is that when we get an event we would apply the ceOverrides (including jsonTransform) and then pass through the defined filters

@pierDipi
Copy link
Member

pierDipi commented Jun 17, 2024

@muskan2622 since you have expressed interest in contributing, feel free to ask any questions, there are a few parts that needs to be changed as it's a relatively large feature but I'm happy help/chat/etc.

Here is a very high level idea for getting the field out of the event and then setting it as extension:

import (
	"encoding/json"
	"fmt"
	"regexp"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	cetest "github.com/cloudevents/sdk-go/v2/test"
	"github.com/cloudevents/sdk-go/v2/types"
	"k8s.io/client-go/util/jsonpath"
)


type JSONTransform struct {
	From string // JSON path
	To   string // CE extension
	Type string // CE Extension type
}

func ExampleJsonTransform() {

	exampleTransform := JSONTransform{
		From: ".data.status.failed",
		To:   "failed",
		Type: "integer", // One of https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type-system
	}
	c := cetest.FullEvent()
	err := c.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
		"status": map[string]interface{}{
			"failed": 1,
		},
	})
	if err != nil {
		panic(err)
	}

	// Core implementation
	// 1. Parse From
	// 2. Find From in JSON-serialized event
	// 3. Set extension based on Type

	expr, err := relaxedJSONPathExpression(exampleTransform.From)
	if err != nil {
		panic(err)
	}

	jp := jsonpath.New("Parser")
	if err := jp.Parse(expr); err != nil {
		panic(err)
	}

	b, err := c.MarshalJSON()
	if err != nil {
		panic(err)
	}
	var data map[string]interface{}
	err = json.Unmarshal(b, &data)
	if err != nil {
		panic(err)
	}

	results, err := jp.FindResults(data)
	if err != nil {
		panic(err)
	}

	if len(results) != 1 && len(results[0]) != 1 {
		panic("expected 1 result")
	}

	// TODO: properly handle results and JSONTransform.Type with "results[0][0].Elem().CanInt()", "results[0][0].Elem().CanFloat()" etc

	c.SetExtension(exampleTransform.To, int64(results[0][0].Elem().Float()))

	b, err = c.MarshalJSON()
	if err != nil {
		panic(err)
	}

	f, err := types.ToInteger(c.Extensions()[exampleTransform.To])
	if err != nil {
		panic(err)
	}
	fmt.Println(f)
	// Output: 1
}

var jsonRegexp = regexp.MustCompile(`^\{\.?([^{}]+)}$|^\.?([^{}]+)$`)

// relaxedJSONPathExpression attempts to be flexible with JSONPath expressions, it accepts:
//   - metadata.name (no leading '.' or curly braces '{...}'
//   - {metadata.name} (no leading '.')
//   - .metadata.name (no curly braces '{...}')
//   - {.metadata.name} (complete expression)
//
// And transforms them all into a valid jsonpath expression:
//
//	{.metadata.name}
//
// Copied from https://github.com/kubernetes/kubectl/blob/a70106d6a8b4fc24633f7020b9fdc416648e7f22/pkg/cmd/get/customcolumn.go#L38-L67
// Copyright 2014 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
func relaxedJSONPathExpression(pathExpression string) (string, error) {
	if len(pathExpression) == 0 {
		return pathExpression, nil
	}
	submatches := jsonRegexp.FindStringSubmatch(pathExpression)
	if submatches == nil {
		return "", fmt.Errorf("unexpected path string, expected a 'name1.name2' or '.name1.name2' or '{name1.name2}' or '{.name1.name2}'")
	}
	if len(submatches) != 3 {
		return "", fmt.Errorf("unexpected submatch list: %v", submatches)
	}
	var fieldSpec string
	if len(submatches[1]) != 0 {
		fieldSpec = submatches[1]
	} else {
		fieldSpec = submatches[2]
	}
	return fmt.Sprintf("{.%s}", fieldSpec), nil
}

@muskan2622
Copy link

yes sure @pierDipi .

@pierDipi
Copy link
Member

pierDipi commented Aug 8, 2024

@muskan2622 do you have any updates on this issue?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/feature-request triage/accepted Issues which should be fixed (post-triage)
Projects
Status: Icebox / Wishlist
Development

No branches or pull requests

3 participants