From 01fa89a0b45baec33083cf570758b9bc680321e4 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 18 Feb 2023 08:09:12 -0800 Subject: [PATCH] Fix for deleting consumers on restarts and non-fatal update errors. If there was a spurious error on restart, or possibly on an update, we could delete a consumer which was the incorrect behavior. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 78 ++++++++++++++++++------------ server/jetstream_cluster_3_test.go | 54 +++++++++++++++++++++ 2 files changed, 100 insertions(+), 32 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9603b092..66120929 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3466,15 +3466,17 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { if err != nil { ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err) if isMember { - // If we can not lookup the account and we are a member, send this result back to the metacontroller leader. - result := &consumerAssignmentResult{ - Account: accName, - Stream: stream, - Consumer: consumerName, - Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + if !js.isMetaRecovering() { + // If we can not lookup the account and we are a member, send this result back to the metacontroller leader. + result := &consumerAssignmentResult{ + Account: accName, + Stream: stream, + Consumer: consumerName, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = NewJSNoAccountError() + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) } - result.Response.Error = NewJSNoAccountError() - s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) s.Warnf(ll) } else { s.Debugf(ll) @@ -3598,18 +3600,20 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // Go ahead and create or update the consumer. mset, err := acc.lookupStream(ca.Stream) if err != nil { - js.mu.Lock() - s.Debugf("Consumer create failed, could not locate stream '%s > %s'", ca.Client.serviceAccount(), ca.Stream) - ca.err = NewJSStreamNotFoundError() - result := &consumerAssignmentResult{ - Account: ca.Client.serviceAccount(), - Stream: ca.Stream, - Consumer: ca.Name, - Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + if !js.isMetaRecovering() { + js.mu.Lock() + s.Warnf("Consumer create failed, could not locate stream '%s > %s > %s'", ca.Client.serviceAccount(), ca.Stream, ca.Name) + ca.err = NewJSStreamNotFoundError() + result := &consumerAssignmentResult{ + Account: ca.Client.serviceAccount(), + Stream: ca.Stream, + Consumer: ca.Name, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = NewJSStreamNotFoundError() + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) + js.mu.Unlock() } - result.Response.Error = NewJSStreamNotFoundError() - s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) - js.mu.Unlock() return } @@ -3639,20 +3643,26 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false) didCreate = true } else { - if err := o.updateConfig(ca.Config); err != nil { - // This is essentially an update that has failed. - js.mu.Lock() - result := &consumerAssignmentResult{ - Account: ca.Client.serviceAccount(), - Stream: ca.Stream, - Consumer: ca.Name, - Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + // Call into update, ignore consumer exists error here since this means an old deliver subject is bound + // which can happen on restart etc. + if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() { + // This is essentially an update that has failed. Respond back to metaleader if we are not recovering. + js.mu.RLock() + if !js.metaRecovering { + result := &consumerAssignmentResult{ + Account: ca.Client.serviceAccount(), + Stream: ca.Stream, + Consumer: ca.Name, + Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, + } + result.Response.Error = NewJSConsumerNameExistError() + s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) } - result.Response.Error = NewJSConsumerNameExistError() - s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) - js.mu.Unlock() + s.Warnf("Consumer create failed during update for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err) + js.mu.RUnlock() return } + // Check if we already had a consumer assignment and its still pending. cca, oca := ca, o.consumerAssignment() o.mu.RLock() @@ -3715,7 +3725,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } var result *consumerAssignmentResult - if !hasResponded { + if !hasResponded && !js.metaRecovering { result = &consumerAssignmentResult{ Account: ca.Client.serviceAccount(), Stream: ca.Stream, @@ -4606,12 +4616,16 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded { js.srv.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(result.Response)) ca.responded = true + // Check if this failed. // TODO(dlc) - Could have mixed results, should track per peer. - if result.Response.Error != nil { + // Make sure this is recent response, do not delete existing consumers. + if result.Response.Error != nil && result.Response.Error != NewJSConsumerNameExistError() && time.Since(ca.Created) < 2*time.Second { // So while we are deleting we will not respond to list/names requests. ca.err = NewJSClusterNotAssignedError() cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) + s.Warnf("Proposing to delete consumer `%s > %s > %s' due to assignment response error: %v", + result.Account, result.Stream, result.Consumer, result.Response.Error) } } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 0e5dfb5d..fbcc46fc 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2621,3 +2621,57 @@ func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) { }) require_NoError(t, err) } + +func TestJetStreamClusterUpdateConsumerShouldNotForceDeleteOnRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R7S", 7) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + }) + require_NoError(t, err) + + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "D", + DeliverSubject: "_no_bind_", + }) + require_NoError(t, err) + + // Shutdown a consumer follower. + nc.Close() + s := c.serverByName(ci.Cluster.Replicas[0].Name) + s.Shutdown() + + c.waitOnLeader() + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Change delivery subject. + _, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{ + Durable: "D", + DeliverSubject: "_d_", + }) + require_NoError(t, err) + + // Create interest in new and old deliver subject. + _, err = nc.SubscribeSync("_d_") + require_NoError(t, err) + _, err = nc.SubscribeSync("_no_bind_") + require_NoError(t, err) + nc.Flush() + + c.restartServer(s) + c.waitOnAllCurrent() + + // Wait on bad error that would cleanup consumer. + time.Sleep(time.Second) + + _, err = js.ConsumerInfo("TEST", "D") + require_NoError(t, err) +}