Track peers differently, react to removal entries

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-06 20:11:30 -08:00
parent 1c79d96de8
commit 716da693e4

View File

@@ -294,6 +294,9 @@ func (s *Server) JetStreamClusterPeers() []string {
peers := cc.meta.Peers()
var nodes []string
for _, p := range peers {
if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(*nodeInfo).offline {
continue
}
nodes = append(nodes, p.ID)
}
return nodes
@@ -470,9 +473,11 @@ func (js *jetStream) setupMetaGroup() error {
if bootstrap {
s.Noticef("JetStream cluster bootstrapping")
// FIXME(dlc) - Make this real.
peers := s.activePeers()
peers := s.ActivePeers()
s.Debugf("JetStream cluster initial peers: %+v", peers)
s.bootstrapRaftNode(cfg, peers, false)
if err := s.bootstrapRaftNode(cfg, peers, false); err != nil {
return err
}
} else {
s.Noticef("JetStream cluster recovering state")
}
@@ -702,11 +707,16 @@ func (js *jetStream) monitorCluster() {
continue
}
// FIXME(dlc) - Deal with errors.
if hadSnapshot, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil {
if hadSnapshot, didRemoval, err := js.applyMetaEntries(ce.Entries, isRecovering); err == nil {
n.Applied(ce.Index)
if hadSnapshot {
snapout = false
n.Compact(ce.Index)
if !isRecovering {
n.Compact(ce.Index)
}
}
if didRemoval {
attemptSnapshot()
}
}
if isLeader && !snapout {
@@ -933,8 +943,8 @@ func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) {
}
}
func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, error) {
var didSnap bool
func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool, bool, error) {
var didSnap, didRemove bool
for _, e := range entries {
if e.Type == EntrySnapshot {
js.applyMetaSnapshot(e.Data, isRecovering)
@@ -948,7 +958,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, err
return didSnap, didRemove, err
}
if isRecovering {
js.setStreamAssignmentResponded(sa)
@@ -958,17 +968,18 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
sa, err := decodeStreamAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:])
return didSnap, err
return didSnap, didRemove, err
}
if isRecovering {
js.setStreamAssignmentResponded(sa)
}
js.processStreamRemoval(sa)
didRemove = true
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
return didSnap, err
return didSnap, didRemove, err
}
if isRecovering {
js.setConsumerAssignmentResponded(ca)
@@ -978,7 +989,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:])
return didSnap, err
return didSnap, didRemove, err
}
if isRecovering {
js.setConsumerAssignmentResponded(ca)
@@ -988,18 +999,19 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
return didSnap, err
return didSnap, didRemove, err
}
if isRecovering {
js.setConsumerAssignmentResponded(ca)
}
js.processConsumerRemoval(ca)
didRemove = true
default:
panic("JetStream Cluster Unknown meta entry op type")
}
}
}
return didSnap, nil
return didSnap, didRemove, nil
}
func (rg *raftGroup) isMember(id string) bool {
@@ -1058,7 +1070,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) error {
stateDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name)
fs, bootstrap, err := newFileStore(
FileStoreConfig{StoreDir: stateDir, BlockSize: 32 * 1024 * 1024},
StreamConfig{Name: rg.Name, Storage: FileStorage},
StreamConfig{Name: rg.Name, Storage: FileStorage, Replicas: 2},
)
if err != nil {
s.Errorf("Error creating filestore: %v", err)
@@ -1283,11 +1295,14 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
}
}
// Apply our entries.
if hadSnapshot, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
if hadSnapshot, didRemove, err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
n.Applied(ce.Index)
if hadSnapshot {
snapout = false
}
if isLeader && didRemove {
attemptSnapshot()
}
} else {
s.Warnf("Error applying entries to '%s > %s'", sa.Client.Account, sa.Config.Name)
}
@@ -1314,8 +1329,8 @@ func (js *jetStream) monitorStream(mset *Stream, sa *streamAssignment) {
}
}
func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isRecovering bool) (bool, error) {
var didSnap bool
func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isRecovering bool) (bool, bool, error) {
var didSnap, didRemove bool
for _, e := range ce.Entries {
if e.Type == EntrySnapshot {
if !isRecovering {
@@ -1329,7 +1344,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
if peer := string(e.Data); peer == ourID {
mset.stop(true, false)
}
return false, nil
return false, false, nil
} else {
buf := e.Data
switch entryOp(buf[0]) {
@@ -1365,6 +1380,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
if err != nil && !isRecovering {
s.Debugf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err)
}
didRemove = removed
js.mu.RLock()
isLeader := cc.isStreamLeader(md.Client.Account, md.Stream)
@@ -1402,6 +1418,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
if err != nil {
s.Warnf("JetStream cluster failed to purge stream %q for account %q: %v", sp.Stream, sp.Client.Account, err)
}
didRemove = purged > 0
js.mu.RLock()
isLeader := js.cluster.isStreamLeader(sp.Client.Account, sp.Stream)
@@ -1423,7 +1440,7 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry, isReco
}
}
}
return didSnap, nil
return didSnap, didRemove, nil
}
// Returns the PeerInfo for all replicas of a raft node. This is different than node.Peers()
@@ -1432,8 +1449,11 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
now := time.Now()
var replicas []*PeerInfo
for _, rp := range node.Peers() {
pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: rp.Current, Active: now.Sub(rp.Last)}
replicas = append(replicas, pi)
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
si := sir.(*nodeInfo)
pi := &PeerInfo{Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag}
replicas = append(replicas, pi)
}
}
return replicas
}
@@ -1961,7 +1981,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
js.mu.Lock()
s, cc := js.srv, js.cluster
if s == nil || cc == nil {
if s == nil || cc == nil || cc.meta == nil {
// TODO(dlc) - debug at least
js.mu.Unlock()
return
@@ -2597,6 +2617,10 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
cluster = sa.Config.Placement.Cluster
}
for _, p := range cc.meta.Peers() {
// If it is not in our list it probably shutdown, so don't consider.
if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(*nodeInfo).offline {
continue
}
// Make sure they are active and current and not already part of our group.
current, lastSeen := p.Current, now.Sub(p.Last)
if !current || lastSeen > lostQuorumInterval || sa.Group.isMember(p.ID) {
@@ -2624,23 +2648,19 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string) []string {
var nodes []string
peers := cc.meta.Peers()
s, ourID := cc.s, cc.meta.ID()
s := cc.s
now := time.Now()
for _, p := range peers {
// Make sure they are active and current.
current, lastSeen := p.Current, now.Sub(p.Last)
if current && lastSeen > lostQuorumInterval {
current = false
// If we know its offline or it is not in our list it probably shutdown, so don't consider.
if si, ok := s.nodeToInfo.Load(p.ID); !ok || si.(*nodeInfo).offline {
continue
}
if p.ID == ourID || current {
if cluster != _EMPTY_ {
if s.clusterNameForNode(p.ID) == cluster {
nodes = append(nodes, p.ID)
}
} else {
if cluster != _EMPTY_ {
if s.clusterNameForNode(p.ID) == cluster {
nodes = append(nodes, p.ID)
}
} else {
nodes = append(nodes, p.ID)
}
}
if len(nodes) < r {
@@ -3733,8 +3753,11 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
if current && lastSeen > lostQuorumInterval {
current = false
}
pi := &PeerInfo{Name: s.serverNameForNode(rp.ID), Current: current, Active: lastSeen}
ci.Replicas = append(ci.Replicas, pi)
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
si := sir.(*nodeInfo)
pi := &PeerInfo{Name: si.name, Current: current, Offline: si.offline, Active: lastSeen, Lag: rp.Lag}
ci.Replicas = append(ci.Replicas, pi)
}
}
}
return ci