mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Suppress additional advisories on server restart and leadership changes.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -891,13 +891,14 @@ func (s *Server) shutdownEventing() {
|
||||
s.mu.Lock()
|
||||
clearTimer(&s.sys.sweeper)
|
||||
clearTimer(&s.sys.stmr)
|
||||
sys := s.sys
|
||||
s.mu.Unlock()
|
||||
|
||||
// We will queue up a shutdown event and wait for the
|
||||
// internal send loop to exit.
|
||||
s.sendShutdownEvent()
|
||||
s.sys.wg.Wait()
|
||||
close(s.sys.resetCh)
|
||||
sys.wg.Wait()
|
||||
close(sys.resetCh)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -726,7 +726,7 @@ func (js *jetStream) metaSnapshot() []byte {
|
||||
return s2.EncodeBetter(nil, b)
|
||||
}
|
||||
|
||||
func (js *jetStream) applyMetaSnapshot(buf []byte) error {
|
||||
func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error {
|
||||
jse, err := s2.Decode(nil, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -794,32 +794,62 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
|
||||
|
||||
// Do removals first.
|
||||
for _, sa := range saDel {
|
||||
if isRecovering {
|
||||
js.setStreamAssignmentResponded(sa)
|
||||
}
|
||||
js.processStreamRemoval(sa)
|
||||
}
|
||||
// Now do add for the streams. Also add in all consumers.
|
||||
for _, sa := range saAdd {
|
||||
if isRecovering {
|
||||
js.setStreamAssignmentResponded(sa)
|
||||
}
|
||||
js.processStreamAssignment(sa)
|
||||
// We can simply add the consumers.
|
||||
for _, ca := range sa.consumers {
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerAssignment(ca)
|
||||
}
|
||||
}
|
||||
// Now do the deltas for existing stream's consumers.
|
||||
for _, ca := range caDel {
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerRemoval(ca)
|
||||
}
|
||||
for _, ca := range caAdd {
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerAssignment(ca)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Called on recovery to make sure we do not process like original
|
||||
func (js *jetStream) setStreamAssignmentResponded(sa *streamAssignment) {
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
sa.responded = true
|
||||
sa.Restore = nil
|
||||
}
|
||||
|
||||
// Called on recovery to make sure we do not process like original
|
||||
func (js *jetStream) setConsumerAssignmentResponded(ca *consumerAssignment) {
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
ca.responded = true
|
||||
}
|
||||
|
||||
func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, error) {
|
||||
var didSnap bool
|
||||
for _, e := range entries {
|
||||
if e.Type == EntrySnapshot {
|
||||
js.applyMetaSnapshot(e.Data)
|
||||
js.applyMetaSnapshot(e.Data, isRecovering)
|
||||
didSnap = true
|
||||
} else {
|
||||
buf := e.Data
|
||||
@@ -830,9 +860,8 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
|
||||
return didSnap, err
|
||||
}
|
||||
// We process the assignment but ignore restore on recovery.
|
||||
if sa.Restore != nil && isRecovering {
|
||||
sa.Restore = nil
|
||||
if isRecovering {
|
||||
js.setStreamAssignmentResponded(sa)
|
||||
}
|
||||
js.processStreamAssignment(sa)
|
||||
case removeStreamOp:
|
||||
@@ -841,6 +870,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
|
||||
return didSnap, err
|
||||
}
|
||||
if isRecovering {
|
||||
js.setStreamAssignmentResponded(sa)
|
||||
}
|
||||
js.processStreamRemoval(sa)
|
||||
case assignConsumerOp:
|
||||
ca, err := decodeConsumerAssignment(buf[1:])
|
||||
@@ -848,6 +880,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
|
||||
return didSnap, err
|
||||
}
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerAssignment(ca)
|
||||
case assignCompressedConsumerOp:
|
||||
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
|
||||
@@ -855,6 +890,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:])
|
||||
return didSnap, err
|
||||
}
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerAssignment(ca)
|
||||
case removeConsumerOp:
|
||||
ca, err := decodeConsumerAssignment(buf[1:])
|
||||
@@ -862,6 +900,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
|
||||
return didSnap, err
|
||||
}
|
||||
if isRecovering {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerRemoval(ca)
|
||||
default:
|
||||
panic("JetStream Cluster Unknown meta entry op type")
|
||||
@@ -1153,6 +1194,9 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
|
||||
acc, _ := s.LookupAccount(sa.Client.Account)
|
||||
restoreDoneCh = s.processStreamRestore(sa.Client, acc, sa.Config.Name, _EMPTY_, sa.Reply, _EMPTY_)
|
||||
} else {
|
||||
if !isLeader && n.GroupLeader() != noLeader {
|
||||
js.setStreamAssignmentResponded(sa)
|
||||
}
|
||||
js.processStreamLeaderChange(mset, sa, isLeader)
|
||||
}
|
||||
case <-t.C:
|
||||
@@ -1466,10 +1510,9 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
|
||||
// Go ahead and create or update the stream.
|
||||
mset, err = acc.LookupStream(sa.Config.Name)
|
||||
if err == nil && mset != nil {
|
||||
mset.setStreamAssignment(sa)
|
||||
if err := mset.Update(sa.Config); err != nil {
|
||||
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
|
||||
} else {
|
||||
mset.setStreamAssignment(sa)
|
||||
}
|
||||
} else if err == ErrJetStreamStreamNotFound {
|
||||
// Add in the stream here.
|
||||
@@ -1799,7 +1842,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
|
||||
}
|
||||
}
|
||||
o.setConsumerAssignment(ca)
|
||||
s.Debugf("JetStream cluster, consumer already running")
|
||||
s.Debugf("JetStream cluster, consumer was already running")
|
||||
}
|
||||
|
||||
// Add in the consumer if needed.
|
||||
@@ -1999,6 +2042,9 @@ func (js *jetStream) monitorConsumer(o *Consumer, ca *consumerAssignment) {
|
||||
}
|
||||
}
|
||||
case isLeader := <-lch:
|
||||
if !isLeader && n.GroupLeader() != noLeader {
|
||||
js.setConsumerAssignmentResponded(ca)
|
||||
}
|
||||
js.processConsumerLeaderChange(o, ca, isLeader)
|
||||
case <-t.C:
|
||||
// TODO(dlc) - We should have this delayed a bit to not race the invariants.
|
||||
|
||||
@@ -66,6 +66,11 @@ type WAL interface {
|
||||
Delete() error
|
||||
}
|
||||
|
||||
// LeadChange carries information about a leadership change.
//
// NOTE(review): the field descriptions below are inferred from the field
// names only; confirm against the code that populates this type.
type LeadChange struct {
	// Leader presumably reports whether this node is the leader after the change.
	Leader bool
	// Previous presumably identifies the leader prior to the change.
	Previous string
}
|
||||
|
||||
type Peer struct {
|
||||
ID string
|
||||
Current bool
|
||||
@@ -1698,6 +1703,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
|
||||
n.writeTermVote()
|
||||
if isNew {
|
||||
n.resetElectionTimeout()
|
||||
n.updateLeadChange(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -765,7 +765,14 @@ func (mset *Stream) Update(config *StreamConfig) error {
|
||||
}
|
||||
// Now update config and store's version of our config.
|
||||
mset.config = cfg
|
||||
mset.sendUpdateAdvisoryLocked()
|
||||
|
||||
var suppress bool
|
||||
if mset.isClustered() && mset.sa != nil {
|
||||
suppress = mset.sa.responded
|
||||
}
|
||||
if !suppress {
|
||||
mset.sendUpdateAdvisoryLocked()
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
|
||||
mset.store.UpdateConfig(&cfg)
|
||||
|
||||
@@ -173,7 +173,7 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
|
||||
sl := c.streamLeader("$G", "TEST")
|
||||
sl.Shutdown()
|
||||
c.restartServer(sl)
|
||||
c.waitOnNewStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
si, err = js.StreamInfo("TEST")
|
||||
@@ -184,7 +184,7 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
|
||||
t.Fatalf("StreamInfo is not correct %+v", si)
|
||||
}
|
||||
// Now durable consumer.
|
||||
c.waitOnNewConsumerLeader("$G", "TEST", "dlc")
|
||||
c.waitOnConsumerLeader("$G", "TEST", "dlc")
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
if _, err = js.ConsumerInfo("TEST", "dlc"); err != nil {
|
||||
@@ -532,7 +532,7 @@ func TestJetStreamClusterConsumerState(t *testing.T) {
|
||||
}
|
||||
|
||||
c.consumerLeader("$G", "TEST", "dlc").Shutdown()
|
||||
c.waitOnNewConsumerLeader("$G", "TEST", "dlc")
|
||||
c.waitOnConsumerLeader("$G", "TEST", "dlc")
|
||||
|
||||
nci, err := sub.ConsumerInfo()
|
||||
if err != nil {
|
||||
@@ -768,7 +768,7 @@ func TestJetStreamClusterStreamSynchedTimeStamps(t *testing.T) {
|
||||
sl := c.streamLeader("$G", "foo")
|
||||
|
||||
sl.Shutdown()
|
||||
c.waitOnNewStreamLeader("$G", "foo")
|
||||
c.waitOnStreamLeader("$G", "foo")
|
||||
|
||||
s = c.randomServer()
|
||||
nc, js = jsClientConnect(t, s)
|
||||
@@ -929,7 +929,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
|
||||
}
|
||||
|
||||
c.consumerLeader("$G", "foo", "dlc").Shutdown()
|
||||
c.waitOnNewConsumerLeader("$G", "foo", "dlc")
|
||||
c.waitOnConsumerLeader("$G", "foo", "dlc")
|
||||
|
||||
ci2, err := sub.ConsumerInfo()
|
||||
if err != nil {
|
||||
@@ -944,7 +944,7 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
|
||||
}
|
||||
|
||||
// In case the server above was also stream leader.
|
||||
c.waitOnNewStreamLeader("$G", "foo")
|
||||
c.waitOnStreamLeader("$G", "foo")
|
||||
|
||||
// Now send more..
|
||||
// Send 10 more messages.
|
||||
@@ -1302,7 +1302,7 @@ func TestJetStreamClusterStreamNormalCatchup(t *testing.T) {
|
||||
|
||||
sl := c.streamLeader("$G", "TEST")
|
||||
sl.Shutdown()
|
||||
c.waitOnNewStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
// Send 10 more while one replica offline.
|
||||
for i := toSend; i <= toSend*2; i++ {
|
||||
@@ -1371,7 +1371,7 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) {
|
||||
sl := c.streamLeader("$G", "TEST")
|
||||
|
||||
sl.Shutdown()
|
||||
c.waitOnNewStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
sendBatch(100)
|
||||
|
||||
@@ -1425,7 +1425,7 @@ func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) {
|
||||
sl := c.streamLeader("$G", "TEST")
|
||||
|
||||
sl.Shutdown()
|
||||
c.waitOnNewStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
toSend := 10
|
||||
for i := 0; i < toSend; i++ {
|
||||
@@ -1452,7 +1452,7 @@ func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) {
|
||||
c.waitOnStreamCurrent(sl, "$G", "TEST")
|
||||
|
||||
nsl.Shutdown()
|
||||
c.waitOnNewStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
if _, err := js.StreamInfo("TEST"); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -1521,7 +1521,7 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
|
||||
oldLeader := c.streamLeader("$G", "TEST")
|
||||
oldLeader.Shutdown()
|
||||
|
||||
c.waitOnNewStreamLeader("$G", "TEST")
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
// Re-request.
|
||||
leader = c.streamLeader("$G", "TEST").Name()
|
||||
@@ -1955,7 +1955,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
|
||||
})
|
||||
|
||||
// Wait on the system to elect a leader for the restored consumer.
|
||||
c.waitOnNewConsumerLeader("$G", "TEST", "dlc")
|
||||
c.waitOnConsumerLeader("$G", "TEST", "dlc")
|
||||
|
||||
// Now check for the consumer being recreated.
|
||||
nci, err := js.ConsumerInfo("TEST", "dlc")
|
||||
@@ -2532,6 +2532,86 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamRestartAdvisories(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
sub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.API")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
sendBatch := func(subject string, n int) {
|
||||
t.Helper()
|
||||
for i := 0; i < n; i++ {
|
||||
if _, err := js.Publish(subject, []byte("JSC-OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add in some streams with msgs and consumers.
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-1", Replicas: 2}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := js.SubscribeSync("TEST-1", nats.Durable("DC")); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
sendBatch("TEST-1", 25)
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-2", Replicas: 2}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := js.SubscribeSync("TEST-2", nats.Durable("DC")); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
sendBatch("TEST-2", 50)
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST-3", Replicas: 3, Storage: nats.MemoryStorage}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := js.SubscribeSync("TEST-3", nats.Durable("DC")); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
sendBatch("TEST-3", 100)
|
||||
|
||||
// Wait for the advisories for all streams and consumers.
|
||||
checkSubsPending(t, sub, 9) // 3 streams, 3 consumers, 3 stream names lookups for creating consumers.
|
||||
for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) {
|
||||
}
|
||||
|
||||
usub, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.*.UPDATED.>")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer usub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
checkSubsPending(t, sub, 0)
|
||||
checkSubsPending(t, usub, 0)
|
||||
|
||||
// Now restart the other two servers we are not connected to.
|
||||
for _, cs := range c.servers {
|
||||
if cs != s {
|
||||
cs.Shutdown()
|
||||
c.restartServer(cs)
|
||||
}
|
||||
}
|
||||
for _, cs := range c.servers {
|
||||
c.waitOnServerCurrent(cs)
|
||||
}
|
||||
|
||||
checkSubsPending(t, sub, 0)
|
||||
checkSubsPending(t, usub, 0)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamPerf(t *testing.T) {
|
||||
// Comment out to run, holding place for now.
|
||||
skip(t)
|
||||
@@ -2739,7 +2819,7 @@ func (c *cluster) waitOnPeerCount(n int) {
|
||||
c.t.Fatalf("Expected a cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers()))
|
||||
}
|
||||
|
||||
func (c *cluster) waitOnNewConsumerLeader(account, stream, consumer string) {
|
||||
func (c *cluster) waitOnConsumerLeader(account, stream, consumer string) {
|
||||
c.t.Helper()
|
||||
expires := time.Now().Add(10 * time.Second)
|
||||
for time.Now().Before(expires) {
|
||||
@@ -2762,7 +2842,7 @@ func (c *cluster) consumerLeader(account, stream, consumer string) *server.Serve
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cluster) waitOnNewStreamLeader(account, stream string) {
|
||||
func (c *cluster) waitOnStreamLeader(account, stream string) {
|
||||
c.t.Helper()
|
||||
expires := time.Now().Add(10 * time.Second)
|
||||
for time.Now().Before(expires) {
|
||||
|
||||
Reference in New Issue
Block a user