Make sure to not delete streams on bad updates.

If an update was asssigned but failed at the stream group server we would send back the result which would always delete the stream.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-29 07:35:30 -07:00
parent 327d913ae1
commit 5a48369b4b
4 changed files with 68 additions and 17 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.1-RC10"
VERSION = "2.2.1-RC11"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -1925,6 +1925,11 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
accStreams[stream] = sa
cc.streams[acc.Name] = accStreams
// Make sure we respond.
if isMember {
sa.responded = false
}
js.mu.Unlock()
// Check if this is for us..
@@ -1947,34 +1952,35 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme
return
}
js.mu.RLock()
js.mu.Lock()
s, rg := js.srv, sa.Group
client, subject, reply := sa.Client, sa.Subject, sa.Reply
alreadyRunning := rg.node != nil
hasResponded := sa.responded
js.mu.RUnlock()
sa.responded = true
js.mu.Unlock()
mset, err := acc.lookupStream(sa.Config.Name)
if err == nil && mset != nil {
osa := mset.streamAssignment()
if !alreadyRunning {
s.startGoRoutine(func() { js.monitorStream(mset, sa) })
}
mset.setStreamAssignment(sa)
if err = mset.update(sa.Config); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
mset.setStreamAssignment(osa)
}
}
if err != nil {
js.mu.Lock()
sa.err = err
if rg.node != nil {
rg.node.Delete()
}
result := &streamAssignmentResult{
Account: sa.Client.serviceAccount(),
Stream: sa.Config.Name,
Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}},
Update: true,
}
result.Response.Error = jsError(err)
js.mu.Unlock()
@@ -1988,10 +1994,6 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, sa *streamAssignme
isLeader := mset.isLeader()
mset.mu.RUnlock()
js.mu.Lock()
sa.responded = true
js.mu.Unlock()
if !isLeader || hasResponded {
return
}
@@ -2044,9 +2046,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
// Go ahead and create or update the stream.
mset, err = acc.lookupStream(sa.Config.Name)
if err == nil && mset != nil {
osa := mset.streamAssignment()
mset.setStreamAssignment(sa)
if err := mset.update(sa.Config); err != nil {
if err = mset.update(sa.Config); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
mset.setStreamAssignment(osa)
}
} else if err == ErrJetStreamStreamNotFound {
// Add in the stream here.
@@ -2957,6 +2961,7 @@ type streamAssignmentResult struct {
Stream string `json:"stream"`
Response *JSApiStreamCreateResponse `json:"create_response,omitempty"`
Restore *JSApiStreamRestoreResponse `json:"restore_response,omitempty"`
Update bool `json:"is_update,omitempty"`
}
// Process error results of stream and consumer assignments.
@@ -2986,11 +2991,15 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
} else if result.Restore != nil {
resp = s.jsonResponse(result.Restore)
}
if !sa.responded {
if !sa.responded || result.Update {
sa.responded = true
js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp)
// TODO(dlc) - Could have mixed results, should track per peer.
// Set sa.err while we are deleting so we will not respond to list/names requests.
}
// Here we will remove this assignment, so this needs to only execute when we are sure
// this is what we want to do.
// TODO(dlc) - Could have mixed results, should track per peer.
// Set sa.err while we are deleting so we will not respond to list/names requests.
if !result.Update && time.Since(sa.Created) < 5*time.Second {
sa.err = ErrJetStreamNotAssigned
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
}
@@ -3021,7 +3030,7 @@ func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *clie
// Check if this failed.
// TODO(dlc) - Could have mixed results, should track per peer.
if result.Response.Error != nil {
// So while we are delting we will not respond to list/names requests.
// So while we are deleting we will not respond to list/names requests.
ca.err = ErrJetStreamNotAssigned
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
}

View File

@@ -476,6 +476,48 @@ func TestJetStreamClusterStreamUpdateSubjects(t *testing.T) {
}
}
func TestJetStreamClusterBadStreamUpdate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, toSend := []byte("Keep Me"), 50
for i := 0; i < toSend; i++ {
if _, err := js.Publish("foo", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Make sure a bad update will not remove our stream.
cfg.Subjects = []string{"foo..bar"}
if _, err := js.UpdateStream(cfg); err == nil || err == nats.ErrTimeout {
t.Fatalf("Expected error but got none or timeout")
}
// Make sure we did not delete our original stream.
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(si.Config.Subjects, []string{"foo", "bar"}) {
t.Fatalf("Expected subjects to be original ones, got %+v", si.Config.Subjects)
}
}
func TestJetStreamClusterConsumerRedeliveredInfo(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

View File

@@ -767,8 +767,8 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
// Config returns the stream's configuration.
func (mset *stream) config() StreamConfig {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.cfg
}