diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 71e36125..231feea6 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6631,9 +6631,12 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { mset, err = nl.GlobalAccount().lookupStream("TEST") require_NoError(t, err) - if state := mset.state(); state.FirstSeq != 2001 || state.LastSeq != 3000 { - t.Fatalf("Incorrect state: %+v", state) - } + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if state := mset.state(); state.FirstSeq != 2001 || state.LastSeq != 3000 { + return fmt.Errorf("Incorrect state: %+v", state) + } + return nil + }) // Make sure we only sent 1 sync catchup msg. nmsgs, _, _ := sub.Pending() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 85c55693..d3fd566e 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2576,3 +2576,47 @@ func TestJetStreamClusterCurrentVsHealth(t *testing.T) { } } } + +// Several users and customers use this setup, but many times across leafnodes. +// This should be allowed in same account since we are really protecting against +// multiple pub acks with cycle detection. +func TestJetStreamClusterActiveActiveSourcedStreams(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "A", + Subjects: []string{"A.>"}, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "B", + Subjects: []string{"B.>"}, + }) + require_NoError(t, err) + + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "A", + Subjects: []string{"A.>"}, + Sources: []*nats.StreamSource{{ + Name: "B", + FilterSubject: "B.>", + }}, + }) + require_NoError(t, err) + + // Before this would fail. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "B", + Subjects: []string{"B.>"}, + Sources: []*nats.StreamSource{{ + Name: "A", + FilterSubject: "A.>", + }}, + }) + require_NoError(t, err) +} diff --git a/server/stream.go b/server/stream.go index c3f906cf..e578e5e4 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1180,20 +1180,33 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi // cycle check for source cycle toVisit := []*StreamConfig{&cfg} visited := make(map[string]struct{}) + overlaps := func(subjects []string, filter string) bool { + if filter == _EMPTY_ { + return true + } + for _, subject := range subjects { + if SubjectsCollide(subject, filter) { + return true + } + } + return false + } + for len(toVisit) > 0 { cfg := toVisit[0] toVisit = toVisit[1:] visited[cfg.Name] = struct{}{} for _, src := range cfg.Sources { if src.External != nil { - // TODO (mh) look up service imports and see if src.External.ApiPrefix returns an account - // this will be much easier without the delivery subject continue } + // We can detect a cycle between streams, but let's double check that the + // subjects actually form a cycle. if _, ok := visited[src.Name]; ok { - return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle")) - } - if exists, cfg := getStream(src.Name); exists { + if overlaps(cfg.Subjects, src.FilterSubject) { + return StreamConfig{}, NewJSStreamInvalidConfigError(errors.New("detected cycle")) + } + } else if exists, cfg := getStream(src.Name); exists { toVisit = append(toVisit, &cfg) } }