diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0b0ed6d7..183658e9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2438,6 +2438,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { ca.err = oca.err } + // Capture the optional state. We will pass it along if we are a member to apply. + // This is only applicable when restoring a stream with consumers. + state := ca.State + ca.State = nil + // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca @@ -2449,7 +2454,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Check if this is for us.. if isMember { - js.processClusterCreateConsumer(ca) + js.processClusterCreateConsumer(ca, state) } else { // Clear our raft node here. // TODO(dlc) - This might be better if done by leader, not the one who is being removed @@ -2507,8 +2512,8 @@ type consumerAssignmentResult struct { Response *JSApiConsumerCreateResponse `json:"response,omitempty"` } -// processClusterCreateConsumer is when we are a member fo the group and need to create the consumer. -func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { +// processClusterCreateConsumer is when we are a member of the group and need to create the consumer. +func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState) { if ca == nil { return } @@ -2577,8 +2582,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) { } // If we have an initial state set apply that now. - if ca.State != nil && o != nil { - err = o.setStoreState(ca.State) + if state != nil && o != nil { + err = o.setStoreState(state) } if err != nil {