Fixed data race and fuxed bug that we would not clear our waiting queue when a leader stepped down.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-01-24 13:01:25 -08:00
parent 56870e4ddb
commit d332684322
2 changed files with 18 additions and 4 deletions

View File

@@ -863,6 +863,10 @@ func (o *consumer) setLeader(isLeader bool) {
close(o.qch)
o.qch = nil
}
// Reset waiting if we are in pull mode.
if o.isPullMode() {
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
}
o.mu.Unlock()
}
}
@@ -2723,7 +2727,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
case <-mch:
// Messages are waiting.
case <-wrExp:
o.mu.Lock()
o.expireWaiting()
o.mu.Unlock()
case <-hbc:
if o.isActive() {
const t = "NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n%s: %d\r\n\r\n"

View File

@@ -10076,6 +10076,13 @@ func TestJetStreamClusterPullConsumerLeaderChange(t *testing.T) {
require_NoError(t, err)
defer sub.Unsubscribe()
drainSub := func() {
t.Helper()
for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) {
}
checkSubsPending(t, sub, 0)
}
// Queue up a request that can live for a bit.
req := &JSApiConsumerGetNextRequest{Expires: 2 * time.Second}
jreq, err := json.Marshal(req)
@@ -10097,6 +10104,7 @@ func TestJetStreamClusterPullConsumerLeaderChange(t *testing.T) {
if m.Header.Get("Status") != "409" {
t.Fatalf("Expected a 409 status code, got %q", m.Header.Get("Status"))
}
checkSubsPending(t, sub, 0)
// Add a few messages to the stream to fulfill a request.
for i := 0; i < 10; i++ {
@@ -10109,13 +10117,13 @@ func TestJetStreamClusterPullConsumerLeaderChange(t *testing.T) {
err = nc.PublishRequest(rsubj, "reply", jreq)
require_NoError(t, err)
checkSubsPending(t, sub, 10)
// Drain this sub.
for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) {
}
drainSub()
// Now do a leader change again, make sure we do not get anything about that request.
_, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "dlc"), nil, time.Second)
require_NoError(t, err)
c.waitOnConsumerLeader("$G", "TEST", "dlc")
time.Sleep(100 * time.Millisecond)
checkSubsPending(t, sub, 0)
// Make sure we do not get anything if we expire, etc.
@@ -10125,7 +10133,7 @@ func TestJetStreamClusterPullConsumerLeaderChange(t *testing.T) {
err = nc.PublishRequest(rsubj, "reply", jreq)
require_NoError(t, err)
// Let it expire.
time.Sleep(300 * time.Millisecond)
time.Sleep(350 * time.Millisecond)
checkSubsPending(t, sub, 1)
// Now do a leader change again, make sure we do not get anything about that request.