diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 1843bc1d..f9c4198e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -305,6 +305,7 @@ func generateJSMappingTable(domain string) map[string]string { "INFO": JSApiAccountInfo, "STREAM.>": "$JS.API.STREAM.>", "CONSUMER.>": "$JS.API.CONSUMER.>", + "DIRECT.>": "$JS.API.DIRECT.>", "META.>": "$JS.API.META.>", "SERVER.>": "$JS.API.SERVER.>", "$KV.>": "$KV.>", diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 487f2751..7cedecea 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10541,7 +10541,7 @@ func TestJetStreamClusterMemoryConsumerInterestRetention(t *testing.T) { nsi, err := js.StreamInfo("test") require_NoError(t, err) - if nsi.State != si.State { + if !reflect.DeepEqual(nsi.State, si.State) { t.Fatalf("Stream states do not match: %+v vs %+v", si.State, nsi.State) } @@ -11701,3 +11701,38 @@ func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) { t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) }) t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) }) } + +func TestJetStreamDirectGetFromLeafnode(t *testing.T) { + tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: CORE, store_dir:", 1) + c := createJetStreamCluster(t, tmpl, "CORE", _EMPTY_, 3, 19022, true) + defer c.shutdown() + + tmpl = strings.Replace(jsClusterTemplWithSingleLeafNode, "store_dir:", "domain: SPOKE, store_dir:", 1) + ln := c.createLeafNodeWithTemplate("LN-SPOKE", tmpl) + defer ln.Shutdown() + + checkLeafNodeConnectedCount(t, ln, 2) + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KV"}) + require_NoError(t, err) + + _, err = kv.PutString("age", "22") + require_NoError(t, err) + + // Now connect to the ln and make sure we can do a domain direct get. + nc, _ = jsClientConnect(t, ln) + defer nc.Close() + + js, err = nc.JetStream(nats.Domain("CORE")) + require_NoError(t, err) + + kv, err = js.KeyValue("KV") + require_NoError(t, err) + + entry, err := kv.Get("age") + require_NoError(t, err) + require_True(t, string(entry.Value()) == "22") +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 6670c19a..3b1cc402 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/rand" + "reflect" "strings" "sync" "sync/atomic" @@ -1835,7 +1836,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) { si, err = js.StreamInfo("MOVE") require_NoError(t, err) - if si.State != initialState { + if !reflect.DeepEqual(si.State, initialState) { t.Fatalf("States do not match after migration:\n%+v\nvs\n%+v", si.State, initialState) } @@ -2035,7 +2036,7 @@ func TestJetStreamSuperClusterMovingStreamsWithMirror(t *testing.T) { mi, err := js.StreamInfo("MIRROR") require_NoError(t, err) - if si.State != mi.State { + if !reflect.DeepEqual(si.State, mi.State) { return fmt.Errorf("Expected mirror to be the same, got %+v vs %+v", mi.State, si.State) } return nil diff --git a/server/leafnode.go b/server/leafnode.go index 9cb5518a..d2982ed3 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2289,6 +2289,12 @@ func (c *client) processInboundLeafMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { flag |= pmrCollectQueueNames } + // If this is a mapped subject that means the mapped interest + // is what got us here, but this might not have a queue designation + // If that is the case, make sure we ignore to process local queue subscribers. + if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 { + flag |= pmrIgnoreEmptyQueueFilter + } _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag) }