Skip to content

Commit

Permalink
Documentation and some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
saantiaguilera committed May 16, 2022
1 parent 3e11ef1 commit 8dcb186
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 9 deletions.
7 changes: 3 additions & 4 deletions concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ type (
reducer[O any] func(context.Context, O, O) (O, error)
)

// NewConcurrentStep creates a step that will run each of the units concurrently.
// The step will wait for all of the units to finish before returning.
// NewConcurrentStep creates a step that will run each of the inner steps concurrently.
// The step will wait for all of the steps to finish before returning.
//
// If one of them fails, the step will wait until everyone finishes and after that return the error.
// If more than one fails, then the error will be the one delivered by the last failure.
// If one of them fails, the step will wait until everyone finishes and after that return the first encountered error.
func NewConcurrentStep[I, O any](steps []Step[I, O], reduce reducer[O]) ConcurrentStep[I, O] {
return ConcurrentStep[I, O]{
steps: steps,
Expand Down
3 changes: 2 additions & 1 deletion conditional.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type (
// NewConditionalStep creates a conditional step that will run a statement. If it holds true, then the "true" step will be run.
// Else, the "false" step will be called.
// If a statement is nil, then it will be considered to hold false (thus, the "false" step is called)
// If one of the steps is nil and the statement is such, then nothing will happen.
// If one of the steps is nil and the statement is such, then an error will be triggered (you probably want an OptionalStep if
// one of the branches can be nil).
func NewConditionalStep[I, O any](statement conditionalStatement[I], t, f Step[I, O]) ConditionalStep[I, O] {
return ConditionalStep[I, O]{
statement: statement,
Expand Down
143 changes: 143 additions & 0 deletions optional_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package pipeline_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/saantiaguilera/go-pipeline"
)

func TestOptionalStep_GivenNilStatement_WhenRun_ThenDefaults(t *testing.T) {
run := false
step := pipeline.NewOptionalStep[any](pipeline.NewAnonymousStatement[any](nil), pipeline.NewUnitStep("", func(_ context.Context, _ any) (any, error) {
return nil, nil
}))

res, err := step.Run(context.Background(), 1)

assert.Nil(t, err)
assert.Equal(t, 1, res)
assert.False(t, run)
}

func TestOptionalStep_GivenStatementTrue_WhenRun_ThenEvaluatesStep(t *testing.T) {
run := false
ev := pipeline.NewUnitStep("", func(ctx context.Context, t any) (any, error) {
run = true
return 25, nil
})
step := pipeline.NewOptionalStep[any](pipeline.NewAnonymousStatement(func(ctx context.Context, in any) bool {
return true
}), ev)

v, err := step.Run(context.Background(), 1)

assert.Nil(t, err)
assert.Equal(t, 25, v)
assert.True(t, run)
}

func TestOptionalStep_GivenNilStatementWithDefault_WhenRun_ThenDefaults(t *testing.T) {
run := false
step := pipeline.NewOptionalStepWithDefault[any, any](pipeline.NewAnonymousStatement[any](nil), pipeline.NewUnitStep("", func(_ context.Context, _ any) (any, error) {
return nil, nil
}), func(_ context.Context, _ any) (any, error) {
return 25, nil
})

res, err := step.Run(context.Background(), 1)

assert.Nil(t, err)
assert.Equal(t, 25, res)
assert.False(t, run)
}

func TestOptionalStep_GivenStatementTrueWithDefault_WhenRun_ThenEvaluatesStep(t *testing.T) {
run := false
ev := pipeline.NewUnitStep("", func(ctx context.Context, t any) (any, error) {
run = true
return 25, nil
})
step := pipeline.NewOptionalStepWithDefault[any, any](pipeline.NewAnonymousStatement(func(ctx context.Context, in any) bool {
return true
}), ev, func(_ context.Context, _ any) (any, error) {
return 50, nil
})

v, err := step.Run(context.Background(), 1)

assert.Nil(t, err)
assert.Equal(t, 25, v)
assert.True(t, run)
}

func TestOptionalStep_GivenAGraphToDrawWithAnonymouseStatement_WhenDrawn_ThenConditionGetsEmptyName(t *testing.T) {
statement := pipeline.NewAnonymousStatement(func(ctx context.Context, in any) bool {
return true
})
mockGraph := new(mockGraph)
mockGraph.On(
"AddDecision",
"",
mock.MatchedBy(func(obj any) bool {
return true
}), mock.MatchedBy(func(obj any) bool {
return true
}),
)
trueStep := pipeline.NewUnitStep[any, any]("", nil)
step := pipeline.NewOptionalStepWithDefault[any, any](statement, trueStep, func(_ context.Context, _ any) (any, error) {
return nil, nil
})

step.Draw(mockGraph)

mockGraph.AssertExpectations(t)
}

func TestOptionalStep_GivenAGraphToDraw_WhenDrawn_ThenConditionGetsNameOfStatement(t *testing.T) {
mockGraph := new(mockGraph)
mockGraph.On(
"AddDecision",
"SomeFuncName",
mock.MatchedBy(func(obj any) bool {
return true
}), mock.MatchedBy(func(obj any) bool {
return true
}),
)
trueStep := pipeline.NewUnitStep[any, any]("", nil)
step := pipeline.NewOptionalStep[any](pipeline.NewStatement[any]("SomeFuncName", nil), trueStep)

step.Draw(mockGraph)

mockGraph.AssertExpectations(t)
}

func TestOptionalStep_GivenAGraphToDraw_WhenDrawn_ThenConditionIsAppliedWithBothBranches(t *testing.T) {
mockGraph := new(mockGraph)
mockGraph.On("AddActivity", "truestep").Once()
mockGraph.On(
"AddDecision",
mock.Anything,
mock.MatchedBy(func(obj any) bool {
return true
}), mock.MatchedBy(func(obj any) bool {
return true
}),
).Run(func(args mock.Arguments) {
args.Get(1).(pipeline.GraphDrawer)(mockGraph)
args.Get(2).(pipeline.GraphDrawer)(mockGraph)
})
trueStep := pipeline.NewUnitStep[any, any]("truestep", nil)
step := pipeline.NewOptionalStep[any](pipeline.NewAnonymousStatement(func(ctx context.Context, in any) bool {
return true
}), trueStep)

step.Draw(mockGraph)

mockGraph.AssertExpectations(t)
}
4 changes: 2 additions & 2 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ type (
}
)

// NewStatement creates statement represented by the given name, that will evaluate to the given evaluation
// NewStatement creates a statement represented by the given name, that will evaluate to the given evaluation
func NewStatement[T any](name string, eval func(context.Context, T) bool) Statement[T] {
return Statement[T]{
label: name,
fn: eval,
}
}

// NewAnonymousStatement createsn anonymous statement with no representation, that will evaluate to the given evaluation
// NewAnonymousStatement creates an anonymous statement with no representation, that will evaluate to the given evaluation
func NewAnonymousStatement[T any](eval func(context.Context, T) bool) Statement[T] {
return NewStatement("", eval)
}
Expand Down
12 changes: 10 additions & 2 deletions step.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
)

type (
// Unit of work to yield a result of type O (or an error in case of a failure) from a given input I
Unit[I, O any] func(context.Context, I) (O, error)

// Step interface for making a unit work.
// UnitStep for making a unit of work.
UnitStep[I, O any] struct {
id string
name string
fn Unit[I, O]
}

// Step is runnable element that yields a result or error from a given input
// A step can be drawn into a graph to represent it.
Step[I, O any] interface {
DrawableGraph

Expand All @@ -26,7 +28,7 @@ type (
}
)

// NewStep creates an immutable stateless unit of work based on a function that matches the Runnable contract.
// NewUnitStep creates an immutable stateless unit of work based on a Unit function
// You can use this implementation when your use-cases will be completely stateless (they don't rely on a service
// or anything that can be injected at the start and stay immutable for the lifetime of the process)
func NewUnitStep[I, O any](name string, run Unit[I, O]) UnitStep[I, O] {
Expand All @@ -37,21 +39,27 @@ func NewUnitStep[I, O any](name string, run Unit[I, O]) UnitStep[I, O] {
}
}

// ID is a unique identifier of this step. You can safely assume it wont be repeated and use it in any custom steps
// to enrich logic (eg. a circuit breaker / cache for IDs)
func (s UnitStep[I, O]) ID() string {
return s.id
}

// Name to identify a step. You shouldn't assume this name is unique per step but rather use it to understand what this is / does / represent
func (s UnitStep[I, O]) Name() string {
return s.name
}

// Run a step and yield a result of type O or an error if it failed.
// This operation is context-aware.
func (s UnitStep[I, O]) Run(ctx context.Context, in I) (O, error) {
if err := ctx.Err(); err != nil {
return *new(O), err
}
return s.fn(ctx, in)
}

// Draw this step in a graph
func (s UnitStep[I, O]) Draw(graph Graph) {
graph.AddActivity(s.Name())
}

0 comments on commit 8dcb186

Please sign in to comment.