diff --git a/server/jetstream.go b/server/jetstream.go index 89cd2adf..d259567f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -456,6 +456,63 @@ func (s *Server) JetStreamEnabled() bool { return enabled } +// Will migrate off ephemerals if possible. +// This means parent stream needs to be replicated. +func (s *Server) migrateEphemerals() { + js, cc := s.getJetStreamCluster() + // Make sure JetStream is enabled and we are clustered. + if js == nil || cc == nil { + return + } + + var consumers []*consumerAssignment + + js.mu.Lock() + ourID := cc.meta.ID() + for _, asa := range cc.streams { + for _, sa := range asa { + if rg := sa.Group; rg != nil && len(rg.Peers) > 1 && rg.isMember(ourID) && len(sa.consumers) > 0 { + for _, ca := range sa.consumers { + if ca.Group != nil && len(ca.Group.Peers) == 1 && ca.Group.isMember(ourID) { + // Need to select possible new peer from parent stream. + for _, p := range rg.Peers { + if p != ourID { + ca.Group.Peers = []string{p} + ca.Group.Preferred = p + consumers = append(consumers, ca) + break + } + } + } + } + } + } + } + js.mu.Unlock() + + // Process the consumers. + for _, ca := range consumers { + // Locate the consumer itself. + if acc, err := s.LookupAccount(ca.Client.Account); err == nil && acc != nil { + if mset, err := acc.lookupStream(ca.Stream); err == nil && mset != nil { + if o := mset.lookupConsumer(ca.Name); o != nil { + state := o.readStoreState() + o.deleteWithoutAdvisory() + js.mu.Lock() + // Delete old one. + cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) + // Encode state and new name. + ca.State = state + ca.Name = createConsumerName() + addEntry := encodeAddConsumerAssignmentCompressed(ca) + cc.meta.ForwardProposal(addEntry) + js.mu.Unlock() + } + } + } + } +} + // Shutdown jetstream for this server. func (s *Server) shutdownJetStream() { s.mu.Lock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 585ad63e..44c0b2a4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3630,6 +3630,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // We need to set the ephemeral here before replicating. var oname string if !isDurableConsumer(cfg) { + // We chose to have ephemerals be R=1. + rg.Peers = []string{rg.Preferred} + rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) + // Make sure name is unique. for { oname = createConsumerName() if sa.consumers != nil { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2e43f231..f8cc73ff 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -2075,6 +2075,49 @@ func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) { }) } +func TestJetStreamClusterEphemeralConsumersNotReplicated(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 3}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, _ := sub.ConsumerInfo() + if ci == nil { + t.Fatalf("Unexpected error: no consumer info") + } + + if _, err = js.Publish("foo", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + checkSubsPending(t, sub, 1) + + if ci.Cluster == nil || len(ci.Cluster.Replicas) != 0 { + t.Fatalf("Expected ephemeral to be R=1, got %+v", ci.Cluster) + } + scl := c.serverByName(ci.Cluster.Leader) + if scl == nil { + t.Fatalf("Could not select server where ephemeral consumer is running") + } + // Test migrations. + scl.Shutdown() + + if _, err = js.Publish("foo", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + checkSubsPending(t, sub, 2) +} + func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -2972,7 +3015,8 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - sub, err := js.SubscribeSync("NO-Q") + // Make durable to have R match Stream. + sub, err := js.SubscribeSync("NO-Q", nats.Durable("rr")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -4680,6 +4724,15 @@ func (c *cluster) waitOnAllCurrent() { } } +func (c *cluster) serverByName(sname string) *Server { + for _, s := range c.servers { + if s.Name() == sname { + return s + } + } + return nil +} + func (c *cluster) randomNonLeader() *Server { // range should randomize.. but.. for _, s := range c.servers { diff --git a/server/server.go b/server/server.go index 9ba85932..4210c861 100644 --- a/server/server.go +++ b/server/server.go @@ -1650,6 +1650,10 @@ func (s *Server) Shutdown() { // Transfer off any raft nodes that we are a leader by shutting them all down. s.shutdownRaftNodes() + // This is for clustered JetStream and ephemeral consumers. + // No-op if not clustered or not running JetStream. + s.migrateEphemerals() + // Shutdown the eventing system as needed. // This is done first to send out any messages for // account status. We will also clean up any