diff --git a/.travis.yml b/.travis.yml index 6789b71f..19a8c783 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,6 @@ + +dist: focal + language: go go: - 1.15.x diff --git a/server/consumer.go b/server/consumer.go index 70a75bf1..d01aa3c2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -582,14 +582,22 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consum mset.setConsumer(o) mset.mu.Unlock() - if !s.JetStreamIsClustered() { + if !s.JetStreamIsClustered() && s.standAloneMode() { o.setLeader(true) } // This is always true in single server mode. if o.isLeader() { // Send advisory. - o.sendCreateAdvisory() + var suppress bool + if !s.standAloneMode() && ca == nil { + suppress = true + } else if ca != nil { + suppress = ca.responded + } + if !suppress { + o.sendCreateAdvisory() + } } return o, nil @@ -2321,7 +2329,10 @@ func (o *Consumer) Stream() string { o.mu.RLock() mset := o.mset o.mu.RUnlock() - return mset.Name() + if mset != nil { + return mset.Name() + } + return _EMPTY_ } // Active indicates if this consumer is still active. diff --git a/server/jetstream.go b/server/jetstream.go index 9c56978b..f23f9495 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1038,7 +1038,7 @@ func (jsa *jsAccount) delete() { jsa.mu.Unlock() for _, ms := range streams { - ms.stop(false) + ms.stop(false, false) } for _, t := range ts { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 2133244b..2da3ad89 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1368,6 +1368,9 @@ func (js *jetStream) processStreamLeaderChange(mset *Stream, sa *streamAssignmen } else { resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: mset.Config(), Cluster: s.clusterInfo(mset.raftNode())} s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp)) + if node := mset.raftNode(); node != nil { + mset.sendCreateAdvisory() + } } } @@ -1719,7 +1722,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, if mset.Config().internal { err = errors.New("not allowed to delete internal stream") } else { - err = mset.Delete() + err = mset.stop(true, wasLeader) } } @@ -1934,8 +1937,8 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb } else if mset != nil { if mset.Config().internal { err = errors.New("not allowed to delete internal consumer") - } else if obs := mset.LookupConsumer(ca.Name); obs != nil { - err = obs.Delete() + } else if o := mset.LookupConsumer(ca.Name); o != nil { + err = o.stop(true, true, wasLeader) } else { resp.Error = jsNoConsumerErr } @@ -1945,12 +1948,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb ca.Group.node.Delete() } - if !isMember || !wasLeader && ca.Group.node != nil && ca.Group.node.GroupLeader() != noLeader { - return - } - - // Just return if no reply subject. - if ca.Reply == _EMPTY_ { + if !wasLeader || ca.Reply == _EMPTY_ { return } @@ -2193,6 +2191,9 @@ func (js *jetStream) processConsumerLeaderChange(o *Consumer, ca *consumerAssign } else { resp.ConsumerInfo = o.Info() s.sendAPIResponse(client, acc, _EMPTY_, reply, _EMPTY_, s.jsonResponse(&resp)) + if node := o.raftNode(); node != nil { + o.sendCreateAdvisory() + } } } diff --git a/server/stream.go b/server/stream.go index 1798a5fc..7a1fdb38 100644 --- a/server/stream.go +++ b/server/stream.go @@ -282,7 +282,15 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa if isLeader { // Send advisory. - mset.sendCreateAdvisory() + var suppress bool + if !s.standAloneMode() && sa == nil { + suppress = true + } else if sa != nil { + suppress = sa.responded + } + if !suppress { + mset.sendCreateAdvisory() + } } return mset, nil @@ -497,10 +505,12 @@ func (mset *Stream) sendCreateAdvisory() { } j, err := json.MarshalIndent(m, "", " ") - if err == nil { - subj := JSAdvisoryStreamCreatedPre + "." + name - sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} + if err != nil { + return } + + subj := JSAdvisoryStreamCreatedPre + "." + name + sendq <- &jsPubMsg{subj, subj, _EMPTY_, nil, j, nil, 0} } func (mset *Stream) sendDeleteAdvisoryLocked() { @@ -670,16 +680,6 @@ func (mset *Stream) FileStoreConfig() (FileStoreConfig, error) { // Delete deletes a stream from the owning account. func (mset *Stream) Delete() error { - mset.mu.Lock() - jsa := mset.jsa - mset.mu.Unlock() - if jsa == nil { - return ErrJetStreamNotEnabledForAccount - } - jsa.mu.Lock() - delete(jsa.streams, mset.config.Name) - jsa.mu.Unlock() - return mset.delete() } @@ -1401,6 +1401,9 @@ func (mset *Stream) setupSendCapabilities() { // Name returns the stream name. func (mset *Stream) Name() string { + if mset == nil { + return _EMPTY_ + } mset.mu.Lock() defer mset.mu.Unlock() return mset.config.Name @@ -1468,11 +1471,24 @@ func (mset *Stream) internalSendLoop() { // Internal function to delete a stream. func (mset *Stream) delete() error { - return mset.stop(true) + return mset.stop(true, true) } // Internal function to stop or delete the stream. -func (mset *Stream) stop(delete bool) error { +func (mset *Stream) stop(deleteFlag, advisory bool) error { + mset.mu.RLock() + jsa := mset.jsa + mset.mu.RUnlock() + + if jsa == nil { + return ErrJetStreamNotEnabledForAccount + } + + // Remove from our account map. + jsa.mu.Lock() + delete(jsa.streams, mset.config.Name) + jsa.mu.Unlock() + // Clean up consumers. mset.mu.Lock() var obs []*Consumer @@ -1486,7 +1502,7 @@ func (mset *Stream) stop(delete bool) error { // Second flag says do not broadcast to signal. // TODO(dlc) - If we have an err here we don't want to stop // but should we log? - o.stop(delete, false, delete) + o.stop(deleteFlag, false, advisory) } mset.mu.Lock() @@ -1499,7 +1515,7 @@ func (mset *Stream) stop(delete bool) error { // Cluster cleanup if n := mset.node; n != nil { - if delete { + if deleteFlag { n.Delete() } else { n.Stop() @@ -1508,7 +1524,7 @@ func (mset *Stream) stop(delete bool) error { } // Send stream delete advisory after the consumers. - if delete { + if deleteFlag && advisory { mset.sendDeleteAdvisoryLocked() } @@ -1547,7 +1563,7 @@ func (mset *Stream) stop(delete bool) error { return nil } - if delete { + if deleteFlag { if err := mset.store.Delete(); err != nil { return err } diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 4e1c6117..dee3b9a4 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -1930,7 +1930,15 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) { } nc.Request(rresp.DeliverSubject, chunk[:n], time.Second) } - nc.Request(rresp.DeliverSubject, nil, time.Second) + rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + rresp.Error = nil + json.Unmarshal(rmsg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Got an unexpected error response: %+v", rresp.Error) + } si, err := js.StreamInfo("TEST") if err != nil { @@ -2532,7 +2540,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { } } -func TestJetStreamRestartAdvisories(t *testing.T) { +func TestJetStreamClusterRestartAndRemoveAdvisories(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() @@ -2546,6 +2554,12 @@ func TestJetStreamRestartAdvisories(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() + + csub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.CREATED.>") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer csub.Unsubscribe() nc.Flush() sendBatch := func(subject string, n int) { @@ -2582,10 +2596,18 @@ func TestJetStreamRestartAdvisories(t *testing.T) { } sendBatch("TEST-3", 100) + drainSub := func(sub *nats.Subscription) { + for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) { + } + } + // Wait for the advisories for all streams and consumers. checkSubsPending(t, sub, 9) // 3 streams, 3 consumers, 3 stream names lookups for creating consumers. - for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) { - } + drainSub(sub) + + // Created audit events. + checkSubsPending(t, csub, 6) + drainSub(csub) usub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.UPDATED.>") if err != nil { @@ -2604,12 +2626,38 @@ func TestJetStreamRestartAdvisories(t *testing.T) { c.restartServer(cs) } } - for _, cs := range c.servers { - c.waitOnServerCurrent(cs) - } + c.waitOnAllCurrent() + checkSubsPending(t, csub, 0) checkSubsPending(t, sub, 0) checkSubsPending(t, usub, 0) + + dsub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.DELETED.>") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer dsub.Unsubscribe() + nc.Flush() + + c.waitOnConsumerLeader("$G", "TEST-1", "DC") + + // Now check delete advisories as well. + if err := js.DeleteConsumer("TEST-1", "DC"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + checkSubsPending(t, csub, 0) + checkSubsPending(t, dsub, 1) + checkSubsPending(t, sub, 1) + checkSubsPending(t, usub, 0) + drainSub(dsub) + + if err := js.DeleteStream("TEST-3"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + checkSubsPending(t, dsub, 2) // Stream and the consumer underneath. + checkSubsPending(t, sub, 4) } func TestJetStreamClusterNoDuplicateOnNodeRestart(t *testing.T) { @@ -2998,6 +3046,12 @@ func (c *cluster) waitOnServerCurrent(s *server.Server) { c.t.Fatalf("Expected server %q to eventually be current", s) } +func (c *cluster) waitOnAllCurrent() { + for _, cs := range c.servers { + c.waitOnServerCurrent(cs) + } +} + func (c *cluster) randomNonLeader() *server.Server { // range should randomize.. but.. for _, s := range c.servers {