Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-06-05 14:13:18 -07:00
11 changed files with 140 additions and 21 deletions

View File

@@ -3108,7 +3108,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
accStreams = make(map[string]*streamAssignment)
} else if osa := accStreams[stream]; osa != nil && osa != sa {
// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
if sa.Group != nil {
sa.Group.node = osa.Group.node
}
sa.consumers = osa.consumers
sa.responded = osa.responded
sa.err = osa.err
@@ -3199,7 +3201,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
}
// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
if sa.Group != nil {
sa.Group.node = osa.Group.node
}
sa.consumers = osa.consumers
sa.err = osa.err
@@ -3217,7 +3221,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
sa.responded = false
} else {
// Make sure to clean up any old node in case this stream moves back here.
sa.Group.node = nil
if sa.Group != nil {
sa.Group.node = nil
}
}
js.mu.Unlock()
@@ -3790,7 +3796,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
} else if oca := sa.consumers[ca.Name]; oca != nil {
wasExisting = true
// Copy over private existing state from former SA.
ca.Group.node = oca.Group.node
if ca.Group != nil {
ca.Group.node = oca.Group.node
}
ca.responded = oca.responded
ca.err = oca.err
}
@@ -3905,8 +3913,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
var needDelete bool
if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil {
if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil {
needDelete = true
delete(sa.consumers, ca.Name)
oca := sa.consumers[ca.Name]
// Make sure this removal is for what we have, otherwise ignore.
if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
needDelete = true
delete(sa.consumers, ca.Name)
}
}
}
js.mu.Unlock()