diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 40708337..2908c1b8 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5818,8 +5818,8 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) // Make sure we can add a stream, etc. si, err = js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, + Name: "TEST22", + Subjects: []string{"bar"}, Replicas: 2, }) if err != nil { @@ -5829,6 +5829,73 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) t.Fatalf("Expected stream to be placed in the \"HUB\"") } + jsLocal, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error getting JetStream context: %v", err) + } + + // Create a mirror on the local leafnode for stream TEST22. + _, err = jsLocal.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{ + Name: "TEST22", + External: &nats.ExternalStream{APIPrefix: "$JS.HUB.API"}, + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Publish a message to the HUB's TEST22 stream. + if _, err := js.Publish("bar", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + // Make sure the message arrives in our mirror. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := jsLocal.StreamInfo("M") + if err != nil { + return fmt.Errorf("Could not get stream info: %v", err) + } + if si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg, got state: %+v", si.State) + } + return nil + }) + + // Now do the reverse and create a sourced stream in the HUB from our local stream on leafnode. + // Inside the HUB we need to be able to find our local leafnode JetStream assets, so we need + // a mapping in the LN server to allow this to work. Normally this will just be in server config. + acc, err := ln.LookupAccount("JSY") + if err != nil { + c.t.Fatalf("Unexpected error on %v: %v", ln, err) + } + if err := acc.AddMapping("$JS.LN.API.>", "$JS.API.>"); err != nil { + c.t.Fatalf("Error adding mapping: %v", err) + } + + // js is the HUB JetStream context here. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "S", + Sources: []*nats.StreamSource{{ + Name: "M", + External: &nats.ExternalStream{APIPrefix: "$JS.LN.API"}, + }}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Make sure the message arrives in our sourced stream. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("S") + if err != nil { + return fmt.Errorf("Could not get stream info: %v", err) + } + if si.State.Msgs != 1 { + return fmt.Errorf("Expected 1 msg, got state: %+v", si.State) + } + return nil + }) } func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index f2ee192d..38a940ac 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1387,7 +1387,7 @@ func (mset *stream) setupMirrorConsumer() error { var deliverSubject string ext := mset.cfg.Mirror.External - if ext != nil { + if ext != nil && ext.DeliverPrefix != _EMPTY_ { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".") } else { deliverSubject = syncSubject("$JS.M") @@ -1594,7 +1594,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { var deliverSubject string ext := ssi.External - if ext != nil { + if ext != nil && ext.DeliverPrefix != _EMPTY_ { deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".") } else { deliverSubject = syncSubject("$JS.S")