Check for subject overlaps after check for pre-existing

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-25 19:00:15 -07:00
parent c05ff97d09
commit 5d6fe9e4b0

View File

@@ -3221,27 +3221,10 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
}
cfg := &ccfg
js.mu.RLock()
asa := cc.streams[acc.Name]
numStreams := len(asa)
// Check for subject collisions here.
for _, sa := range asa {
for _, subj := range sa.Config.Subjects {
for _, tsubj := range cfg.Subjects {
if SubjectsCollide(tsubj, subj) {
js.mu.RUnlock()
resp.Error = jsError(fmt.Errorf("subjects overlap with an existing stream"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
}
}
}
js.mu.RUnlock()
// Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa.
jsa.mu.RLock()
asa := cc.streams[acc.Name]
numStreams := len(asa)
exceeded := jsa.limits.MaxStreams > 0 && numStreams >= jsa.limits.MaxStreams
jsa.mu.RUnlock()
@@ -3268,6 +3251,19 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
return
}
// Check for subject collisions here.
for _, sa := range asa {
for _, subj := range sa.Config.Subjects {
for _, tsubj := range cfg.Subjects {
if SubjectsCollide(tsubj, subj) {
resp.Error = jsError(fmt.Errorf("subjects overlap with an existing stream"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
}
}
}
// Raft group selection and placement.
rg := cc.createGroupForStream(ci, cfg)
if rg == nil {