diff --git a/server/ipqueue.go b/server/ipqueue.go index fe091e44..8519043b 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -23,7 +23,7 @@ const ipQueueDefaultMaxRecycleSize = 4 * 1024 // This is a generic intra-process queue. type ipQueue[T any] struct { inprogress int64 - sync.RWMutex + sync.Mutex ch chan struct{} elts []T pos int @@ -180,9 +180,9 @@ func (q *ipQueue[T]) recycle(elts *[]T) { // Returns the current length of the queue. func (q *ipQueue[T]) len() int { - q.RLock() + q.Lock() l := len(q.elts) - q.pos - q.RUnlock() + q.Unlock() return l } diff --git a/server/ipqueue_test.go b/server/ipqueue_test.go index ba0e9f34..1800f2a1 100644 --- a/server/ipqueue_test.go +++ b/server/ipqueue_test.go @@ -221,9 +221,9 @@ func TestIPQueuePopOne(t *testing.T) { q.push(1) q.push(2) // Capture current capacity - q.RLock() + q.Lock() c := cap(q.elts) - q.RUnlock() + q.Unlock() e, ok = q.popOne() if !ok || e != 1 { t.Fatalf("Invalid value: %v", e) @@ -343,9 +343,9 @@ func TestIPQueueRecycle(t *testing.T) { values = q.pop() q.recycle(&values) q.push(1002) - q.RLock() + q.Lock() recycled := &q.elts == &values - q.RUnlock() + q.Unlock() if recycled { t.Fatalf("Unexpected recycled slice") }