[FIXED] JetStream: Update of an R1 consumer would not get a response

The update was accepted but the server would not respond to the
client/CLI.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-10-24 17:24:12 -06:00
parent a4dfc5b4bc
commit 7ca85e0e80
2 changed files with 11 additions and 3 deletions

View File

@@ -3486,8 +3486,8 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}
// If we look like we are scaling up, let's send our current state to the group.
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && leader
// Check if this is an update.
isConfigUpdate = reflect.DeepEqual(oca.Config, ca.Config)
// Signal that this is an update
isConfigUpdate = true
}
n := rg.node
js.mu.Unlock()
@@ -3575,7 +3575,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// Single replica consumer, process manually here.
js.mu.Lock()
// Force response in case we think this is an update.
if isConfigUpdate {
if !js.metaRecovering && isConfigUpdate {
ca.responded = false
}
js.mu.Unlock()

View File

@@ -16868,12 +16868,20 @@ func TestJetStreamConsumerStreamUpdate(t *testing.T) {
_, err = js.AddStream(&nats.StreamConfig{Name: "foo", Duplicates: 1 * time.Minute, Replicas: replica})
defer js.DeleteStream("foo")
require_NoError(t, err)
// Update with no change
_, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", Duplicates: 1 * time.Minute, Replicas: replica})
require_NoError(t, err)
// Update with change
_, err = js.UpdateStream(&nats.StreamConfig{Description: "stream", Name: "foo", Duplicates: 1 * time.Minute, Replicas: replica})
require_NoError(t, err)
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dur1", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
// Update with no change
_, err = js.UpdateConsumer("foo", &nats.ConsumerConfig{Durable: "dur1", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
// Update with change
_, err = js.UpdateConsumer("foo", &nats.ConsumerConfig{Description: "consumer", Durable: "dur1", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)
}
t.Run("clustered", func(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, `