mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
fix consumer subject validation on recovery (#3389)
This fixes an issue introduced in #3080 The consumer filter subject check was skipped on recovery. The intent was to bypass the upstream stream subjects. But it also filtered the downstream stream subject. This became a problem when the downstream was itself an upstream. Then during recover, the stream subject was not checked, which lead to delivery of filtered messages that should never have been delivered. Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -446,9 +446,14 @@ func checkConsumerCfg(
|
||||
}
|
||||
|
||||
// As best we can make sure the filtered subject is valid.
|
||||
if config.FilterSubject != _EMPTY_ && !isRecovering {
|
||||
subjects, hasExt := allSubjects(cfg, acc)
|
||||
if !validFilteredSubject(config.FilterSubject, subjects) && !hasExt {
|
||||
if config.FilterSubject != _EMPTY_ {
|
||||
subjects := copyStrings(cfg.Subjects)
|
||||
// explicitly skip validFilteredSubject when recovering
|
||||
hasExt := isRecovering
|
||||
if !isRecovering {
|
||||
subjects, hasExt = gatherSourceMirrorSubjects(subjects, cfg, acc)
|
||||
}
|
||||
if !hasExt && !validFilteredSubject(config.FilterSubject, subjects) {
|
||||
return NewJSConsumerFilterNotSubsetError()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5477,7 +5477,7 @@ func TestJetStreamClusterSourcesUpdateOriginError(t *testing.T) {
|
||||
} else if tsi.State.Msgs != msgsTest {
|
||||
return fmt.Errorf("received %d msgs from TEST, expected %d", tsi.State.Msgs, msgsTest)
|
||||
} else if msi.State.Msgs != msgsM {
|
||||
return fmt.Errorf("received %d msgs from TEST, expected %d", msi.State.Msgs, msgsM)
|
||||
return fmt.Errorf("received %d msgs from M, expected %d", msi.State.Msgs, msgsM)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -1612,8 +1612,7 @@ func (mset *stream) sourcesInfo() (sis []*StreamSourceInfo) {
|
||||
return sis
|
||||
}
|
||||
|
||||
func allSubjects(cfg *StreamConfig, acc *Account) ([]string, bool) {
|
||||
subjects := copyStrings(cfg.Subjects)
|
||||
func gatherSourceMirrorSubjects(subjects []string, cfg *StreamConfig, acc *Account) ([]string, bool) {
|
||||
var hasExt bool
|
||||
var seen map[string]bool
|
||||
|
||||
|
||||
Reference in New Issue
Block a user