mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
When removing a stream peer with an attached durable consumer, the consumer could become inconsistent.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -868,12 +868,11 @@ func (o *consumer) setLeader(isLeader bool) {
|
||||
} else {
|
||||
// Shutdown the go routines and the subscriptions.
|
||||
o.mu.Lock()
|
||||
// ok if they are nil, we protect inside unsubscribe()
|
||||
o.unsubscribe(o.ackSub)
|
||||
o.unsubscribe(o.reqSub)
|
||||
o.unsubscribe(o.fcSub)
|
||||
o.ackSub = nil
|
||||
o.reqSub = nil
|
||||
o.fcSub = nil
|
||||
o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
|
||||
if o.infoSub != nil {
|
||||
o.srv.sysUnsubscribe(o.infoSub)
|
||||
o.infoSub = nil
|
||||
@@ -3528,8 +3527,15 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
|
||||
}
|
||||
o.closed = true
|
||||
|
||||
if dflag && advisory && o.isLeader() {
|
||||
o.sendDeleteAdvisoryLocked()
|
||||
// Check if we are the leader and are being deleted.
|
||||
if dflag && o.isLeader() {
|
||||
// If we are clustered and node leader (probable from above), stepdown.
|
||||
if node := o.node; node != nil && node.Leader() {
|
||||
node.StepDown()
|
||||
}
|
||||
if advisory {
|
||||
o.sendDeleteAdvisoryLocked()
|
||||
}
|
||||
}
|
||||
|
||||
if o.qch != nil {
|
||||
|
||||
@@ -1102,7 +1102,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) {
|
||||
}
|
||||
}
|
||||
|
||||
// Just copied over and changes out the group so it can be encoded.
|
||||
// Just copies over and changes out the group so it can be encoded.
|
||||
// Lock should be held.
|
||||
func (sa *streamAssignment) copyGroup() *streamAssignment {
|
||||
csa, cg := *sa, *sa.Group
|
||||
@@ -1111,6 +1111,15 @@ func (sa *streamAssignment) copyGroup() *streamAssignment {
|
||||
return &csa
|
||||
}
|
||||
|
||||
// Just copies over and changes out the group so it can be encoded.
|
||||
// Lock should be held.
|
||||
func (ca *consumerAssignment) copyGroup() *consumerAssignment {
|
||||
cca, cg := *ca, *ca.Group
|
||||
cca.Group = &cg
|
||||
cca.Group.Peers = copyStrings(ca.Group.Peers)
|
||||
return &cca
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (sa *streamAssignment) missingPeers() bool {
|
||||
return len(sa.Group.Peers) < sa.Config.Replicas
|
||||
@@ -1235,9 +1244,9 @@ func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer strin
|
||||
for _, ca := range sa.consumers {
|
||||
// Ephemerals are R=1, so only auto-remap durables, or R>1.
|
||||
if ca.Config.Durable != _EMPTY_ {
|
||||
cca := *ca
|
||||
cca.Group.Peers = rg.Peers
|
||||
cc.meta.Propose(encodeAddConsumerAssignment(&cca))
|
||||
cca := ca.copyGroup()
|
||||
cca.Group.Peers, cca.Group.Preferred = rg.Peers, _EMPTY_
|
||||
cc.meta.Propose(encodeAddConsumerAssignment(cca))
|
||||
} else if ca.Group.isMember(peer) {
|
||||
// These are ephemerals. Check to see if we deleted this peer.
|
||||
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
|
||||
@@ -1297,7 +1306,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
case assignConsumerOp:
|
||||
ca, err := decodeConsumerAssignment(buf[1:])
|
||||
if err != nil {
|
||||
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
|
||||
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
|
||||
return didSnap, didRemove, err
|
||||
}
|
||||
if isRecovering {
|
||||
@@ -1307,7 +1316,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
case assignCompressedConsumerOp:
|
||||
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
|
||||
if err != nil {
|
||||
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:])
|
||||
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
|
||||
return didSnap, didRemove, err
|
||||
}
|
||||
if isRecovering {
|
||||
@@ -1317,7 +1326,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
|
||||
case removeConsumerOp:
|
||||
ca, err := decodeConsumerAssignment(buf[1:])
|
||||
if err != nil {
|
||||
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
|
||||
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
|
||||
return didSnap, didRemove, err
|
||||
}
|
||||
if isRecovering {
|
||||
@@ -2194,6 +2203,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
|
||||
// Check if we have a raft node running, meaning we are no longer part of the group but were.
|
||||
js.mu.Lock()
|
||||
if node := sa.Group.node; node != nil {
|
||||
if node.Leader() {
|
||||
node.StepDown()
|
||||
}
|
||||
node.ProposeRemovePeer(ourID)
|
||||
didRemove = true
|
||||
}
|
||||
@@ -2687,6 +2699,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
|
||||
sa.consumers = make(map[string]*consumerAssignment)
|
||||
} else if oca := sa.consumers[ca.Name]; oca != nil {
|
||||
wasExisting = true
|
||||
// Copy over private existing state from former SA.
|
||||
ca.Group.node = oca.Group.node
|
||||
ca.responded = oca.responded
|
||||
ca.err = oca.err
|
||||
}
|
||||
|
||||
// Capture the optional state. We will pass it along if we are a member to apply.
|
||||
@@ -2706,6 +2722,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
|
||||
// Check if we have a raft node running, meaning we are no longer part of the group but were.
|
||||
js.mu.Lock()
|
||||
if node := ca.Group.node; node != nil {
|
||||
if node.Leader() {
|
||||
node.StepDown()
|
||||
}
|
||||
node.ProposeRemovePeer(ourID)
|
||||
}
|
||||
ca.Group.node = nil
|
||||
@@ -3076,6 +3095,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
|
||||
// Track if we are leader.
|
||||
var isLeader bool
|
||||
recovering := true
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -3088,6 +3108,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
for _, cei := range ces {
|
||||
// No special processing needed for when we are caught up on restart.
|
||||
if cei == nil {
|
||||
recovering = false
|
||||
if n.NeedSnapshot() {
|
||||
doSnapshot()
|
||||
}
|
||||
@@ -3106,7 +3127,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
}
|
||||
aq.recycle(&ces)
|
||||
case isLeader = <-lch:
|
||||
if !isLeader && n.GroupLeader() != noLeader {
|
||||
if recovering && !isLeader {
|
||||
js.setConsumerAssignmentRecovering(ca)
|
||||
}
|
||||
js.processConsumerLeaderChange(o, isLeader)
|
||||
|
||||
@@ -4110,11 +4110,10 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
|
||||
for _, p := range si.Cluster.Replicas {
|
||||
peers = append(peers, p.Name)
|
||||
}
|
||||
// Pick a truly random server to remove.
|
||||
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
|
||||
toRemove := peers[0]
|
||||
if cl := c.leader(); toRemove == cl.Name() {
|
||||
toRemove = peers[1]
|
||||
}
|
||||
|
||||
// First test bad peer.
|
||||
req := &JSApiStreamRemovePeerRequest{Peer: "NOT VALID"}
|
||||
jsreq, err := json.Marshal(req)
|
||||
@@ -4140,6 +4139,7 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
resp, err = nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), jsreq, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@@ -4151,6 +4151,8 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
|
||||
t.Fatalf("Unexpected error: %+v", rpResp.Error)
|
||||
}
|
||||
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user