From d7f76da597caeb548b3cfdceebdea42ee1e6d055 Mon Sep 17 00:00:00 2001
From: Neil Twigg
Date: Wed, 2 Aug 2023 12:51:29 +0100
Subject: [PATCH] Allow switching from limits-based to interest-based retention
 in stream update

Signed-off-by: Neil Twigg
---
 server/jetstream_cluster.go |   2 +-
 server/jetstream_test.go    | 110 ++++++++++++++++++++++++++++++++++++
 server/stream.go            |  38 ++++++++++++-
 server/test_test.go         |  13 ++---
 4 files changed, 153 insertions(+), 10 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index ca125a6c..6cb208c7 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -6980,7 +6980,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
 	} else {
 		nca := ca.copyGroup()
 
-		rBefore := ca.Config.replicas(sa.Config)
+		rBefore := nca.Config.replicas(sa.Config)
 		rAfter := cfg.replicas(sa.Config)
 
 		var curLeader string
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index d8f1ceba..1147f8a5 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -21407,3 +21407,113 @@ func TestJetStreamServerReencryption(t *testing.T) {
 		})
 	}
 }
+
+func TestJetStreamLimitsToInterestPolicy(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "JSC", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.leader())
+	defer nc.Close()
+
+	// This is the index of the consumer that we'll create as R1
+	// instead of R3, to prove that a mismatched replica count
+	// blocks the stream update from happening.
+	singleReplica := 3
+
+	streamCfg := nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.LimitsPolicy,
+		Storage:   nats.MemoryStorage,
+		Replicas:  3,
+	}
+
+	stream, err := js.AddStream(&streamCfg)
+	require_NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		replicas := streamCfg.Replicas
+		if i == singleReplica {
+			// Make one of the consumers R1 so that we can check
+			// that the switch to interest-based retention is
+			// blocked until it matches the stream replica count.
+			replicas = 1
+		}
+		cname := fmt.Sprintf("test_%d", i)
+		_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+			Name:      cname,
+			Durable:   cname,
+			AckPolicy: nats.AckAllPolicy,
+			Replicas:  replicas,
+		})
+		require_NoError(t, err)
+	}
+
+	for i := 0; i < 20; i++ {
+		_, err := js.Publish("foo", []byte{1, 2, 3, 4, 5})
+		require_NoError(t, err)
+	}
+
+	// Pull 10 or more messages from the stream. We will never pull
+	// fewer than 10, which guarantees that the lowest ack floor of
+	// all consumers should be 10.
+	for i := 0; i < 10; i++ {
+		cname := fmt.Sprintf("test_%d", i)
+		count := 10 + i // At least 10 messages
+
+		sub, err := js.PullSubscribe("foo", cname)
+		require_NoError(t, err)
+
+		msgs, err := sub.Fetch(count)
+		require_NoError(t, err)
+		require_Equal(t, len(msgs), count)
+		require_NoError(t, msgs[len(msgs)-1].AckSync())
+
+		// At this point the ack floor should match the count of
+		// messages we received.
+		info, err := js.ConsumerInfo("TEST", cname)
+		require_NoError(t, err)
+		require_Equal(t, info.AckFloor.Consumer, uint64(count))
+	}
+
+	// Try updating to interest-based. This should fail because
+	// we have a consumer that is R1 on an R3 stream.
+	streamCfg = stream.Config
+	streamCfg.Retention = nats.InterestPolicy
+	_, err = js.UpdateStream(&streamCfg)
+	require_Error(t, err)
+
+	// Now we'll make the R1 consumer an R3.
+	cname := fmt.Sprintf("test_%d", singleReplica)
+	cinfo, err := js.ConsumerInfo("TEST", cname)
+	require_NoError(t, err)
+
+	cinfo.Config.Replicas = streamCfg.Replicas
+	_, _ = js.UpdateConsumer("TEST", &cinfo.Config)
+	// TODO(nat): The jsConsumerCreateRequest update doesn't always
+	// respond when there are no errors updating a consumer, so this
+	// nearly always returns a timeout, despite actually doing what
+	// it should. Instead of checking the returned error, we verify
+	// below that the replicas were updated via another consumer info.
+	// require_NoError(t, err)
+	c.waitOnAllCurrent()
+	cinfo, err = js.ConsumerInfo("TEST", cname)
+	require_NoError(t, err)
+	require_Equal(t, cinfo.Config.Replicas, streamCfg.Replicas)
+	require_Equal(t, len(cinfo.Cluster.Replicas), streamCfg.Replicas-1)
+
+	// This time it should succeed.
+	_, err = js.UpdateStream(&streamCfg)
+	require_NoError(t, err)
+
+	// We need to wait for all nodes to have applied the new stream
+	// configuration.
+	c.waitOnAllCurrent()
+
+	// Now we should only have 10 messages left in the stream, as
+	// each consumer has acked at least the first 10 messages.
+	info, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_Equal(t, info.State.FirstSeq, 11)
+	require_Equal(t, info.State.Msgs, 10)
+}
diff --git a/server/stream.go b/server/stream.go
index 990ee014..ab1e2983 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -1495,9 +1495,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
 	if cfg.Storage != old.Storage {
 		return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change storage type"))
 	}
-	// Can't change retention.
+	// Can only change retention from limits to interest or back, not to/from work queue for now.
 	if cfg.Retention != old.Retention {
-		return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy"))
+		if old.Retention == WorkQueuePolicy || cfg.Retention == WorkQueuePolicy {
+			return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change retention policy to/from workqueue"))
+		}
 	}
 	// Can not have a template owner for now.
 	if old.Template != _EMPTY_ {
@@ -1785,9 +1787,41 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
 		// a subsequent update to an existing tier will then move from existing past tier to existing new tier
 	}
 
+	if mset.isLeader() && ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy {
+		// Before we can update the retention policy for the stream, we need
+		// the replica count of all consumers to match the stream.
+		for _, c := range mset.sa.consumers {
+			if c.Config.Replicas > 0 && c.Config.Replicas != cfg.Replicas {
+				mset.mu.Unlock()
+				return fmt.Errorf("consumer %q replica count must be %d", c.Name, cfg.Replicas)
+			}
+		}
+	}
+
 	// Now update config and store's version of our config.
 	mset.cfg = *cfg
 
+	// If we're changing retention and haven't errored because of consumer
+	// replicas by now, whip through and update the consumer retention.
+	if ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy {
+		toUpdate := make([]*consumer, 0, len(mset.consumers))
+		for _, c := range mset.consumers {
+			toUpdate = append(toUpdate, c)
+		}
+		mset.mu.Unlock()
+		for _, c := range toUpdate {
+			c.mu.Lock()
+			c.retention = cfg.Retention
+			c.mu.Unlock()
+			if c.retention == InterestPolicy {
+				// If we're switching to interest, force a check of the
+				// interest of existing stream messages.
+				c.checkStateForInterestStream()
+			}
+		}
+		mset.mu.Lock()
+	}
+
 	// If we are the leader never suppress update advisory, simply send.
 	if mset.isLeader() && sendAdvisory {
 		mset.sendUpdateAdvisoryLocked()
diff --git a/server/test_test.go b/server/test_test.go
index 64717f90..00bc53e5 100644
--- a/server/test_test.go
+++ b/server/test_test.go
@@ -14,7 +14,6 @@
 package server
 
 import (
-	"bytes"
 	"fmt"
 	"math/rand"
 	"net/url"
@@ -112,17 +111,17 @@ func require_Error(t *testing.T, err error, expected ...error) {
 	t.Fatalf("Expected one of %v, got '%v'", expected, err)
 }
 
-func require_Equal(t *testing.T, a, b string) {
+func require_Equal[T comparable](t *testing.T, a, b T) {
 	t.Helper()
-	if strings.Compare(a, b) != 0 {
-		t.Fatalf("require equal, but got: %v != %v", a, b)
+	if a != b {
+		t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
 	}
 }
 
-func require_NotEqual(t *testing.T, a, b [32]byte) {
+func require_NotEqual[T comparable](t *testing.T, a, b T) {
 	t.Helper()
-	if bytes.Equal(a[:], b[:]) {
-		t.Fatalf("require not equal, but got: %v != %v", a, b)
+	if a == b {
+		t.Fatalf("require %T not equal, but got: %v == %v", a, a, b)
 	}
 }
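
For reference, here is a minimal client-side sketch of the behaviour this patch enables, using the nats.go client. It is an illustration only, not part of the patch: the stream name "ORDERS", the connection URL and the error handling are all assumptions.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to a JetStream-enabled server (URL is an assumption).
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Fetch the current configuration of an existing limits-based
	// stream. "ORDERS" is a hypothetical stream name.
	info, err := js.StreamInfo("ORDERS")
	if err != nil {
		log.Fatal(err)
	}

	// Flip retention from limits-based to interest-based. With this
	// patch the server accepts the update, provided every consumer's
	// replica count matches the stream's; switching to or from work
	// queue retention is still rejected.
	cfg := info.Config
	cfg.Retention = nats.InterestPolicy
	if _, err := js.UpdateStream(&cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Println("stream is now interest-based")
}

Once the update is applied, each consumer's retention is switched to interest as well and the server re-checks interest for existing messages, so anything already acknowledged by all consumers is removed, as the test above demonstrates with FirstSeq moving to 11.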