Merge pull request #2830 from nats-io/issue-2828

FIXES #2828.
This commit is contained in:
Derek Collison
2022-01-30 09:47:05 -08:00
committed by GitHub
5 changed files with 66 additions and 4 deletions

View File

@@ -1294,6 +1294,13 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
return err
}
if o.store != nil {
// Update local state always.
if err := o.store.UpdateConfig(cfg); err != nil {
return err
}
}
// DeliverSubject
if cfg.DeliverSubject != o.cfg.DeliverSubject {
o.updateDeliverSubjectLocked(cfg.DeliverSubject)

View File

@@ -5074,6 +5074,17 @@ func encodeConsumerState(state *ConsumerState) []byte {
return buf[:n]
}
func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
o.mu.Lock()
defer o.mu.Unlock()
// This is mostly unchecked here. We are assuming the upper layers have done sanity checking.
csi := o.cfg
csi.ConsumerConfig = *cfg
return o.writeConsumerMeta()
}
func (o *consumerFileStore) Update(state *ConsumerState) error {
// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {

View File

@@ -14501,6 +14501,50 @@ func TestJetStreamPullConsumersMultipleRequestsExpireOutOfOrder(t *testing.T) {
}
}
func TestJetStreamConsumerUpdateSurvival(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "X"})
require_NoError(t, err)
// First create a consumer that is push based.
_, err = js.AddConsumer("X", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 1024})
require_NoError(t, err)
// Now do same name but pull. This will update the MaxAcKPending
ci, err := js.AddConsumer("X", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy, MaxAckPending: 22})
require_NoError(t, err)
if ci.Config.MaxAckPending != 22 {
t.Fatalf("Expected MaxAckPending to be 22, got %d", ci.Config.MaxAckPending)
}
// Make sure this survives across a restart.
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()
ci, err = js.ConsumerInfo("X", "dlc")
require_NoError(t, err)
if ci.Config.MaxAckPending != 22 {
t.Fatalf("Expected MaxAckPending to be 22, got %d", ci.Config.MaxAckPending)
}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////

View File

@@ -823,11 +823,10 @@ func (ms *memStore) Snapshot(_ time.Duration, _, _ bool) (*SnapshotResult, error
}
// No-ops.
func (os *consumerMemStore) Update(_ *ConsumerState) error { return nil }
func (os *consumerMemStore) Update(_ *ConsumerState) error { return nil }
func (os *consumerMemStore) UpdateDelivered(_, _, _ uint64, _ int64) error { return nil }
func (os *consumerMemStore) UpdateAcks(_, _ uint64) error { return nil }
func (os *consumerMemStore) UpdateAcks(_, _ uint64) error { return nil }
func (os *consumerMemStore) UpdateConfig(_ *ConsumerConfig) error { return nil }
func (os *consumerMemStore) Stop() error {
os.ms.decConsumers()

View File

@@ -154,6 +154,7 @@ type SnapshotResult struct {
type ConsumerStore interface {
UpdateDelivered(dseq, sseq, dc uint64, ts int64) error
UpdateAcks(dseq, sseq uint64) error
UpdateConfig(cfg *ConsumerConfig) error
Update(*ConsumerState) error
State() (*ConsumerState, error)
Type() StorageType