diff --git a/server/stream.go b/server/stream.go index 6e302cd9..1798a5fc 100644 --- a/server/stream.go +++ b/server/stream.go @@ -122,6 +122,7 @@ type Stream struct { ddindex int ddtmr *time.Timer qch chan struct{} + active bool // Clustered mode. sa *streamAssignment @@ -266,7 +267,8 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa } // Call directly to set leader if not in clustered mode. - if !s.JetStreamIsClustered() { + // This can be called though before we actually setup clustering, so check both. + if !s.JetStreamIsClustered() && s.standAloneMode() { if err := mset.setLeader(true); err != nil { mset.Delete() return nil, err @@ -309,6 +311,9 @@ func (mset *Stream) setLeader(isLeader bool) error { mset.mu.Lock() // If we are here we have a change in leader status. if isLeader { + // Make sure we are listening for sync requests. + // TODO(dlc) - Original design was that all in sync members of the group would do DQ. + mset.startClusterSubs() // Setup subscriptions if err := mset.subscribeToStream(); err != nil { mset.mu.Unlock() @@ -316,9 +321,6 @@ func (mset *Stream) setLeader(isLeader bool) error { mset.Delete() return err } - // Make sure we are listening for sync requests. - // TODO(dlc) - Original design was that all in sync members of the group would do DQ. - mset.startClusterSubs() } else { // Stop responding to sync requests. mset.stopClusterSubs() @@ -840,11 +842,15 @@ func (mset *Stream) removeMsg(seq uint64, secure bool) (bool, error) { // Will create internal subscriptions for the stream. // Lock should be held. func (mset *Stream) subscribeToStream() error { + if mset.active { + return nil + } for _, subject := range mset.config.Subjects { if _, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg); err != nil { return err } } + mset.active = true return nil } @@ -854,6 +860,7 @@ func (mset *Stream) unsubscribeToStream() error { for _, subject := range mset.config.Subjects { mset.unsubscribeInternal(subject) } + mset.active = false return nil } diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index a1968d6c..4207e373 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -2612,6 +2612,56 @@ func TestJetStreamRestartAdvisories(t *testing.T) { checkSubsPending(t, usub, 0) } +func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "ND", 2) + defer c.shutdown() + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.SubscribeSync("foo", nats.Durable("dlc")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("msg1")) + if m, err := sub.NextMsg(time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else { + m.Ack() + } + + sl := c.streamLeader("$G", "TEST") + sl.Shutdown() + c.restartServer(sl) + c.waitOnStreamLeader("$G", "TEST") + + // Send second msg + js.Publish("foo", []byte("msg2")) + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error getting message: %v", err) + } + if string(msg.Data) != "msg2" { + t.Fatalf("Unexpected message: %s", msg.Data) + } + msg.Ack() + + // Make sure we don't get a duplicate. + msg, err = sub.NextMsg(250 * time.Millisecond) + if err == nil { + t.Fatalf("Should have gotten an error, got %s", msg.Data) + } +} + func TestJetStreamClusterStreamPerf(t *testing.T) { // Comment out to run, holding place for now. skip(t)