Skip to content

Commit

Permalink
Per-step errors (#33)
Browse files Browse the repository at this point in the history
* Spec conformance: app ID prefixes, data wrapping (backcompat)

* Add per-step error handling

- Add support for capturing return data alongside error
- Handle NoRetry and RetryAt within step errors
  • Loading branch information
tonyhb authored Jan 25, 2024
1 parent 4b598cb commit 8f90d72
Show file tree
Hide file tree
Showing 653 changed files with 270,288 additions and 681 deletions.
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package inngestgo

import "github.com/inngest/inngestgo/errors"

type StepError = errors.StepError

// Re-export internal errors for users
var NoRetryError = errors.NoRetryError
var RetryAtError = errors.RetryAtError
28 changes: 28 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
package errors

import (
"encoding/json"
"errors"
"time"
)

// StepError is an error returned when a step permanently fails
type StepError struct {
Name string `json:"name"`
Message string `json:"message"`
// Data is the data from state.UserError, used to store
// the resulting value when step errors occur with an additional
// response type.
Data json.RawMessage `json:"data,omitempty"`
}

func (e StepError) Error() string {
return e.Message
}

func (e StepError) Is(err error) bool {
switch err.(type) {
case *StepError, StepError:
return true
default:
return false
}
}

func IsStepError(err error) bool {
return errors.Is(err, StepError{})
}

// NoRetryError wraps an error, preventing retries in the SDK. This permanently
// fails a step and function.
func NoRetryError(err error) error {
Expand Down
22 changes: 18 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ replace github.com/tencentcloud/tencentcloud-sdk-go v3.0.82+incompatible => gith
require (
github.com/gosimple/slug v1.12.0
github.com/gowebpki/jcs v1.0.0
github.com/inngest/inngest v0.23.2-0.20240108184544-5424b5936b1a
github.com/inngest/inngest v0.24.4-0.20240125144251-d920c39952bc
github.com/stretchr/testify v1.8.4
github.com/xhit/go-str2duration/v2 v2.1.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
Expand All @@ -20,30 +20,44 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/google/cel-go v0.18.2 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-hclog v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/hcl/v2 v2.10.0 // indirect
github.com/hashicorp/terraform v0.15.3 // indirect
github.com/inngest/expr v0.0.0-20240106161226-5bf5dca4ded5 // indirect
github.com/inngest/expr v0.0.0-20240109020554-19e459e1e8d4 // indirect
github.com/karlseguin/ccache/v2 v2.0.8 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/ohler55/ojg v1.21.0 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/plar/go-adaptive-radix-tree v1.0.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rs/zerolog v1.26.1 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.10.1 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/zclconf/go-cty v1.8.3 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
58 changes: 51 additions & 7 deletions go.sum

Large diffs are not rendered by default.

50 changes: 38 additions & 12 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/execution/state"
"github.com/inngest/inngest/pkg/inngest"
"github.com/inngest/inngest/pkg/publicerr"
Expand Down Expand Up @@ -268,7 +269,7 @@ func (h *handler) register(w http.ResponseWriter, r *http.Request) error {

f := sdk.SDKFunction{
Name: fn.Name(),
Slug: fn.Slug(),
Slug: h.appName + "-" + fn.Slug(),
Idempotency: c.Idempotency,
Priority: fn.Config().Priority,
Triggers: []inngest.Trigger{{}},
Expand Down Expand Up @@ -480,14 +481,40 @@ func (h *handler) invoke(w http.ResponseWriter, r *http.Request) error {
resp, ops, err := invoke(r.Context(), fn, request)
streamCancel()

// NOTE: When triggering step errors, we should have an OpcodeStepError
// within ops alongside an error. We can safely ignore that error, as its
// onyl used for checking wither the step used a NoRetryError or RetryAtError
//
// For that reason, we check those values first.
noRetry := errors.IsNoRetryError(err)
retryAt := errors.GetRetryAtTime(err)
if len(ops) == 1 && ops[0].Op == enums.OpcodeStepError {
// Now we've handled error types we can ignore step
// errors safely.
err = nil
}

// Now that we've handled the OpcodeStepError, if we *still* ahve
// a StepError kind returned from a function we must have an unhandled
// step error. This is a NonRetryableError, as the most likely code is:
//
// _, err := step.Run(ctx, func() (any, error) { return fmt.Errorf("") })
// if err != nil {
// return err
// }
if errors.IsStepError(err) {
err = fmt.Errorf("Unhandled step error: %s", err)
noRetry = true
}

if h.UseStreaming {
if err != nil {
// TODO: Add retry-at.
return json.NewEncoder(w).Encode(StreamResponse{
StatusCode: 500,
Body: fmt.Sprintf("error calling function: %s", err.Error()),
NoRetry: errors.IsNoRetryError(err),
RetryAt: errors.GetRetryAtTime(err),
NoRetry: noRetry,
RetryAt: retryAt,
})
}
if len(ops) > 0 {
Expand All @@ -502,17 +529,16 @@ func (h *handler) invoke(w http.ResponseWriter, r *http.Request) error {
})
}

// These may be added even for 2xx codes with step errors.
if noRetry {
w.Header().Add(HeaderKeyNoRetry, "true")
}
if retryAt != nil {
w.Header().Add(HeaderKeyRetryAfter, retryAt.Format(time.RFC3339))
}

if err != nil {
l.Error("error calling function", "error", err)

if errors.IsNoRetryError(err) {
w.Header().Add(HeaderKeyNoRetry, "true")
}

if at := errors.GetRetryAtTime(err); at != nil {
w.Header().Add(HeaderKeyRetryAfter, at.Format(time.RFC3339))
}

return publicerr.Error{
Message: fmt.Sprintf("error calling function: %s", err.Error()),
Status: 500,
Expand Down
13 changes: 8 additions & 5 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ func TestSteps(t *testing.T) {
EventTrigger("test/event.a", nil),
func(ctx context.Context, input Input[EventA]) (any, error) {
atomic.AddInt32(&fnCt, 1)
stepA := step.Run(ctx, "First step", func(ctx context.Context) (map[string]any, error) {
stepA, _ := step.Run(ctx, "First step", func(ctx context.Context) (map[string]any, error) {
atomic.AddInt32(&aCt, 1)
return map[string]any{
"test": true,
"foo": input.Event.Data.Foo,
}, nil
})
stepB := step.Run(ctx, "Second step", func(ctx context.Context) (map[string]any, error) {
stepB, _ := step.Run(ctx, "Second step", func(ctx context.Context) (map[string]any, error) {
atomic.AddInt32(&bCt, 1)
return map[string]any{
"b": "lol",
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestSteps(t *testing.T) {
require.Len(t, opcodes, 1)
opcode = opcodes[0]

require.Equal(t, enums.OpcodeStep, opcode.Op, "tools.Run didn't return the correct opcode")
require.Equal(t, enums.OpcodeStepRun, opcode.Op, "tools.Run didn't return the correct opcode")
require.Equal(t, "First step", opcode.Name, "tools.Run didn't return the correct opcode")

require.EqualValues(t, 1, fnCt)
Expand All @@ -405,7 +405,9 @@ func TestSteps(t *testing.T) {
stepA = map[string]any{}
err = json.Unmarshal(opcode.Data, &stepA)
require.NoError(t, err)
require.EqualValues(t, map[string]any{"test": true, "foo": "potato"}, stepA)
require.EqualValues(t, map[string]any{
"test": true, "foo": "potato",
}, stepA)
})

t.Run("It invokes the second step if the first step's data is passed in", func(t *testing.T) {
Expand All @@ -424,7 +426,7 @@ func TestSteps(t *testing.T) {
require.Len(t, opcodes, 1)
opcode = opcodes[0]

require.Equal(t, enums.OpcodeStep, opcode.Op, "tools.Run didn't return the correct opcode")
require.Equal(t, enums.OpcodeStepRun, opcode.Op, "tools.Run didn't return the correct opcode")
require.Equal(t, "Second step", opcode.Name, "tools.Run didn't return the correct opcode")

require.EqualValues(t, 2, fnCt)
Expand All @@ -436,6 +438,7 @@ func TestSteps(t *testing.T) {
err = json.Unmarshal(opcode.Data, &stepB)
require.NoError(t, err)
require.EqualValues(t, map[string]any{
// data is wrapped in an object to conform to the spec.
"b": "lol",
"a": stepA,
}, stepB)
Expand Down
1 change: 1 addition & 0 deletions step/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func Invoke[T any](ctx context.Context, id string, opts InvokeOpts) (T, error) {
mgr.SetErr(fmt.Errorf("error parsing invoke value for '%s'; unknown shape", opts.FunctionId))
panic(ControlHijack{})
}

mgr.AppendOp(state.GeneratorOpcode{
ID: op.MustHash(),
Op: op.Op,
Expand Down
66 changes: 60 additions & 6 deletions step/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/execution/state"
"github.com/inngest/inngestgo/errors"
)

type RunOpts struct {
Expand All @@ -17,6 +18,11 @@ type RunOpts struct {
Name string
}

type response struct {
Data json.RawMessage `json:"data"`
Error json.RawMessage `json:"error"`
}

// StepRun runs any code reliably, with retries, returning the resulting data. If this
// fails the function stops.
//
Expand All @@ -25,21 +31,51 @@ func Run[T any](
ctx context.Context,
id string,
f func(ctx context.Context) (T, error),
) T {
) (T, error) {
mgr := preflight(ctx)
op := mgr.NewOp(enums.OpcodeStep, id, nil)

if val, ok := mgr.Step(op); ok {
// This step has already ran as we have state for it.
// Unmarshal the JSON into type T
// Create a new empty type T in v
ft := reflect.TypeOf(f)
v := reflect.New(ft.Out(0)).Interface()

// This step has already ran as we have state for it. Unmarshal the JSON into type T
unwrapped := response{}
if err := json.Unmarshal(val, &unwrapped); err == nil {
// Check for step errors first.
if len(unwrapped.Error) > 0 {
// TODO: Unmrashal the error into a new StepError struct
err := errors.StepError{}
if err := json.Unmarshal(unwrapped.Error, &err); err != nil {
mgr.SetErr(fmt.Errorf("error unmarshalling error for step '%s': %w", id, err))
panic(ControlHijack{})
}

// See if we have any data for multiple returns in the error type.
if err := json.Unmarshal(err.Data, v); err != nil {
mgr.SetErr(fmt.Errorf("error unmarshalling state for step '%s': %w", id, err))
panic(ControlHijack{})
}

val, _ := reflect.ValueOf(v).Elem().Interface().(T)
return val, err
}
// If there's an error, assume that val is already of type T without wrapping
// in the 'data' object as per the SDK spec. Here, if this succeeds we can be
// sure that we're wrapping the data in a compliant way.
if len(unwrapped.Data) > 0 {
val = unwrapped.Data
}
}

// Grab the data as the step type.
if err := json.Unmarshal(val, v); err != nil {
mgr.SetErr(fmt.Errorf("error unmarshalling state for step '%s': %w", id, err))
panic(ControlHijack{})
}
val, _ := reflect.ValueOf(v).Elem().Interface().(T)
return val
return val, nil
}

// We're calling a function, so always cancel the context afterwards so that no
Expand All @@ -48,6 +84,25 @@ func Run[T any](

result, err := f(ctx)
if err != nil {
// If tihs is a StepFailure already, fail fast.
if errors.IsStepError(err) {
mgr.SetErr(fmt.Errorf("Unhandled step error: %s", err))
panic(ControlHijack{})
}

result, _ := json.Marshal(result)

// Implement per-step errors.
mgr.AppendOp(state.GeneratorOpcode{
ID: op.MustHash(),
Op: enums.OpcodeStepError,
Name: id,
Error: &state.UserError{
Name: "Step failed",
Message: err.Error(),
Data: result,
},
})
mgr.SetErr(err)
panic(ControlHijack{})
}
Expand All @@ -56,10 +111,9 @@ func Run[T any](
if err != nil {
mgr.SetErr(fmt.Errorf("unable to marshal run respone for '%s': %w", id, err))
}

mgr.AppendOp(state.GeneratorOpcode{
ID: op.MustHash(),
Op: enums.OpcodeStep,
Op: enums.OpcodeStepRun,
Name: id,
Data: byt,
})
Expand Down
Loading

0 comments on commit 8f90d72

Please sign in to comment.