From 716da693e4e6da89dd24b5d17cc474cae54ca49f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 6 Feb 2021 20:11:30 -0800 Subject: [PATCH] Track peers differently, react to removal entries Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 93 +++++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e45d8219..3fc1635d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -294,6 +294,9 @@ func (s *Server) JetStreamClusterPeers() []string { peers := cc.meta.Peers() var nodes []string for _, p := range peers { + if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(*nodeInfo).offline { + continue + } nodes = append(nodes, p.ID) } return nodes @@ -470,9 +473,11 @@ func (js *jetStream) setupMetaGroup() error { if bootstrap { s.Noticef("JetStream cluster bootstrapping") // FIXME(dlc) - Make this real. - peers := s.activePeers() + peers := s.ActivePeers() s.Debugf("JetStream cluster initial peers: %+v", peers) - s.bootstrapRaftNode(cfg, peers, false) + if err := s.bootstrapRaftNode(cfg, peers, false); err != nil { + return err + } } else { s.Noticef("JetStream cluster recovering state") } @@ -702,11 +707,16 @@ func (js *jetStream) monitorCluster() { continue } // FIXME(dlc) - Deal with errors. 
- if hadSnapshot, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil { + if hadSnapshot, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil { n.Applied(ce.Index) if hadSnapshot { snapout = false - n.Compact(ce.Index) + if !isRecovering { + n.Compact(ce.Index) + } + } + if didRemoval { + attemptSnapshot() } } if isLeader && !snapout { @@ -933,8 +943,8 @@ func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) { } } -func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, error) { - var didSnap bool +func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, bool, error) { + var didSnap, didRemove bool for _, e := range entries { if e.Type == EntrySnapshot { js.applyMetaSnapshot(e.Data, isRecovering) @@ -948,7 +958,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool sa, err := decodeStreamAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, err + return didSnap, didRemove, err } if isRecovering { js.setStreamAssignmentResponded(sa) @@ -958,17 +968,18 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool sa, err := decodeStreamAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) - return didSnap, err + return didSnap, didRemove, err } if isRecovering { js.setStreamAssignmentResponded(sa) } js.processStreamRemoval(sa) + didRemove = true case assignConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:]) - return didSnap, err + return didSnap, didRemove, err } if isRecovering { js.setConsumerAssignmentResponded(ca) @@ -978,7 +989,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool ca, err := 
decodeConsumerAssignmentCompressed(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:]) - return didSnap, err + return didSnap, didRemove, err } if isRecovering { js.setConsumerAssignmentResponded(ca) @@ -988,18 +999,19 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:]) - return didSnap, err + return didSnap, didRemove, err } if isRecovering { js.setConsumerAssignmentResponded(ca) } js.processConsumerRemoval(ca) + didRemove = true default: panic("JetStream Cluster Unknown meta entry op type") } } } - return didSnap, nil + return didSnap, didRemove, nil } func (rg *raftGroup) isMember(id string) bool { @@ -1058,7 +1070,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) error { stateDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) fs, bootstrap, err := newFileStore( FileStoreConfig{StoreDir: stateDir, BlockSize: 32 * 1024 * 1024}, - StreamConfig{Name: rg.Name, Storage: FileStorage}, + StreamConfig{Name: rg.Name, Storage: FileStorage, Replicas: 2}, ) if err != nil { s.Errorf("Error creating filestore: %v", err) @@ -1283,11 +1295,14 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) { } } // Apply our entries. 
- if hadSnapshot, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { + if hadSnapshot, didRemove, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { n.Applied(ce.Index) if hadSnapshot { snapout = false } + if isLeader && didRemove { + attemptSnapshot() + } } else { s.Warnf("Error applying entries to '%s > %s'", sa.Client.Account, sa.Config.Name) } @@ -1314,8 +1329,8 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) { } } -func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isRecovering bool) (bool, error) { - var didSnap bool +func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isRecovering bool) (bool, bool, error) { + var didSnap, didRemove bool for _, e := range ce.Entries { if e.Type == EntrySnapshot { if !isRecovering { @@ -1329,7 +1344,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco if peer := string(e.Data); peer == ourID { mset.stop(true, false) } - return false, nil + return false, false, nil } else { buf := e.Data switch entryOp(buf[0]) { @@ -1365,6 +1380,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco if err != nil && !isRecovering { s.Debugf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err) } + didRemove = removed js.mu.RLock() isLeader := cc.isStreamLeader(md.Client.Account, md.Stream) @@ -1402,6 +1418,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco if err != nil { s.Warnf("JetStream cluster failed to purge stream %q for account %q: %v", sp.Stream, sp.Client.Account, err) } + didRemove = purged > 0 js.mu.RLock() isLeader := js.cluster.isStreamLeader(sp.Client.Account, sp.Stream) @@ -1423,7 +1440,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco } } } - return didSnap, nil + return didSnap, didRemove, nil } // Returns the PeerInfo for 
all replicas of a raft node. This is different than node.Peers() @@ -1432,8 +1449,11 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo { now := time.Now() var replicas []*PeerInfo for _, rp := range node.Peers() { - pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: rp.Current, Active: now.Sub(rp.Last)} - replicas = append(replicas, pi) + if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { + si := sir.(*nodeInfo) + pi := &PeerInfo{Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} + replicas = append(replicas, pi) + } } return replicas } @@ -1961,7 +1981,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { js.mu.Lock() s, cc := js.srv, js.cluster - if s == nil || cc == nil { + if s == nil || cc == nil || cc.meta == nil { // TODO(dlc) - debug at least js.mu.Unlock() return @@ -2597,6 +2617,10 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe cluster = sa.Config.Placement.Cluster } for _, p := range cc.meta.Peers() { + // If it is not in our list it has probably shut down, so don't consider it. + if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(*nodeInfo).offline { + continue + } // Make sure they are active and current and not already part of our group. current, lastSeen := p.Current, now.Sub(p.Last) if !current || lastSeen > lostQuorumInterval || sa.Group.isMember(p.ID) { @@ -2624,23 +2648,19 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string) []string { var nodes []string peers := cc.meta.Peers() - s, ourID := cc.s, cc.meta.ID() + s := cc.s - now := time.Now() for _, p := range peers { - // Make sure they are active and current. 
- current, lastSeen := p.Current, now.Sub(p.Last) - if current && lastSeen > lostQuorumInterval { - current = false + // If we know it's offline or it is not in our list, it has probably shut down, so don't consider it. + if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(*nodeInfo).offline { + continue } - if p.ID == ourID || current { - if cluster != _EMPTY_ { - if s.clusterNameForNode(p.ID) == cluster { - nodes = append(nodes, p.ID) - } - } else { + if cluster != _EMPTY_ { + if s.clusterNameForNode(p.ID) == cluster { nodes = append(nodes, p.ID) } + } else { + nodes = append(nodes, p.ID) } } if len(nodes) < r { @@ -3733,8 +3753,11 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { if current && lastSeen > lostQuorumInterval { current = false } - pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: current, Active: lastSeen} - ci.Replicas = append(ci.Replicas, pi) + if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { + si := sir.(*nodeInfo) + pi := &PeerInfo{Name: si.name, Current: current, Offline: si.offline, Active: lastSeen, Lag: rp.Lag} + ci.Replicas = append(ci.Replicas, pi) + } } } return ci