Do not persist or snapshot consumer state after a restore.

This can lead to a data race and is not needed after being applied.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-21 18:50:38 -07:00
parent eb3af67031
commit 2ac05785c3

View File

@@ -2438,6 +2438,11 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
ca.err = oca.err
}
// Capture the optional state. We will pass it along if we are a member to apply.
// This is only applicable when restoring a stream with consumers.
state := ca.State
ca.State = nil
// Place into our internal map under the stream assignment.
// Ok to replace an existing one, we check on process call below.
sa.consumers[ca.Name] = ca
@@ -2449,7 +2454,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Check if this is for us..
if isMember {
js.processClusterCreateConsumer(ca)
js.processClusterCreateConsumer(ca, state)
} else {
// Clear our raft node here.
// TODO(dlc) - This might be better if done by leader, not the one who is being removed
@@ -2507,8 +2512,8 @@ type consumerAssignmentResult struct {
Response *JSApiConsumerCreateResponse `json:"response,omitempty"`
}
// processClusterCreateConsumer is when we are a member fo the group and need to create the consumer.
func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
// processClusterCreateConsumer is when we are a member of the group and need to create the consumer.
func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState) {
if ca == nil {
return
}
@@ -2577,8 +2582,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment) {
}
// If we have an initial state set apply that now.
if ca.State != nil && o != nil {
err = o.setStoreState(ca.State)
if state != nil && o != nil {
err = o.setStoreState(state)
}
if err != nil {