From 28ccaa4371a0f0137baaef3f8ddefc374ce8b37f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 3 Aug 2022 20:21:28 -0700 Subject: [PATCH] Direct get across a leafnode using cross domain mappings to a queue subscriber did not work. The interest moved across the leafnode would be for the mapping, and not the actual qsub. So when received if we did detect that we are mapped and do not have a queue filter present make sure to ignore. This will allow queue subscriber processing on the local server that received the message from the leafnode. Signed-off-by: Derek Collison --- server/jetstream_api.go | 1 + server/jetstream_cluster_test.go | 37 +++++++++++++++++++++++++- server/jetstream_super_cluster_test.go | 5 ++-- server/leafnode.go | 6 +++++ 4 files changed, 46 insertions(+), 3 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 1843bc1d..f9c4198e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -305,6 +305,7 @@ func generateJSMappingTable(domain string) map[string]string { "INFO": JSApiAccountInfo, "STREAM.>": "$JS.API.STREAM.>", "CONSUMER.>": "$JS.API.CONSUMER.>", + "DIRECT.>": "$JS.API.DIRECT.>", "META.>": "$JS.API.META.>", "SERVER.>": "$JS.API.SERVER.>", "$KV.>": "$KV.>", diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 487f2751..7cedecea 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10541,7 +10541,7 @@ func TestJetStreamClusterMemoryConsumerInterestRetention(t *testing.T) { nsi, err := js.StreamInfo("test") require_NoError(t, err) - if nsi.State != si.State { + if !reflect.DeepEqual(nsi.State, si.State) { t.Fatalf("Stream states do not match: %+v vs %+v", si.State, nsi.State) } @@ -11701,3 +11701,38 @@ func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) { t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) }) t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) }) } + +func TestJetStreamDirectGetFromLeafnode(t *testing.T) { + tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: CORE, store_dir:", 1) + c := createJetStreamCluster(t, tmpl, "CORE", _EMPTY_, 3, 19022, true) + defer c.shutdown() + + tmpl = strings.Replace(jsClusterTemplWithSingleLeafNode, "store_dir:", "domain: SPOKE, store_dir:", 1) + ln := c.createLeafNodeWithTemplate("LN-SPOKE", tmpl) + defer ln.Shutdown() + + checkLeafNodeConnectedCount(t, ln, 2) + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KV"}) + require_NoError(t, err) + + _, err = kv.PutString("age", "22") + require_NoError(t, err) + + // Now connect to the ln and make sure we can do a domain direct get. + nc, _ = jsClientConnect(t, ln) + defer nc.Close() + + js, err = nc.JetStream(nats.Domain("CORE")) + require_NoError(t, err) + + kv, err = js.KeyValue("KV") + require_NoError(t, err) + + entry, err := kv.Get("age") + require_NoError(t, err) + require_True(t, string(entry.Value()) == "22") +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 6670c19a..3b1cc402 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/rand" + "reflect" "strings" "sync" "sync/atomic" @@ -1835,7 +1836,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) { si, err = js.StreamInfo("MOVE") require_NoError(t, err) - if si.State != initialState { + if !reflect.DeepEqual(si.State, initialState) { t.Fatalf("States do not match after migration:\n%+v\nvs\n%+v", si.State, initialState) } @@ -2035,7 +2036,7 @@ func TestJetStreamSuperClusterMovingStreamsWithMirror(t *testing.T) { mi, err := js.StreamInfo("MIRROR") require_NoError(t, err) - if si.State != mi.State { + if !reflect.DeepEqual(si.State, mi.State) { return fmt.Errorf("Expected mirror to be the same, got %+v vs %+v", mi.State, si.State) } return nil diff --git a/server/leafnode.go b/server/leafnode.go index 9cb5518a..d2982ed3 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2289,6 +2289,12 @@ func (c *client) processInboundLeafMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { flag |= pmrCollectQueueNames } + // If this is a mapped subject that means the mapped interest + // is what got us here, but this might not have a queue designation + // If that is the case, make sure we ignore to process local queue subscribers. + if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 { + flag |= pmrIgnoreEmptyQueueFilter + } _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag) }