Skip to content

Commit

Permalink
Improve testing of concurrent systems
Browse files Browse the repository at this point in the history
  • Loading branch information
iwahbe committed Mar 28, 2024
1 parent 997d21b commit 6d6b8aa
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 51 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ examples/**/pulumi-resource-*

/.vscode

**/testdata/rapid/**
**/testdata/rapid/**

go.work
go.work.sum
15 changes: 0 additions & 15 deletions middleware/cancel/cancel_test.go

This file was deleted.

175 changes: 140 additions & 35 deletions tests/cancel_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, Pulumi Corporation.
// Copyright 2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,64 +15,169 @@
package tests

import (
"context"
"sync"
"testing"

"github.com/blang/semver"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

p "github.com/pulumi/pulumi-go-provider"
"github.com/pulumi/pulumi-go-provider/integration"
"github.com/pulumi/pulumi-go-provider/middleware/cancel"
"github.com/stretchr/testify/assert"
)

func TestGlobalCancel(t *testing.T) {
t.Parallel()
wg := new(sync.WaitGroup)
wg.Add(4)
s := integration.NewServer("cancel", semver.MustParse("1.2.3"),

const testSize = 5000
require.True(t, testSize%2 == 0)

noWaitCounter := new(sync.WaitGroup)
noWaitCounter.Add(testSize / 2)

provider := integration.NewServer("cancel", semver.MustParse("1.2.3"),
cancel.Wrap(p.Provider{
Create: func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) {
select {
case <-ctx.Done():
wg.Done()
return p.CreateResponse{
ID: "cancled",
Properties: req.Properties,
}, nil

// If a request is set to wait, then it pauses until it is canceled.
if req.Properties["wait"].BoolValue() {
<-ctx.Done()

return p.CreateResponse{}, ctx.Err()
}

noWaitCounter.Done()

return p.CreateResponse{}, nil
},
}))
go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }()
go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }()
go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }()
assert.NoError(t, s.Cancel())
go func() { _, err := s.Create(p.CreateRequest{}); assert.NoError(t, err) }()
wg.Wait()

finished := new(sync.WaitGroup)
finished.Add(testSize + (testSize / 2))

go func() {
// Make sure that all requests that should not be canceled have already gone through.
noWaitCounter.Wait()

// Now cancel remaining requests.
err := provider.Cancel()
assert.NoError(t, err)

// As a sanity check, send another testSize/2 requests. Check that they are immediately
// canceled.
for i := 0; i < testSize/2; i++ {
go func() {
_, err := provider.Create(p.CreateRequest{
Properties: resource.PropertyMap{
"wait": resource.NewProperty(true),
},
})
assert.ErrorIs(t, err, context.Canceled)
finished.Done()
}()
}
}()

// create testSize requests.
//
// Half are configured to wait, while the other half are set to return immediately.
for i := 0; i < testSize; i++ {
shouldWait := i%2 == 0
go func() {
_, err := provider.Create(p.CreateRequest{
Properties: resource.PropertyMap{
"wait": resource.NewProperty(shouldWait),
},
})
if shouldWait {
assert.ErrorIs(t, err, context.Canceled)
} else {
assert.NoError(t, err)
}
finished.Done()
}()
}
finished.Wait()
}

func TestTimeoutApplication(t *testing.T) {
// TestCancelCreate checks that a Cancel that occurs during a concurrent operation
// (Create) cancels the context associated with the operation.
func TestCancelCreate(t *testing.T) {
t.Parallel()
wg := new(sync.WaitGroup)
wg.Add(1)

createCheck := make(chan bool)

provider := integration.NewServer("cancel", semver.MustParse("1.2.3"), cancel.Wrap(p.Provider{
Create: func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) {
// The context should not be canceled yes
assert.NoError(t, ctx.Err())
createCheck <- true
<-createCheck

return p.CreateResponse{}, ctx.Err()
},
}))

go func() {
<-createCheck
assert.NoError(t, provider.Cancel())
createCheck <- true
}()

_, err := provider.Create(p.CreateRequest{})
assert.ErrorIs(t, err, context.Canceled)
}

// TestCancelTimeout checks that timeouts are applied.
//
// Note: if the timeout is not applied, the test will hang instead of fail.
func TestCancelTimeout(t *testing.T) {
t.Parallel()

checkDeadline := func(ctx p.Context) error {
_, ok := ctx.Deadline()
assert.True(t, ok)
<-ctx.Done()
return ctx.Err()
}

s := integration.NewServer("cancel", semver.MustParse("1.2.3"),
cancel.Wrap(p.Provider{
Create: func(ctx p.Context, req p.CreateRequest) (p.CreateResponse, error) {
select {
case <-ctx.Done():
wg.Done()
return p.CreateResponse{
ID: "cancled",
Properties: req.Properties,
}, nil
}
Create: func(ctx p.Context, _ p.CreateRequest) (p.CreateResponse, error) {
return p.CreateResponse{}, checkDeadline(ctx)
},
Update: func(ctx p.Context, _ p.UpdateRequest) (p.UpdateResponse, error) {
return p.UpdateResponse{}, checkDeadline(ctx)
},
Delete: func(ctx p.Context, _ p.DeleteRequest) error {
return checkDeadline(ctx)
},
}))

go func() {
t.Run("create", func(t *testing.T) {
t.Parallel()
_, err := s.Create(p.CreateRequest{
Timeout: 0.5,
Timeout: 0.1,
})
assert.NoError(t, err)
}()
wg.Wait()
assert.ErrorIs(t, err, context.DeadlineExceeded)
})

t.Run("update", func(t *testing.T) {
t.Parallel()
_, err := s.Update(p.UpdateRequest{
Timeout: 0.1,
})
assert.ErrorIs(t, err, context.DeadlineExceeded)
})

t.Run("delete", func(t *testing.T) {
t.Parallel()
err := s.Delete(p.DeleteRequest{
Timeout: 0.1,
})
assert.ErrorIs(t, err, context.DeadlineExceeded)
})
}

0 comments on commit 6d6b8aa

Please sign in to comment.