From 5a48369b4ba36a5d5d993e55105ac017f71f5cd1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 29 Mar 2021 07:35:30 -0700 Subject: [PATCH] Make sure to not delete streams on bad updates. If an update was asssigned but failed at the stream group server we would send back the result which would always delete the stream. Signed-off-by: Derek Collison --- server/const.go | 2 +- server/jetstream_cluster.go | 37 +++++++++++++++++----------- server/jetstream_cluster_test.go | 42 ++++++++++++++++++++++++++++++++ server/stream.go | 4 +-- 4 files changed, 68 insertions(+), 17 deletions(-) diff --git a/server/const.go b/server/const.go index 4e62f138..c9bb52b8 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.1-RC10" + VERSION = "2.2.1-RC11" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d7f8545f..8176b6d4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1925,6 +1925,11 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { accStreams[stream] = sa cc.streams[acc.Name] = accStreams + // Make sure we respond. + if isMember { + sa.responded = false + } + js.mu.Unlock() // Check if this is for us.. @@ -1947,34 +1952,35 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme return } - js.mu.RLock() + js.mu.Lock() s, rg := js.srv, sa.Group client, subject, reply := sa.Client, sa.Subject, sa.Reply alreadyRunning := rg.node != nil hasResponded := sa.responded - js.mu.RUnlock() + sa.responded = true + js.mu.Unlock() mset, err := acc.lookupStream(sa.Config.Name) if err == nil && mset != nil { + osa := mset.streamAssignment() if !alreadyRunning { s.startGoRoutine(func() { js.monitorStream(mset, sa) }) } mset.setStreamAssignment(sa) if err = mset.update(sa.Config); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) + mset.setStreamAssignment(osa) } } if err != nil { js.mu.Lock() sa.err = err - if rg.node != nil { - rg.node.Delete() - } result := &streamAssignmentResult{ Account: sa.Client.serviceAccount(), Stream: sa.Config.Name, Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}, + Update: true, } result.Response.Error = jsError(err) js.mu.Unlock() @@ -1988,10 +1994,6 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme isLeader := mset.isLeader() mset.mu.RUnlock() - js.mu.Lock() - sa.responded = true - js.mu.Unlock() - if !isLeader || hasResponded { return } @@ -2044,9 +2046,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // Go ahead and create or update the stream. mset, err = acc.lookupStream(sa.Config.Name) if err == nil && mset != nil { + osa := mset.streamAssignment() mset.setStreamAssignment(sa) - if err := mset.update(sa.Config); err != nil { + if err = mset.update(sa.Config); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) + mset.setStreamAssignment(osa) } } else if err == ErrJetStreamStreamNotFound { // Add in the stream here. @@ -2957,6 +2961,7 @@ type streamAssignmentResult struct { Stream string `json:"stream"` Response *JSApiStreamCreateResponse `json:"create_response,omitempty"` Restore *JSApiStreamRestoreResponse `json:"restore_response,omitempty"` + Update bool `json:"is_update,omitempty"` } // Process error results of stream and consumer assignments. @@ -2986,11 +2991,15 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client } else if result.Restore != nil { resp = s.jsonResponse(result.Restore) } - if !sa.responded { + if !sa.responded || result.Update { sa.responded = true js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp) - // TODO(dlc) - Could have mixed results, should track per peer. - // Set sa.err while we are deleting so we will not respond to list/names requests. + } + // Here we will remove this assignment, so this needs to only execute when we are sure + // this is what we want to do. + // TODO(dlc) - Could have mixed results, should track per peer. + // Set sa.err while we are deleting so we will not respond to list/names requests. + if !result.Update && time.Since(sa.Created) < 5*time.Second { sa.err = ErrJetStreamNotAssigned cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } @@ -3021,7 +3030,7 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie // Check if this failed. // TODO(dlc) - Could have mixed results, should track per peer. if result.Response.Error != nil { - // So while we are delting we will not respond to list/names requests. + // So while we are deleting we will not respond to list/names requests. ca.err = ErrJetStreamNotAssigned cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index e9d76cc2..92d4abb8 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -476,6 +476,48 @@ func TestJetStreamClusterStreamUpdateSubjects(t *testing.T) { } } +func TestJetStreamClusterBadStreamUpdate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + } + + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + msg, toSend := []byte("Keep Me"), 50 + for i := 0; i < toSend; i++ { + if _, err := js.Publish("foo", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + // Make sure a bad update will not remove our stream. + cfg.Subjects = []string{"foo..bar"} + if _, err := js.UpdateStream(cfg); err == nil || err == nats.ErrTimeout { + t.Fatalf("Expected error but got none or timeout") + } + + // Make sure we did not delete our original stream. + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(si.Config.Subjects, []string{"foo", "bar"}) { + t.Fatalf("Expected subjects to be original ones, got %+v", si.Config.Subjects) + } +} + func TestJetStreamClusterConsumerRedeliveredInfo(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/stream.go b/server/stream.go index 95068771..cb25bdda 100644 --- a/server/stream.go +++ b/server/stream.go @@ -767,8 +767,8 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { // Config returns the stream's configuration. func (mset *stream) config() StreamConfig { - mset.mu.Lock() - defer mset.mu.Unlock() + mset.mu.RLock() + defer mset.mu.RUnlock() return mset.cfg }