diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index b8b0a9fb..5de969c7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10849,6 +10849,162 @@ func TestJetStreamClusterMirrorOrSourceNotActiveReporting(t *testing.T) { } } +func TestJetStreamStreamAdvisories(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + checkAdv := func(t *testing.T, sub *nats.Subscription, expectedPrefixes ...string) { + t.Helper() + seen := make([]bool, len(expectedPrefixes)) + for i := 0; i < len(expectedPrefixes); i++ { + msg := natsNexMsg(t, sub, time.Second) + var gotOne bool + for j, pfx := range expectedPrefixes { + if !seen[j] && strings.HasPrefix(msg.Subject, pfx) { + seen[j] = true + gotOne = true + break + } + } + if !gotOne { + t.Fatalf("Expected one of prefixes %q, got %q", expectedPrefixes, msg.Subject) + } + } + } + + // Used to keep stream names pseudo unique. t.Name() has slashes in it which caused problems. + var testN int + + checkAdvisories := func(t *testing.T, s *Server, replicas int) { + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + testN++ + streamName := "TEST_ADVISORIES_" + fmt.Sprintf("%d", testN) + + sub := natsSubSync(t, nc, "$JS.EVENT.ADVISORY.STREAM.*."+streamName) + + si, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Storage: nats.FileStorage, + Replicas: replicas, + }) + require_NoError(t, err) + advisories := []string{JSAdvisoryStreamCreatedPre} + if replicas > 1 { + advisories = append(advisories, JSAdvisoryStreamLeaderElectedPre) + } + checkAdv(t, sub, advisories...) + + si.Config.MaxMsgs = 1000 + _, err = js.UpdateStream(&si.Config) + require_NoError(t, err) + checkAdv(t, sub, JSAdvisoryStreamUpdatedPre) + + snapreq := &JSApiStreamSnapshotRequest{ + DeliverSubject: nats.NewInbox(), + ChunkSize: 512, + } + var snapshot []byte + done := make(chan bool) + nc.Subscribe(snapreq.DeliverSubject, func(m *nats.Msg) { + // EOF + if len(m.Data) == 0 { + done <- true + return + } + // Could be writing to a file here too. + snapshot = append(snapshot, m.Data...) + // Flow ack + m.Respond(nil) + }) + + req, _ := json.Marshal(snapreq) + rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, streamName), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error on snapshot request: %v", err) + } + + var snapresp JSApiStreamSnapshotResponse + json.Unmarshal(rmsg.Data, &snapresp) + if snapresp.Error != nil { + t.Fatalf("Did not get correct error response: %+v", snapresp.Error) + } + + // Wait to receive the snapshot. + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive our snapshot in time") + } + + checkAdv(t, sub, JSAdvisoryStreamSnapshotCreatePre) + checkAdv(t, sub, JSAdvisoryStreamSnapshotCompletePre) + + err = js.DeleteStream(streamName) + require_NoError(t, err) + checkAdv(t, sub, JSAdvisoryStreamDeletedPre) + + state := *snapresp.State + config := *snapresp.Config + resreq := &JSApiStreamRestoreRequest{ + Config: config, + State: state, + } + req, _ = json.Marshal(resreq) + rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, streamName), req, 5*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var resresp JSApiStreamRestoreResponse + json.Unmarshal(rmsg.Data, &resresp) + if resresp.Error != nil { + t.Fatalf("Got an unexpected error response: %+v", resresp.Error) + } + + // Send our snapshot back in to restore the stream. + // Can be any size message. + var chunk [1024]byte + for r := bytes.NewReader(snapshot); ; { + n, err := r.Read(chunk[:]) + if err != nil { + break + } + nc.Request(resresp.DeliverSubject, chunk[:n], time.Second) + } + rmsg, err = nc.Request(resresp.DeliverSubject, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resresp.Error = nil + json.Unmarshal(rmsg.Data, &resresp) + if resresp.Error != nil { + t.Fatalf("Got an unexpected error response: %+v", resresp.Error) + } + + checkAdv(t, sub, JSAdvisoryStreamRestoreCreatePre) + // At this point, the stream_created advisory may be sent before + // or after the restore_complete advisory because they are sent + // using different "send queues". That is, the restore uses the + // server's event queue while the stream_created is sent from + // the stream's own send queue. + advisories = append(advisories, JSAdvisoryStreamRestoreCompletePre) + checkAdv(t, sub, advisories...) + } + + t.Run("Single", func(t *testing.T) { checkAdvisories(t, s, 1) }) + t.Run("Clustered_R1", func(t *testing.T) { checkAdvisories(t, c.randomServer(), 1) }) + t.Run("Clustered_R3", func(t *testing.T) { checkAdvisories(t, c.randomServer(), 3) }) +} + // Support functions // Used to setup superclusters for tests. diff --git a/server/stream.go b/server/stream.go index fdbe41a4..41ff5444 100644 --- a/server/stream.go +++ b/server/stream.go @@ -464,7 +464,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Send advisory. var suppress bool if !s.standAloneMode() && sa == nil { - suppress = true + if cfg.Replicas > 1 { + suppress = true + } } else if sa != nil { suppress = sa.responded } @@ -1048,11 +1050,8 @@ func (mset *stream) update(config *StreamConfig) error { // Now update config and store's version of our config. mset.cfg = *cfg - var suppress bool - if mset.isClustered() && mset.sa != nil { - suppress = mset.sa.responded - } - if mset.isLeader() && !suppress { + // If we are the leader never suppres update advisory, simply send. + if mset.isLeader() { mset.sendUpdateAdvisoryLocked() } mset.mu.Unlock() @@ -3327,12 +3326,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.infoSub = nil } - // Quit channel. - if mset.qch != nil { - close(mset.qch) - mset.qch = nil - } - // Cluster cleanup if n := mset.node; n != nil { if deleteFlag { @@ -3347,6 +3340,12 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.sendDeleteAdvisoryLocked() } + // Quit channel, do this after sending the delete advisory + if mset.qch != nil { + close(mset.qch) + mset.qch = nil + } + c := mset.client mset.client = nil if c == nil {