Skip to content

Commit

Permalink
refactor!: store pointer to resulting value (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Nov 3, 2023
1 parent 5394df3 commit e4f03f3
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 111 deletions.
3 changes: 2 additions & 1 deletion examples/future/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/reugn/async"
"github.com/reugn/async/internal/util"
)

func main() {
Expand All @@ -20,7 +21,7 @@ func asyncAction() async.Future[string] {
promise := async.NewPromise[string]()
go func() {
time.Sleep(time.Second)
promise.Success("OK")
promise.Success(util.Ptr("OK"))
}()

return promise.Future()
Expand Down
84 changes: 45 additions & 39 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,45 @@ import (
)

// Future represents a value which may or may not currently be available,
// but will be available at some point, or an error if that value could not be made available.
// but will be available at some point, or an error if that value could
// not be made available.
type Future[T any] interface {

// Map creates a new Future by applying a function to the successful result of this Future.
Map(func(T) (T, error)) Future[T]
// Map creates a new Future by applying a function to the successful
// result of this Future.
Map(func(*T) (*T, error)) Future[T]

// FlatMap creates a new Future by applying a function to the successful result of
// this Future.
FlatMap(func(T) (Future[T], error)) Future[T]
// FlatMap creates a new Future by applying a function to the successful
// result of this Future.
FlatMap(func(*T) (Future[T], error)) Future[T]

// Join blocks until the Future is completed and returns either a result or an error.
Join() (T, error)
// Join blocks until the Future is completed and returns either a result
// or an error.
Join() (*T, error)

// Get blocks for at most the given time duration for this Future to complete
// and returns either a result or an error.
Get(time.Duration) (T, error)
// Get blocks for at most the given time duration for this Future to
// complete and returns either a result or an error.
Get(time.Duration) (*T, error)

// Recover handles any error that this Future might contain using a resolver function.
Recover(func() (T, error)) Future[T]
// Recover handles any error that this Future might contain using a
// resolver function.
Recover(func() (*T, error)) Future[T]

// RecoverWith handles any error that this Future might contain using another Future.
// RecoverWith handles any error that this Future might contain using
// another Future.
RecoverWith(Future[T]) Future[T]

// complete completes the Future with either a value or an error.
// Is used by Promise internally.
complete(T, error)
complete(*T, error)
}

// FutureImpl implements the Future interface.
type FutureImpl[T any] struct {
acceptOnce sync.Once
completeOnce sync.Once
done chan any
value T
value *T
err error
}

Expand All @@ -62,7 +67,8 @@ func (fut *FutureImpl[T]) accept() {
})
}

// acceptTimeout blocks once, until the Future result is available or until the timeout expires.
// acceptTimeout blocks once, until the Future result is available or until
// the timeout expires.
func (fut *FutureImpl[T]) acceptTimeout(timeout time.Duration) {
fut.acceptOnce.Do(func() {
timer := time.NewTimer(timeout)
Expand All @@ -82,40 +88,37 @@ func (fut *FutureImpl[T]) setResult(result any) {
case error:
fut.err = value
default:
fut.value = value.(T)
fut.value = value.(*T)
}
}

// Map creates a new Future by applying a function to the successful result of this Future
// and returns the result of the function as a new Future.
func (fut *FutureImpl[T]) Map(f func(T) (T, error)) Future[T] {
// Map creates a new Future by applying a function to the successful result
// of this Future and returns the result of the function as a new Future.
func (fut *FutureImpl[T]) Map(f func(*T) (*T, error)) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
var nilT T
next.complete(nilT, fut.err)
next.complete(nil, fut.err)
} else {
next.complete(f(fut.value))
}
}()
return next
}

// FlatMap creates a new Future by applying a function to the successful result of
// this Future and returns the result of the function as a new Future.
func (fut *FutureImpl[T]) FlatMap(f func(T) (Future[T], error)) Future[T] {
// FlatMap creates a new Future by applying a function to the successful result
// of this Future and returns the result of the function as a new Future.
func (fut *FutureImpl[T]) FlatMap(f func(*T) (Future[T], error)) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
var nilT T
next.complete(nilT, fut.err)
next.complete(nil, fut.err)
} else {
tfut, terr := f(fut.value)
if terr != nil {
var nilT T
next.complete(nilT, terr)
next.complete(nil, terr)
} else {
next.complete(tfut.Join())
}
Expand All @@ -124,22 +127,24 @@ func (fut *FutureImpl[T]) FlatMap(f func(T) (Future[T], error)) Future[T] {
return next
}

// Join blocks until the Future is completed and returns either a result or an error.
func (fut *FutureImpl[T]) Join() (T, error) {
// Join blocks until the Future is completed and returns either
// a result or an error.
func (fut *FutureImpl[T]) Join() (*T, error) {
fut.accept()
return fut.value, fut.err
}

// Get blocks for at most the given time duration for this Future to complete
// and returns either a result or an error.
func (fut *FutureImpl[T]) Get(timeout time.Duration) (T, error) {
// Get blocks for at most the given time duration for this Future to
// complete and returns either a result or an error.
func (fut *FutureImpl[T]) Get(timeout time.Duration) (*T, error) {
fut.acceptTimeout(timeout)
return fut.value, fut.err
}

// Recover handles any error that this Future might contain using a given resolver function.
// Recover handles any error that this Future might contain using
// a given resolver function.
// Returns the result as a new Future.
func (fut *FutureImpl[T]) Recover(f func() (T, error)) Future[T] {
func (fut *FutureImpl[T]) Recover(f func() (*T, error)) Future[T] {
next := NewFuture[T]()
go func() {
fut.accept()
Expand All @@ -152,7 +157,8 @@ func (fut *FutureImpl[T]) Recover(f func() (T, error)) Future[T] {
return next
}

// RecoverWith handles any error that this Future might contain using another Future.
// RecoverWith handles any error that this Future might contain using
// another Future.
// Returns the result as a new Future.
func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
next := NewFuture[T]()
Expand All @@ -168,7 +174,7 @@ func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
}

// complete completes the Future with either a value or an error.
func (fut *FutureImpl[T]) complete(value T, err error) {
func (fut *FutureImpl[T]) complete(value *T, err error) {
fut.completeOnce.Do(func() {
go func() {
if err != nil {
Expand Down
113 changes: 74 additions & 39 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,55 @@ import (
"time"

"github.com/reugn/async/internal/assert"
"github.com/reugn/async/internal/util"
)

func TestFuture(t *testing.T) {
p := NewPromise[bool]()
go func() {
time.Sleep(time.Millisecond * 100)
p.Success(true)
time.Sleep(100 * time.Millisecond)
p.Success(util.Ptr(true))
}()
res, err := p.Future().Join()

assert.Equal(t, res, true)
assert.Equal(t, err, nil)
assert.Equal(t, true, *res)
assert.Equal(t, nil, err)
}

func TestFutureUtils(t *testing.T) {
p1 := NewPromise[int]()
p2 := NewPromise[int]()
p3 := NewPromise[int]()

res1 := util.Ptr(1)
res2 := util.Ptr(2)
err3 := errors.New("error")

go func() {
time.Sleep(time.Millisecond * 100)
p1.Success(1)
time.Sleep(time.Millisecond * 200)
p2.Success(2)
time.Sleep(time.Millisecond * 300)
p3.Success(3)
time.Sleep(100 * time.Millisecond)
p1.Success(res1)
time.Sleep(200 * time.Millisecond)
p2.Success(res2)
time.Sleep(300 * time.Millisecond)
p3.Failure(err3)
}()
arr := []Future[int]{p1.Future(), p2.Future(), p3.Future()}
res := []any{1, 2, 3}
res := []any{res1, res2, err3}
futRes, _ := FutureSeq(arr).Join()

assert.Equal(t, res, futRes)
assert.Equal(t, res, *futRes)
}

func TestFutureFirstCompleted(t *testing.T) {
p := NewPromise[bool]()
go func() {
time.Sleep(time.Millisecond * 1000)
p.Success(true)
time.Sleep(100 * time.Millisecond)
p.Success(util.Ptr(true))
}()
timeout := FutureTimer[bool](time.Millisecond * 100)
timeout := FutureTimer[bool](10 * time.Millisecond)
futRes, futErr := FutureFirstCompletedOf(p.Future(), timeout).Join()

assert.Equal(t, false, futRes)
assert.Equal(t, nil, futRes)
if futErr == nil {
t.Fatalf("futErr is nil")
}
Expand All @@ -60,50 +66,79 @@ func TestFutureFirstCompleted(t *testing.T) {
func TestFutureTransform(t *testing.T) {
p1 := NewPromise[int]()
go func() {
time.Sleep(time.Millisecond * 100)
p1.Success(1)
time.Sleep(100 * time.Millisecond)
p1.Success(util.Ptr(1))
}()
future := p1.Future().Map(func(v int) (int, error) {
return v + 1, nil
}).FlatMap(func(v int) (Future[int], error) {
nv := v + 1
future := p1.Future().Map(func(v *int) (*int, error) {
inc := *v + 1
return &inc, nil
}).FlatMap(func(v *int) (Future[int], error) {
inc := *v + 1
p2 := NewPromise[int]()
p2.Success(nv)
p2.Success(&inc)
return p2.Future(), nil
}).Recover(func() (int, error) {
return 5, nil
}).Recover(func() (*int, error) {
return util.Ptr(5), nil
})

res, _ := future.Get(time.Second * 5)
assert.Equal(t, 3, res)
assert.Equal(t, 3, *res)

res, _ = future.Join()
assert.Equal(t, 3, res)
assert.Equal(t, 3, *res)
}

func TestFutureRecover(t *testing.T) {
p1 := NewPromise[int]()
p2 := NewPromise[int]()
go func() {
time.Sleep(10 * time.Millisecond)
p1.Success(util.Ptr(1))
time.Sleep(10 * time.Millisecond)
p2.Failure(errors.New("recover Future failure"))
}()
future := p1.Future().Map(func(v *int) (*int, error) {
return nil, errors.New("map error")
}).FlatMap(func(v *int) (Future[int], error) {
p2 := NewPromise[int]()
p2.Failure(errors.New("flatMap Future failure"))
return p2.Future(), nil
}).FlatMap(func(v *int) (Future[int], error) {
return nil, errors.New("flatMap error")
}).Recover(func() (*int, error) {
return nil, errors.New("recover error")
}).RecoverWith(p2.Future()).Recover(func() (*int, error) {
return util.Ptr(2), nil
})

res, err := future.Join()
assert.Equal(t, 2, *res)
assert.Equal(t, nil, err)
}

func TestFutureFailure(t *testing.T) {
p1 := NewPromise[int]()
p2 := NewPromise[int]()
go func() {
time.Sleep(time.Millisecond * 100)
time.Sleep(10 * time.Millisecond)
p1.Failure(errors.New("Future error"))
time.Sleep(time.Millisecond * 200)
p2.Success(2)
time.Sleep(20 * time.Millisecond)
p2.Success(util.Ptr(2))
}()
res, _ := p1.Future().RecoverWith(p2.Future()).Join()

assert.Equal(t, 2, res)
assert.Equal(t, 2, *res)
}

func TestFutureTimeout(t *testing.T) {
p := NewPromise[bool]()
go func() {
time.Sleep(time.Millisecond * 200)
p.Success(true)
time.Sleep(100 * time.Millisecond)
p.Success(util.Ptr(true))
}()
future := p.Future()

_, err := future.Get(time.Millisecond * 50)
_, err := future.Get(10 * time.Millisecond)
assert.ErrorContains(t, err, "timeout")

_, err = future.Join()
Expand All @@ -121,21 +156,21 @@ func TestFutureGoroutineLeak(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 100)
promise.Success("OK")
time.Sleep(100 * time.Millisecond)
promise.Success(util.Ptr("OK"))
}()
wg.Add(1)
go func() {
defer wg.Done()
fut := promise.Future()
_, _ = fut.Get(time.Millisecond * 10)
time.Sleep(time.Millisecond * 100)
_, _ = fut.Get(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
_, _ = fut.Join()
}()
}

wg.Wait()
time.Sleep(time.Millisecond * 10)
time.Sleep(10 * time.Millisecond)
numGoroutine := runtime.NumGoroutine()
fmt.Println(numGoroutine)
if numGoroutine > numFuture {
Expand Down
Loading

0 comments on commit e4f03f3

Please sign in to comment.