Expand test to target mirrors and sources in different JS domains.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-05-03 16:49:00 -07:00
committed by Ivan Kozlovic
parent df664e780e
commit c8d2132ee5
2 changed files with 71 additions and 4 deletions

View File

@@ -5818,8 +5818,8 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T)
// Make sure we can add a stream, etc.
si, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Name: "TEST22",
Subjects: []string{"bar"},
Replicas: 2,
})
if err != nil {
@@ -5829,6 +5829,73 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T)
t.Fatalf("Expected stream to be placed in the \"HUB\"")
}
jsLocal, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error getting JetStream context: %v", err)
}
// Create a mirror on the local leafnode for stream TEST22.
_, err = jsLocal.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{
Name: "TEST22",
External: &nats.ExternalStream{APIPrefix: "$JS.HUB.API"},
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Publish a message to the HUB's TEST22 stream.
if _, err := js.Publish("bar", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
// Make sure the message arrives in our mirror.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := jsLocal.StreamInfo("M")
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
}
return nil
})
// Now do the reverse and create a sourced stream in the HUB from our local stream on leafnode.
// Inside the HUB we need to be able to find our local leafnode JetStream assets, so we need
// a mapping in the LN server to allow this to work. Normally this will just be in server config.
acc, err := ln.LookupAccount("JSY")
if err != nil {
c.t.Fatalf("Unexpected error on %v: %v", ln, err)
}
if err := acc.AddMapping("$JS.LN.API.>", "$JS.API.>"); err != nil {
c.t.Fatalf("Error adding mapping: %v", err)
}
// js is the HUB JetStream context here.
_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{
Name: "M",
External: &nats.ExternalStream{APIPrefix: "$JS.LN.API"},
}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Make sure the message arrives in our sourced stream.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
}
return nil
})
}
func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {

View File

@@ -1387,7 +1387,7 @@ func (mset *stream) setupMirrorConsumer() error {
var deliverSubject string
ext := mset.cfg.Mirror.External
if ext != nil {
if ext != nil && ext.DeliverPrefix != _EMPTY_ {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".M"), "..", ".")
} else {
deliverSubject = syncSubject("$JS.M")
@@ -1594,7 +1594,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
var deliverSubject string
ext := ssi.External
if ext != nil {
if ext != nil && ext.DeliverPrefix != _EMPTY_ {
deliverSubject = strings.ReplaceAll(ext.DeliverPrefix+syncSubject(".S"), "..", ".")
} else {
deliverSubject = syncSubject("$JS.S")