diff --git a/server/const.go b/server/const.go index 4e62f138..c9bb52b8 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.1-RC10" + VERSION = "2.2.1-RC11" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d7f8545f..8176b6d4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1925,6 +1925,11 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { accStreams[stream] = sa cc.streams[acc.Name] = accStreams + // Make sure we respond. + if isMember { + sa.responded = false + } + js.mu.Unlock() // Check if this is for us.. @@ -1947,34 +1952,35 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme return } - js.mu.RLock() + js.mu.Lock() s, rg := js.srv, sa.Group client, subject, reply := sa.Client, sa.Subject, sa.Reply alreadyRunning := rg.node != nil hasResponded := sa.responded - js.mu.RUnlock() + sa.responded = true + js.mu.Unlock() mset, err := acc.lookupStream(sa.Config.Name) if err == nil && mset != nil { + osa := mset.streamAssignment() if !alreadyRunning { s.startGoRoutine(func() { js.monitorStream(mset, sa) }) } mset.setStreamAssignment(sa) if err = mset.update(sa.Config); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) + mset.setStreamAssignment(osa) } } if err != nil { js.mu.Lock() sa.err = err - if rg.node != nil { - rg.node.Delete() - } result := &streamAssignmentResult{ Account: sa.Client.serviceAccount(), Stream: sa.Config.Name, Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}, + Update: true, } result.Response.Error = jsError(err) js.mu.Unlock() @@ -1988,10 +1994,6 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme isLeader := mset.isLeader() mset.mu.RUnlock() - js.mu.Lock() - sa.responded = true - js.mu.Unlock() - if !isLeader || hasResponded { return } @@ -2044,9 +2046,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // Go ahead and create or update the stream. mset, err = acc.lookupStream(sa.Config.Name) if err == nil && mset != nil { + osa := mset.streamAssignment() mset.setStreamAssignment(sa) - if err := mset.update(sa.Config); err != nil { + if err = mset.update(sa.Config); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) + mset.setStreamAssignment(osa) } } else if err == ErrJetStreamStreamNotFound { // Add in the stream here. @@ -2957,6 +2961,7 @@ type streamAssignmentResult struct { Stream string `json:"stream"` Response *JSApiStreamCreateResponse `json:"create_response,omitempty"` Restore *JSApiStreamRestoreResponse `json:"restore_response,omitempty"` + Update bool `json:"is_update,omitempty"` } // Process error results of stream and consumer assignments. @@ -2986,11 +2991,15 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client } else if result.Restore != nil { resp = s.jsonResponse(result.Restore) } - if !sa.responded { + if !sa.responded || result.Update { sa.responded = true js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp) - // TODO(dlc) - Could have mixed results, should track per peer. - // Set sa.err while we are deleting so we will not respond to list/names requests. + } + // Here we will remove this assignment, so this needs to only execute when we are sure + // this is what we want to do. + // TODO(dlc) - Could have mixed results, should track per peer. + // Set sa.err while we are deleting so we will not respond to list/names requests. + if !result.Update && time.Since(sa.Created) < 5*time.Second { sa.err = ErrJetStreamNotAssigned cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } @@ -3021,7 +3030,7 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie // Check if this failed. // TODO(dlc) - Could have mixed results, should track per peer. if result.Response.Error != nil { - // So while we are delting we will not respond to list/names requests. + // So while we are deleting we will not respond to list/names requests. ca.err = ErrJetStreamNotAssigned cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index e9d76cc2..92d4abb8 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -476,6 +476,48 @@ func TestJetStreamClusterStreamUpdateSubjects(t *testing.T) { } } +func TestJetStreamClusterBadStreamUpdate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 3, + } + + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + msg, toSend := []byte("Keep Me"), 50 + for i := 0; i < toSend; i++ { + if _, err := js.Publish("foo", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + // Make sure a bad update will not remove our stream. + cfg.Subjects = []string{"foo..bar"} + if _, err := js.UpdateStream(cfg); err == nil || err == nats.ErrTimeout { + t.Fatalf("Expected error but got none or timeout") + } + + // Make sure we did not delete our original stream. + si, err := js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(si.Config.Subjects, []string{"foo", "bar"}) { + t.Fatalf("Expected subjects to be original ones, got %+v", si.Config.Subjects) + } +} + func TestJetStreamClusterConsumerRedeliveredInfo(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() diff --git a/server/stream.go b/server/stream.go index 95068771..cb25bdda 100644 --- a/server/stream.go +++ b/server/stream.go @@ -767,8 +767,8 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) { // Config returns the stream's configuration. func (mset *stream) config() StreamConfig { - mset.mu.Lock() - defer mset.mu.Unlock() + mset.mu.RLock() + defer mset.mu.RUnlock() return mset.cfg }