🩹 Fix: Memory leak removal in the idempotency middleware (#3263)

* 🩹 Fix: Add key removal in MemoryLock

* Fixed concurrent deletion.

* Fix: idempotency middleware's MemoryLock

* Add MemoryLock benchmarks.

* Updated benchmarks: Add returning error handling

* Renamed benchmark: RepeatedKeys

---------

Co-authored-by: Juan Calderon-Perez <835733+gaby@users.noreply.github.com>
pull/3266/head
Bulat Bagaviev 2024-12-28 16:29:31 +03:00 committed by GitHub
parent 57744ebbe8
commit 775e0a73f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 91 additions and 9 deletions

View File

@ -10,42 +10,58 @@ type Locker interface {
Unlock(key string) error
}
type countedLock struct {
mu sync.Mutex
locked int
}
type MemoryLock struct {
keys map[string]*sync.Mutex
keys map[string]*countedLock
mu sync.Mutex
}
func (l *MemoryLock) Lock(key string) error {
l.mu.Lock()
mu, ok := l.keys[key]
lock, ok := l.keys[key]
if !ok {
mu = new(sync.Mutex)
l.keys[key] = mu
lock = new(countedLock)
l.keys[key] = lock
}
lock.locked++
l.mu.Unlock()
mu.Lock()
lock.mu.Lock()
return nil
}
func (l *MemoryLock) Unlock(key string) error {
l.mu.Lock()
mu, ok := l.keys[key]
l.mu.Unlock()
lock, ok := l.keys[key]
if !ok {
// This happens if we try to unlock an unknown key
l.mu.Unlock()
return nil
}
l.mu.Unlock()
mu.Unlock()
lock.mu.Unlock()
l.mu.Lock()
lock.locked--
if lock.locked <= 0 {
// This happens if countedLock is used to Lock and Unlock the same number of times
// So, we can delete the key to prevent memory leak
delete(l.keys, key)
}
l.mu.Unlock()
return nil
}
func NewMemoryLock() *MemoryLock {
return &MemoryLock{
keys: make(map[string]*sync.Mutex),
keys: make(map[string]*countedLock),
}
}

View File

@ -1,6 +1,8 @@
package idempotency_test
import (
"strconv"
"sync/atomic"
"testing"
"time"
@ -59,3 +61,67 @@ func Test_MemoryLock(t *testing.T) {
require.NoError(t, err)
}
}
func Benchmark_MemoryLock(b *testing.B) {
keys := make([]string, b.N)
for i := range keys {
keys[i] = strconv.Itoa(i)
}
lock := idempotency.NewMemoryLock()
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := keys[i]
if err := lock.Lock(key); err != nil {
b.Fatal(err)
}
if err := lock.Unlock(key); err != nil {
b.Fatal(err)
}
}
}
func Benchmark_MemoryLock_Parallel(b *testing.B) {
// In order to prevent using repeated keys I pre-allocate keys
keys := make([]string, 1_000_000)
for i := range keys {
keys[i] = strconv.Itoa(i)
}
b.Run("UniqueKeys", func(b *testing.B) {
lock := idempotency.NewMemoryLock()
var keyI atomic.Int32
b.RunParallel(func(p *testing.PB) {
for p.Next() {
i := int(keyI.Add(1)) % len(keys)
key := keys[i]
if err := lock.Lock(key); err != nil {
b.Fatal(err)
}
if err := lock.Unlock(key); err != nil {
b.Fatal(err)
}
}
})
})
b.Run("RepeatedKeys", func(b *testing.B) {
lock := idempotency.NewMemoryLock()
var keyI atomic.Int32
b.RunParallel(func(p *testing.PB) {
for p.Next() {
// Division by 3 ensures that index will be repreated exactly 3 times
i := int(keyI.Add(1)) / 3 % len(keys)
key := keys[i]
if err := lock.Lock(key); err != nil {
b.Fatal(err)
}
if err := lock.Unlock(key); err != nil {
b.Fatal(err)
}
}
})
})
}