Skip to content

Commit

Permalink
cache: make adjustments according to the comments
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Jun 30, 2021
1 parent 48c85ba commit af0c1e8
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 76 deletions.
83 changes: 44 additions & 39 deletions pkg/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ type InMemoryCache struct {
hitsExpired prometheus.Counter
// The input cache value would be copied to an inmemory array
// instead of simply using the one sent by the caller.
added prometheus.Counter
current prometheus.Gauge
singleflightsaved prometheus.Counter
added prometheus.Counter
current prometheus.Gauge
sfSaved prometheus.Counter

currentSize prometheus.Gauge
totalCurrentSize prometheus.Gauge
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R

if config.Singleflight {
c.subs = make(map[string]*pubsub)
c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
c.sfSaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_singleflight_saved_calls_total",
Help: "Total number of calls saved by the singleflight mechanism.",
ConstLabels: prometheus.Labels{"name": name},
Expand Down Expand Up @@ -315,23 +315,24 @@ func (c *InMemoryCache) reset() {
func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
for key, val := range data {
c.set(key, val, ttl)
if !c.singleFlight {
continue
}

if c.singleFlight {
c.mu.Lock()
c.mu.Lock()

if c.subs[key] == nil {
c.mu.Unlock()
continue
}

for _, listener := range c.subs[key].listeners {
listener <- val
close(listener)
}

delete(c.subs, key)
if c.subs[key] == nil {
c.mu.Unlock()
continue
}

for _, listener := range c.subs[key].listeners {
listener <- val
close(listener)
}

delete(c.subs, key)
c.mu.Unlock()
}
}

Expand All @@ -342,29 +343,33 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b
for _, key := range keys {
if b, ok := c.get(key); ok {
results[key] = b
} else if c.singleFlight {
c.mu.Lock()
if c.subs[key] == nil {
continue
}
if !c.singleFlight {
continue
}

c.mu.Lock()
if c.subs[key] == nil {
c.subs[key] = &pubsub{originalCtx: ctx}
c.mu.Unlock()
} else {
if c.subs[key].originalCtx.Err() != nil {
c.subs[key] = &pubsub{originalCtx: ctx}
c.mu.Unlock()
} else {
if c.subs[key].originalCtx.Err() != nil {
c.subs[key] = &pubsub{originalCtx: ctx}
}
originalCtx := c.subs[key].originalCtx
respReceiver := make(chan []byte)
c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
c.mu.Unlock()

select {
case b := <-respReceiver:
results[key] = b
c.singleflightsaved.Inc()
case <-ctx.Done():
return results
case <-originalCtx.Done():
continue
}
}
originalCtx := c.subs[key].originalCtx
respReceiver := make(chan []byte)
c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
c.mu.Unlock()

select {
case b := <-respReceiver:
results[key] = b
c.sfSaved.Inc()
case <-ctx.Done():
return results
case <-originalCtx.Done():
continue
}
}
}
Expand Down
60 changes: 23 additions & 37 deletions pkg/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ single_flight: true

testutil.Ok(t, g.Wait())

testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}

// TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works
Expand Down Expand Up @@ -94,7 +94,7 @@ single_flight: true
cancel()

testutil.Ok(t, g.Wait())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}

// TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works
Expand Down Expand Up @@ -133,7 +133,7 @@ single_flight: true

time.Sleep(1 * time.Second)
testutil.Ok(t, g.Wait())
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.sfSaved))
}

func TestInmemoryCache(t *testing.T) {
Expand Down Expand Up @@ -235,6 +235,24 @@ max_item_size: 1MB
testutil.Equals(t, (*InMemoryCache)(nil), cache)
}

func benchCacheGetSet(b *testing.B, cache *InMemoryCache, numKeys, concurrency int) {
wg := &sync.WaitGroup{}

for k := 0; k < numKeys; k++ {
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
cache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)})
cache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute)
}
}()
}
wg.Wait()
}
}

func BenchmarkInmemorySingleflight(b *testing.B) {
conf := []byte(`max_size: 2KB
max_item_size: 1KB`)
Expand All @@ -248,46 +266,14 @@ single_flight: true`)
sfCache, err := NewInMemoryCache("testsf", log.NewNopLogger(), nil, singleflightConf)
testutil.Ok(b, err)

var _ = sfCache

for _, numKeys := range []int{100, 1000, 10000} {
for _, concurrency := range []int{1, 5, 10} {
wg := &sync.WaitGroup{}

b.Run(fmt.Sprintf("inmemory_get_set_keys%d_c%d", numKeys, concurrency), func(b *testing.B) {
// b.N times get and set random number of numKeys keys.
for k := 0; k < numKeys; k++ {
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
cache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)})
cache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute)
}
}()
}
wg.Wait()
}
benchCacheGetSet(b, cache, numKeys, concurrency)
})

wg = &sync.WaitGroup{}

b.Run(fmt.Sprintf("inmemory_singleflight_get_set_keys%d_conc%d", numKeys, concurrency), func(b *testing.B) {
// b.N times get and set random number of numKeys keys.
for k := 0; k < numKeys; k++ {
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
sfCache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)})
sfCache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute)
}
}()
}
wg.Wait()
}
benchCacheGetSet(b, sfCache, numKeys, concurrency)
})
}
}
Expand Down

0 comments on commit af0c1e8

Please sign in to comment.