Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-01-27 13:41:48 -08:00
3 changed files with 68 additions and 8 deletions

View File

@@ -6631,9 +6631,12 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
mset, err = nl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
if state := mset.state(); state.FirstSeq != 2001 || state.LastSeq != 3000 {
t.Fatalf("Incorrect state: %+v", state)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if state := mset.state(); state.FirstSeq != 2001 || state.LastSeq != 3000 {
return fmt.Errorf("Incorrect state: %+v", state)
}
return nil
})
// Make sure we only sent 1 sync catchup msg.
nmsgs, _, _ := sub.Pending()

View File

@@ -2576,3 +2576,47 @@ func TestJetStreamClusterCurrentVsHealth(t *testing.T) {
}
}
}
// Several users and customers use this setup, but many times across leafnodes.
// This should be allowed in same account since we are really protecting against
// multiple pub acks with cycle detection.
func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "A",
Subjects: []string{"A.>"},
})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "B",
Subjects: []string{"B.>"},
})
require_NoError(t, err)
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "A",
Subjects: []string{"A.>"},
Sources: []*nats.StreamSource{{
Name: "B",
FilterSubject: "B.>",
}},
})
require_NoError(t, err)
// Before this would fail.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "B",
Subjects: []string{"B.>"},
Sources: []*nats.StreamSource{{
Name: "A",
FilterSubject: "A.>",
}},
})
require_NoError(t, err)
}

View File

@@ -1180,20 +1180,33 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
// cycle check for source cycle
toVisit := []*StreamConfig{&cfg}
visited := make(map[string]struct{})
overlaps := func(subjects []string, filter string) bool {
if filter == _EMPTY_ {
return true
}
for _, subject := range subjects {
if SubjectsCollide(subject, filter) {
return true
}
}
return false
}
for len(toVisit) > 0 {
cfg := toVisit[0]
toVisit = toVisit[1:]
visited[cfg.Name] = struct{}{}
for _, src := range cfg.Sources {
if src.External != nil {
// TODO (mh) look up service imports and see if src.External.ApiPrefix returns an account
// this will be much easier without the delivery subject
continue
}
// We can detect a cycle between streams, but let's double check that the
// subjects actually form a cycle.
if _, ok := visited[src.Name]; ok {
return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle"))
}
if exists, cfg := getStream(src.Name); exists {
if overlaps(cfg.Subjects, src.FilterSubject) {
return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle"))
}
} else if exists, cfg := getStream(src.Name); exists {
toVisit = append(toVisit, &cfg)
}
}