diff --git a/server/events.go b/server/events.go index a8b637c5..0ebafde4 100644 --- a/server/events.go +++ b/server/events.go @@ -891,13 +891,14 @@ func (s *Server) shutdownEventing() { s.mu.Lock() clearTimer(&s.sys.sweeper) clearTimer(&s.sys.stmr) + sys := s.sys s.mu.Unlock() // We will queue up a shutdown event and wait for the // internal send loop to exit. s.sendShutdownEvent() - s.sys.wg.Wait() - close(s.sys.resetCh) + sys.wg.Wait() + close(sys.resetCh) s.mu.Lock() defer s.mu.Unlock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 8f0d8c35..3cec6117 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -726,7 +726,7 @@ func (js *jetStream) metaSnapshot() []byte { return s2.EncodeBetter(nil, b) } -func (js *jetStream) applyMetaSnapshot(buf []byte) error { +func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error { jse, err := s2.Decode(nil, buf) if err != nil { return err @@ -794,32 +794,62 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { // Do removals first. for _, sa := range saDel { + if isRecovering { + js.setStreamAssignmentResponded(sa) + } js.processStreamRemoval(sa) } // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { + if isRecovering { + js.setStreamAssignmentResponded(sa) + } js.processStreamAssignment(sa) // We can simply add the consumers. for _, ca := range sa.consumers { + if isRecovering { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerAssignment(ca) } } // Now do the deltas for existing stream's consumers. for _, ca := range caDel { + if isRecovering { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerRemoval(ca) } for _, ca := range caAdd { + if isRecovering { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerAssignment(ca) } return nil } +// Called on recovery to make sure we do not process like original +func (js *jetStream) setStreamAssignmentResponded(sa *streamAssignment) { + js.mu.Lock() + defer js.mu.Unlock() + sa.responded = true + sa.Restore = nil +} + +// Called on recovery to make sure we do not process like original +func (js *jetStream) setConsumerAssignmentResponded(ca *consumerAssignment) { + js.mu.Lock() + defer js.mu.Unlock() + ca.responded = true +} + func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, error) { var didSnap bool for _, e := range entries { if e.Type == EntrySnapshot { - js.applyMetaSnapshot(e.Data) + js.applyMetaSnapshot(e.Data, isRecovering) didSnap = true } else { buf := e.Data @@ -830,9 +860,8 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, err } - // We process the assignment but ignore restore on recovery. - if sa.Restore != nil && isRecovering { - sa.Restore = nil + if isRecovering { + js.setStreamAssignmentResponded(sa) } js.processStreamAssignment(sa) case removeStreamOp: @@ -841,6 +870,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, err } + if isRecovering { + js.setStreamAssignmentResponded(sa) + } js.processStreamRemoval(sa) case assignConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) @@ -848,6 +880,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:]) return didSnap, err } + if isRecovering { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerAssignment(ca) case assignCompressedConsumerOp: ca, err := decodeConsumerAssignmentCompressed(buf[1:]) @@ -855,6 +890,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:]) return didSnap, err } + if isRecovering { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerAssignment(ca) case removeConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) @@ -862,6 +900,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:]) return didSnap, err } + if isRecovering { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerRemoval(ca) default: panic("JetStream Cluster Unknown meta entry op type") @@ -1153,6 +1194,9 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) { acc, _ := s.LookupAccount(sa.Client.Account) restoreDoneCh = s.processStreamRestore(sa.Client, acc, sa.Config.Name, _EMPTY_, sa.Reply, _EMPTY_) } else { + if !isLeader && n.GroupLeader() != noLeader { + js.setStreamAssignmentResponded(sa) + } js.processStreamLeaderChange(mset, sa, isLeader) } case <-t.C: @@ -1466,10 +1510,9 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // Go ahead and create or update the stream. mset, err = acc.LookupStream(sa.Config.Name) if err == nil && mset != nil { + mset.setStreamAssignment(sa) if err := mset.Update(sa.Config); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) - } else { - mset.setStreamAssignment(sa) } } else if err == ErrJetStreamStreamNotFound { // Add in the stream here. @@ -1799,7 +1842,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { } } o.setConsumerAssignment(ca) - s.Debugf("JetStream cluster, consumer already running") + s.Debugf("JetStream cluster, consumer was already running") } // Add in the consumer if needed. @@ -1999,6 +2042,9 @@ func (js *jetStream) monitorConsumer(o *Consumer, ca *consumerAssignment) { } } case isLeader := <-lch: + if !isLeader && n.GroupLeader() != noLeader { + js.setConsumerAssignmentResponded(ca) + } js.processConsumerLeaderChange(o, ca, isLeader) case <-t.C: // TODO(dlc) - We should have this delayed a bit to not race the invariants. diff --git a/server/raft.go b/server/raft.go index ad16cdcd..5284ff31 100644 --- a/server/raft.go +++ b/server/raft.go @@ -66,6 +66,11 @@ type WAL interface { Delete() error } +type LeadChange struct { + Leader bool + Previous string +} + type Peer struct { ID string Current bool @@ -1698,6 +1703,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.writeTermVote() if isNew { n.resetElectionTimeout() + n.updateLeadChange(false) } } diff --git a/server/stream.go b/server/stream.go index 7e24e944..6e302cd9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -765,7 +765,14 @@ func (mset *Stream) Update(config *StreamConfig) error { } // Now update config and store's version of our config. mset.config = cfg - mset.sendUpdateAdvisoryLocked() + + var suppress bool + if mset.isClustered() && mset.sa != nil { + suppress = mset.sa.responded + } + if !suppress { + mset.sendUpdateAdvisoryLocked() + } mset.mu.Unlock() mset.store.UpdateConfig(&cfg) diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index fda675de..a1968d6c 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -173,7 +173,7 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { sl := c.streamLeader("$G", "TEST") sl.Shutdown() c.restartServer(sl) - c.waitOnNewStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "TEST") time.Sleep(500 * time.Millisecond) si, err = js.StreamInfo("TEST") @@ -184,7 +184,7 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } // Now durable consumer. - c.waitOnNewConsumerLeader("$G", "TEST", "dlc") + c.waitOnConsumerLeader("$G", "TEST", "dlc") time.Sleep(500 * time.Millisecond) if _, err = js.ConsumerInfo("TEST", "dlc"); err != nil { @@ -532,7 +532,7 @@ func TestJetStreamClusterConsumerState(t *testing.T) { } c.consumerLeader("$G", "TEST", "dlc").Shutdown() - c.waitOnNewConsumerLeader("$G", "TEST", "dlc") + c.waitOnConsumerLeader("$G", "TEST", "dlc") nci, err := sub.ConsumerInfo() if err != nil { @@ -768,7 +768,7 @@ func TestJetStreamClusterStreamSynchedTimeStamps(t *testing.T) { sl := c.streamLeader("$G", "foo") sl.Shutdown() - c.waitOnNewStreamLeader("$G", "foo") + c.waitOnStreamLeader("$G", "foo") s = c.randomServer() nc, js = jsClientConnect(t, s) @@ -929,7 +929,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { } c.consumerLeader("$G", "foo", "dlc").Shutdown() - c.waitOnNewConsumerLeader("$G", "foo", "dlc") + c.waitOnConsumerLeader("$G", "foo", "dlc") ci2, err := sub.ConsumerInfo() if err != nil { @@ -944,7 +944,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) { } // In case the server above was also stream leader. - c.waitOnNewStreamLeader("$G", "foo") + c.waitOnStreamLeader("$G", "foo") // Now send more.. // Send 10 more messages. @@ -1302,7 +1302,7 @@ func TestJetStreamClusterStreamNormalCatchup(t *testing.T) { sl := c.streamLeader("$G", "TEST") sl.Shutdown() - c.waitOnNewStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "TEST") // Send 10 more while one replica offline. for i := toSend; i <= toSend*2; i++ { @@ -1371,7 +1371,7 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) { sl := c.streamLeader("$G", "TEST") sl.Shutdown() - c.waitOnNewStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "TEST") sendBatch(100) @@ -1425,7 +1425,7 @@ func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) { sl := c.streamLeader("$G", "TEST") sl.Shutdown() - c.waitOnNewStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "TEST") toSend := 10 for i := 0; i < toSend; i++ { @@ -1452,7 +1452,7 @@ func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) { c.waitOnStreamCurrent(sl, "$G", "TEST") nsl.Shutdown() - c.waitOnNewStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "TEST") if _, err := js.StreamInfo("TEST"); err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1521,7 +1521,7 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) { oldLeader := c.streamLeader("$G", "TEST") oldLeader.Shutdown() - c.waitOnNewStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "TEST") // Re-request. leader = c.streamLeader("$G", "TEST").Name() @@ -1955,7 +1955,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { }) // Wait on the system to elect a leader for the restored consumer. - c.waitOnNewConsumerLeader("$G", "TEST", "dlc") + c.waitOnConsumerLeader("$G", "TEST", "dlc") // Now check for the consumer being recreated. nci, err := js.ConsumerInfo("TEST", "dlc") @@ -2532,6 +2532,86 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { } } +func TestJetStreamRestartAdvisories(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + sub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.API") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + nc.Flush() + + sendBatch := func(subject string, n int) { + t.Helper() + for i := 0; i < n; i++ { + if _, err := js.Publish(subject, []byte("JSC-OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + } + + // Add in some streams with msgs and consumers. + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-1", Replicas: 2}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.SubscribeSync("TEST-1", nats.Durable("DC")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sendBatch("TEST-1", 25) + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-2", Replicas: 2}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.SubscribeSync("TEST-2", nats.Durable("DC")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sendBatch("TEST-2", 50) + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-3", Replicas: 3, Storage: nats.MemoryStorage}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.SubscribeSync("TEST-3", nats.Durable("DC")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sendBatch("TEST-3", 100) + + // Wait for the advisories for all streams and consumers. + checkSubsPending(t, sub, 9) // 3 streams, 3 consumers, 3 stream names lookups for creating consumers. + for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) { + } + + usub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.UPDATED.>") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer usub.Unsubscribe() + nc.Flush() + + checkSubsPending(t, sub, 0) + checkSubsPending(t, usub, 0) + + // Now restart the other two servers we are not connected to. + for _, cs := range c.servers { + if cs != s { + cs.Shutdown() + c.restartServer(cs) + } + } + for _, cs := range c.servers { + c.waitOnServerCurrent(cs) + } + + checkSubsPending(t, sub, 0) + checkSubsPending(t, usub, 0) +} + func TestJetStreamClusterStreamPerf(t *testing.T) { // Comment out to run, holding place for now. skip(t) @@ -2739,7 +2819,7 @@ func (c *cluster) waitOnPeerCount(n int) { c.t.Fatalf("Expected a cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers())) } -func (c *cluster) waitOnNewConsumerLeader(account, stream, consumer string) { +func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) { c.t.Helper() expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) { @@ -2762,7 +2842,7 @@ func (c *cluster) consumerLeader(account, stream, consumer string) *server.Serve return nil } -func (c *cluster) waitOnNewStreamLeader(account, stream string) { +func (c *cluster) waitOnStreamLeader(account, stream string) { c.t.Helper() expires := time.Now().Add(10 * time.Second) for time.Now().Before(expires) {