Skip to content

Commit

Permalink
feat: implement concurrent map (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Sep 25, 2023
1 parent 22d2d3b commit 443d886
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Async is a synchronization and asynchronous computation package for Go.

## Overview
* **ConcurrentMap** - Implements the generic `async.Map` interface in a thread-safe manner by delegating load/store operations to the underlying `sync.Map`.
* **Future** - A placeholder object for a value that may not yet exist.
* **Promise** - While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future.
* **Task** - A data type for controlling possibly lazy and asynchronous computations.
Expand Down
150 changes: 150 additions & 0 deletions concurrent_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package async

import (
"sync"
"sync/atomic"
"time"
)

// ConcurrentMap implements the async.Map interface in a thread-safe manner
// by delegating load/store operations to the underlying sync.Map.
//
// The sync.Map type is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only grow,
// or (2) when multiple goroutines read, write, and overwrite entries for disjoint
// sets of keys. In these two cases, use of a sync.Map may significantly reduce lock
// contention compared to a Go map paired with a separate sync.Mutex or sync.RWMutex.
type ConcurrentMap[K comparable, V any] struct {
m atomic.Value
size int64
clearing int32 // TODO: use atomic.Bool when upgrading to/past go1.19
}

var _ Map[int, any] = (*ConcurrentMap[int, any])(nil)

// NewConcurrentMap returns a new ConcurrentMap instance.
func NewConcurrentMap[K comparable, V any]() *ConcurrentMap[K, V] {
var underlying atomic.Value
underlying.Store(&sync.Map{})
return &ConcurrentMap[K, V]{
m: underlying,
}
}

// Clear removes all of the mappings from this map.
func (cm *ConcurrentMap[K, V]) Clear() {
atomic.StoreInt32(&cm.clearing, 1)
defer atomic.StoreInt32(&cm.clearing, 0)
_ = cm.m.Swap(&sync.Map{})
atomic.StoreInt64(&cm.size, 0)
}

// ComputeIfAbsent attempts to compute a value using the given mapping
// function and enters it into the map, if the specified key is not
// already associated with a value.
func (cm *ConcurrentMap[K, V]) ComputeIfAbsent(key K, mappingFunction func(K) *V) *V {
value := cm.Get(key)
if value == nil {
computed, loaded := cm.smap().LoadOrStore(key, mappingFunction(key))
if !loaded {
atomic.AddInt64(&cm.size, 1)
}
return computed.(*V)
}
return value
}

// ContainsKey returns true if this map contains a mapping for the
// specified key.
func (cm *ConcurrentMap[K, V]) ContainsKey(key K) bool {
return cm.Get(key) != nil
}

// Get returns the value to which the specified key is mapped, or nil if
// this map contains no mapping for the key.
func (cm *ConcurrentMap[K, V]) Get(key K) *V {
value, ok := cm.smap().Load(key)
if !ok {
return nil
}
return value.(*V)
}

// GetOrDefault returns the value to which the specified key is mapped, or
// defaultValue if this map contains no mapping for the key.
func (cm *ConcurrentMap[K, V]) GetOrDefault(key K, defaultValue *V) *V {
value, ok := cm.smap().Load(key)
if !ok {
return defaultValue
}
return value.(*V)
}

// IsEmpty returns true if this map contains no key-value mappings.
func (cm *ConcurrentMap[K, V]) IsEmpty() bool {
return cm.Size() == 0
}

// KeySet returns a slice of the keys contained in this map.
func (cm *ConcurrentMap[K, V]) KeySet() []K {
keys := make([]K, 0, cm.Size())
rangeKeysFunc := func(key any, _ any) bool {
keys = append(keys, key.(K))
return true
}
cm.smap().Range(rangeKeysFunc)
return keys
}

// Put associates the specified value with the specified key in this map.
func (cm *ConcurrentMap[K, V]) Put(key K, value *V) {
// TODO: use sync.Map.Swap when upgrading to/past go1.20
_, loaded := cm.smap().LoadOrStore(key, value)
if !loaded {
atomic.AddInt64(&cm.size, 1)
} else {
cm.smap().Store(key, value)
}
}

// Remove removes the mapping for a key from this map if it is present,
// returning the previous value or nil if none.
func (cm *ConcurrentMap[K, V]) Remove(key K) *V {
value, loaded := cm.smap().LoadAndDelete(key)
if !loaded {
return nil
}
atomic.AddInt64(&cm.size, -1)
return value.(*V)
}

// Size returns the number of key-value mappings in this map.
func (cm *ConcurrentMap[K, V]) Size() int {
size := atomic.LoadInt64(&cm.size)
if size > 0 {
return int(size)
}
return 0
}

// Values returns a slice of the values contained in this map.
func (cm *ConcurrentMap[K, V]) Values() []*V {
values := make([]*V, 0, cm.Size())
rangeValuesFunc := func(_ any, value any) bool {
values = append(values, value.(*V))
return true
}
cm.smap().Range(rangeValuesFunc)
return values
}

func (cm *ConcurrentMap[K, V]) smap() *sync.Map {
for {
c := atomic.LoadInt32(&cm.clearing)
if c == 0 {
break
}
time.Sleep(time.Nanosecond)
}
return cm.m.Load().(*sync.Map)
}
168 changes: 168 additions & 0 deletions concurrent_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package async

import (
"runtime"
"strconv"
"sync"
"testing"
"time"

"github.com/reugn/async/internal"
)

func TestClear(t *testing.T) {
m := prepareConcurrentMap()
m.Clear()
internal.AssertEqual(t, m.Size(), 0)
m.Put(1, ptr("a"))
internal.AssertEqual(t, m.Size(), 1)
}

func TestComputeIfAbsent(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(
t,
m.ComputeIfAbsent(4, func(_ int) *string { return ptr("d") }),
ptr("d"),
)
internal.AssertEqual(t, m.Size(), 4)
internal.AssertEqual(
t,
m.ComputeIfAbsent(4, func(_ int) *string { return ptr("e") }),
ptr("d"),
)
internal.AssertEqual(t, m.Size(), 4)
}

func TestContainsKey(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.ContainsKey(3), true)
internal.AssertEqual(t, m.ContainsKey(4), false)
}

func TestGet(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.Get(1), ptr("a"))
internal.AssertEqual(t, m.Get(4), nil)
}

func TestGetOrDefault(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.GetOrDefault(1, ptr("e")), ptr("a"))
internal.AssertEqual(t, m.GetOrDefault(5, ptr("e")), ptr("e"))
}

func TestIsEmpty(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.IsEmpty(), false)
m.Clear()
internal.AssertEqual(t, m.IsEmpty(), true)
}

func TestKeySet(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertElementsMatch(t, m.KeySet(), []int{1, 2, 3})
m.Put(4, ptr("d"))
internal.AssertElementsMatch(t, m.KeySet(), []int{1, 2, 3, 4})
}

func TestPut(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.Size(), 3)
m.Put(4, ptr("d"))
internal.AssertEqual(t, m.Size(), 4)
internal.AssertEqual(t, m.Get(4), ptr("d"))
m.Put(4, ptr("e"))
internal.AssertEqual(t, m.Size(), 4)
internal.AssertEqual(t, m.Get(4), ptr("e"))
}

func TestRemove(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.Remove(3), ptr("c"))
internal.AssertEqual(t, m.Size(), 2)
internal.AssertEqual(t, m.Remove(5), nil)
internal.AssertEqual(t, m.Size(), 2)
}

func TestSize(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertEqual(t, m.Size(), 3)
}

func TestValues(t *testing.T) {
m := prepareConcurrentMap()
internal.AssertElementsMatch(
t,
m.Values(),
[]*string{ptr("a"), ptr("b"), ptr("c")},
)
m.Put(4, ptr("d"))
internal.AssertElementsMatch(
t,
m.Values(),
[]*string{ptr("a"), ptr("b"), ptr("c"), ptr("d")},
)
}

func TestMemoryLeaks(t *testing.T) {
var statsBefore runtime.MemStats
runtime.ReadMemStats(&statsBefore)

m := NewConcurrentMap[int, string]()

var wg sync.WaitGroup
wg.Add(4)
go func() {
defer wg.Done()
for i := 0; i < 1000000; i++ {
m.Put(i, ptr(strconv.Itoa(i)))
time.Sleep(time.Nanosecond)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
m.Clear()
time.Sleep(time.Millisecond)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
m.KeySet()
time.Sleep(time.Millisecond * 10)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 80; i++ {
m.Values()
time.Sleep(time.Millisecond * 12)
}
}()

wg.Wait()
m.Clear()
runtime.GC()

var statsAfter runtime.MemStats
runtime.ReadMemStats(&statsAfter)

internal.AssertEqual(t, m.IsEmpty(), true)
if statsAfter.HeapObjects > statsBefore.HeapObjects+50 {
t.Error("HeapObjects leak")
}
}

func prepareConcurrentMap() *ConcurrentMap[int, string] {
syncMap := NewConcurrentMap[int, string]()
syncMap.Put(1, ptr("a"))
syncMap.Put(2, ptr("b"))
syncMap.Put(3, ptr("c"))
return syncMap
}

func ptr(s string) *string {
return &s
}
19 changes: 19 additions & 0 deletions internal/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,25 @@ func AssertEqual[T any](t *testing.T, a, b T) {
}
}

// AssertElementsMatch checks whether the given slices contain the same elements.
func AssertElementsMatch[T any](t *testing.T, a, b []T) {
if len(a) == len(b) {
count := 0
for _, va := range a {
for _, vb := range b {
if reflect.DeepEqual(va, vb) {
count++
break
}
}
}
if count == len(a) {
return
}
}
t.Fatalf("Slice elements are not equal: %v != %v", a, b)
}

// AssertErrorContains checks whether the given error contains the specified string.
func AssertErrorContains(t *testing.T, err error, str string) {
if err == nil {
Expand Down
44 changes: 44 additions & 0 deletions map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package async

// A Map is an object that maps keys to values.
type Map[K comparable, V any] interface {

// Clear removes all of the mappings from this map.
Clear()

// ComputeIfAbsent attempts to compute a value using the given mapping
// function and enters it into the map, if the specified key is not
// already associated with a value.
ComputeIfAbsent(key K, mappingFunction func(K) *V) *V

// ContainsKey returns true if this map contains a mapping for the
// specified key.
ContainsKey(key K) bool

// Get returns the value to which the specified key is mapped, or nil if
// this map contains no mapping for the key.
Get(key K) *V

// GetOrDefault returns the value to which the specified key is mapped, or
// defaultValue if this map contains no mapping for the key.
GetOrDefault(key K, defaultValue *V) *V

// IsEmpty returns true if this map contains no key-value mappings.
IsEmpty() bool

// KeySet returns a slice of the keys contained in this map.
KeySet() []K

// Put associates the specified value with the specified key in this map.
Put(key K, value *V)

// Remove removes the mapping for a key from this map if it is present,
// returning the previous value or nil if none.
Remove(key K) *V

// Size returns the number of key-value mappings in this map.
Size() int

// Values returns a slice of the values contained in this map.
Values() []*V
}

0 comments on commit 443d886

Please sign in to comment.