Skip to content

Commit

Permalink
fix(stream): reverse and number stream methods
Browse files Browse the repository at this point in the history
  • Loading branch information
vkumbhar94 committed Mar 15, 2024
1 parent 0defea7 commit 52964cf
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 19 deletions.
67 changes: 67 additions & 0 deletions number_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package streams

import (
"sync/atomic"

"golang.org/x/exp/constraints"
)

type NumberStream[T constraints.Integer | constraints.Float] struct {
Stream[T]
}

func ToNumberStream[T constraints.Integer | constraints.Float](s *Stream[T]) *NumberStream[T] {
return &NumberStream[T]{
Stream: Stream[T]{
data: s.data,
run: s.run,
ran: atomic.Bool{},
},
}
}
func (s *NumberStream[T]) Sum() (result T) {
s.Run()
for t := range s.data {
result += t
}
return result
}
func (s *NumberStream[T]) Average() (result float64) {
s.Run()
var count int
for t := range s.data {
result += float64(t)
count++
}
if count == 0 {
return 0
}
return result / float64(count)
}
func (s *NumberStream[T]) Max() (result *T) {
s.Run()
for t := range s.data {
if result == nil || t > *result {
result = &t
}
}
return result
}

func (s *NumberStream[T]) Min() (result *T) {
s.Run()
for t := range s.data {
if result == nil || t < *result {
result = &t
}
}
return result
}

func (s *NumberStream[T]) Count() (result int64) {
s.Run()
for range s.data {
result++
}
return result
}
50 changes: 50 additions & 0 deletions number_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package streams

import "testing"

func TestNumberStream_Average(t *testing.T) {
s := New(1, 2, 3, 4, 5)
ns := ToNumberStream(s)
if ns.Average() != 3 {
t.Fail()
}
}
func TestNumberStream_AverageEmpty(t *testing.T) {
s := New([]int{}...)
ns := ToNumberStream(s)
if ns.Average() != 0 {
t.Fail()
}
}

func TestNumberStream_Count(t *testing.T) {
s := New(1, 2, 3, 4, 5)
ns := ToNumberStream(s)
if ns.Count() != 5 {
t.Fail()
}
}

func TestNumberStream_Max(t *testing.T) {
s := New(1, 2, 3, 4, 5)
ns := ToNumberStream(s)
if *ns.Max() != 5 {
t.Fail()
}
}

func TestNumberStream_Min(t *testing.T) {
s := New(1, 2, 3, 4, 5)
ns := ToNumberStream(s)
if *ns.Min() != 1 {
t.Fail()
}
}

func TestNumberStream_Sum(t *testing.T) {
s := New(1, 2, 3, 4, 5)
ns := ToNumberStream(s)
if ns.Sum() != 15 {
t.Fail()
}
}
33 changes: 14 additions & 19 deletions stream_methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package streams

import (
"sync/atomic"

"golang.org/x/exp/constraints"
)

func (s *Stream[T]) Filter(filter FilterFun[T]) *Stream[T] {
Expand Down Expand Up @@ -286,23 +284,20 @@ func (s *Stream[T]) Count() (cnt int64) {
return
}

type NumberStream[T constraints.Integer | constraints.Float] struct {
Stream[T]
}

func ToNumberStream[T constraints.Integer | constraints.Float](s *Stream[T]) *NumberStream[T] {
return &NumberStream[T]{
Stream: Stream[T]{
data: s.data,
run: s.run,
ran: atomic.Bool{},
func (s *Stream[T]) Reverse() *Stream[T] {
ch := make(chan T)
return &Stream[T]{
data: ch,
run: func() {
s.Run()
defer close(ch)
var data []T
for t := range s.data {
data = append(data, t)
}
for i := len(data) - 1; i >= 0; i-- {
ch <- data[i]
}
},
}
}
func (s *NumberStream[T]) Sum() (result T) {
s.Run()
for t := range s.data {
result += t
}
return result
}
5 changes: 5 additions & 0 deletions stream_methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,8 @@ func TestMSortedDesc(t *testing.T) {
{"abc", 2},
}))
}

func TestMethodReverse(t *testing.T) {
collected := New([]int{1, 2, 3, 4, 5}...).Reverse().Collect()
assert.Equal(t, []int{5, 4, 3, 2, 1}, collected)
}

0 comments on commit 52964cf

Please sign in to comment.