diff --git a/server/consumer.go b/server/consumer.go
index dc6513fd..3d9d8a9b 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -2667,12 +2667,16 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 
 	mset := o.mset
 	clustered := o.node != nil
+
+	// In case retention changes for a stream, this ought to have been updated
+	// using the consumer lock to avoid a race.
+	retention := o.retention
 	o.mu.Unlock()
 
 	// Let the owning stream know if we are interest or workqueue retention based.
 	// If this consumer is clustered this will be handled by processReplicatedAck
 	// after the ack has propagated.
-	if !clustered && mset != nil && mset.cfg.Retention != LimitsPolicy {
+	if !clustered && mset != nil && retention != LimitsPolicy {
 		if sagap > 1 {
 			// FIXME(dlc) - This is very inefficient, will need to fix.
 			for seq := sseq; seq > sseq-sagap; seq-- {
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index fcb2ffca..08c9ea25 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -21510,6 +21510,99 @@ func TestJetStreamLimitsToInterestPolicy(t *testing.T) {
 	require_Equal(t, info.State.Msgs, 10)
 }
 
+func TestJetStreamLimitsToInterestPolicyWhileAcking(t *testing.T) {
+	for _, st := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
+		t.Run(st.String(), func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "JSC", 3)
+			defer c.shutdown()
+
+			nc, js := jsClientConnect(t, c.leader())
+			defer nc.Close()
+			streamCfg := nats.StreamConfig{
+				Name:      "TEST",
+				Subjects:  []string{"foo"},
+				Retention: nats.LimitsPolicy,
+				Storage:   st,
+				Replicas:  3,
+			}
+
+			stream, err := js.AddStream(&streamCfg)
+			require_NoError(t, err)
+
+			wg := sync.WaitGroup{}
+			ctx, cancel := context.WithCancel(context.Background())
+			payload := []byte(strings.Repeat("A", 128))
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for range time.NewTicker(10 * time.Millisecond).C {
+					select {
+					case <-ctx.Done():
+						return
+					default:
+					}
+					js.Publish("foo", payload)
+				}
+			}()
+			for i := 0; i < 5; i++ {
+				cname := fmt.Sprintf("test_%d", i)
+				sub, err := js.PullSubscribe("foo", cname)
+				require_NoError(t, err)
+
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for range time.NewTicker(10 * time.Millisecond).C {
+						select {
+						case <-ctx.Done():
+							return
+						default:
+						}
+
+						msgs, err := sub.Fetch(1)
+						if err != nil && errors.Is(err, nats.ErrTimeout) {
+							t.Logf("ERROR: %v", err)
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+					}
+				}()
+			}
+			// Leave running for a few seconds, then make the change on a different connection.
+			time.Sleep(5 * time.Second)
+			nc2, js2 := jsClientConnect(t, c.leader())
+			defer nc2.Close()
+
+			// Try updating the stream to interest-based retention.
+			streamCfg = stream.Config
+			streamCfg.Retention = nats.InterestPolicy
+			_, err = js2.UpdateStream(&streamCfg)
+			require_NoError(t, err)
+
+			// We need to wait for all nodes to have applied the new stream
+			// configuration.
+			c.waitOnAllCurrent()
+
+			var retention nats.RetentionPolicy
+			checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
+				info, err := js2.StreamInfo("TEST", nats.MaxWait(500*time.Millisecond))
+				if err != nil {
+					return err
+				}
+				retention = info.Config.Retention
+				return nil
+			})
+			require_Equal(t, retention, nats.InterestPolicy)
+
+			// Cancel and wait for goroutines underneath.
+			cancel()
+			wg.Wait()
+		})
+	}
+}
+
 func TestJetStreamUsageSyncDeadlock(t *testing.T) {
 	s := RunBasicJetStreamServer(t)
 	defer s.Shutdown()
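
For illustration, here is a minimal standalone sketch (not nats-server code; the consumer and RetentionPolicy types below are simplified stand-ins) of the locking pattern the consumer.go hunk applies: snapshot the mutable retention field while the consumer lock is held, then branch on the copy after unlocking, instead of re-reading shared stream config (mset.cfg.Retention) without synchronization while a concurrent stream update may be rewriting it.

package main

import (
	"fmt"
	"sync"
)

type RetentionPolicy int

const (
	LimitsPolicy RetentionPolicy = iota
	InterestPolicy
)

// consumer is a simplified stand-in for the server's consumer type.
type consumer struct {
	mu        sync.Mutex
	retention RetentionPolicy
}

// updateRetention is the writer side: the same lock guards the field,
// so a retention change cannot race with readers.
func (o *consumer) updateRetention(r RetentionPolicy) {
	o.mu.Lock()
	o.retention = r
	o.mu.Unlock()
}

// processAck copies retention under the lock, mirroring the patched
// processAckMsg: the copy stays coherent even if updateRetention runs
// concurrently after the unlock.
func (o *consumer) processAck() {
	o.mu.Lock()
	retention := o.retention // snapshot while holding o.mu
	o.mu.Unlock()

	if retention != LimitsPolicy {
		fmt.Println("inform owning stream: interest/workqueue retention")
	}
}

func main() {
	o := &consumer{retention: LimitsPolicy}
	go o.updateRetention(InterestPolicy)
	o.processAck()
}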