From 2ac05785c3e17bc2dfd7aade712365c10b381651 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 21 Apr 2021 18:50:38 -0700 Subject: [PATCH] Do not persist or snapshot consumer state after a restore. This can lead to a data race and is not needed after being applied. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0b0ed6d7..183658e9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2438,6 +2438,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { ca.err = oca.err } + // Capture the optional state. We will pass it along if we are a member to apply. + // This is only applicable when restoring a stream with consumers. + state := ca.State + ca.State = nil + // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca @@ -2449,7 +2454,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Check if this is for us.. if isMember { - js.processClusterCreateConsumer(ca) + js.processClusterCreateConsumer(ca, state) } else { // Clear our raft node here. // TODO(dlc) - This might be better if done by leader, not the one who is being removed @@ -2507,8 +2512,8 @@ type consumerAssignmentResult struct { Response *JSApiConsumerCreateResponse `json:"response,omitempty"` } -// processClusterCreateConsumer is when we are a member fo the group and need to create the consumer. -func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { +// processClusterCreateConsumer is when we are a member of the group and need to create the consumer. +func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState) { if ca == nil { return } @@ -2577,8 +2582,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { } // If we have an initial state set apply that now. - if ca.State != nil && o != nil { - err = o.setStoreState(ca.State) + if state != nil && o != nil { + err = o.setStoreState(state) } if err != nil {