mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Direct get across a leafnode using cross domain mappings to a queue subscriber did not work.
The interest moved across the leafnode would be for the mapping, and not the actual qsub. So when received if we did detect that we are mapped and do not have a queue filter present make sure to ignore. This will allow queue subscriber processing on the local server that received the message from the leafnode. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -305,6 +305,7 @@ func generateJSMappingTable(domain string) map[string]string {
|
||||
"INFO": JSApiAccountInfo,
|
||||
"STREAM.>": "$JS.API.STREAM.>",
|
||||
"CONSUMER.>": "$JS.API.CONSUMER.>",
|
||||
"DIRECT.>": "$JS.API.DIRECT.>",
|
||||
"META.>": "$JS.API.META.>",
|
||||
"SERVER.>": "$JS.API.SERVER.>",
|
||||
"$KV.>": "$KV.>",
|
||||
|
||||
@@ -10541,7 +10541,7 @@ func TestJetStreamClusterMemoryConsumerInterestRetention(t *testing.T) {
|
||||
|
||||
nsi, err := js.StreamInfo("test")
|
||||
require_NoError(t, err)
|
||||
if nsi.State != si.State {
|
||||
if !reflect.DeepEqual(nsi.State, si.State) {
|
||||
t.Fatalf("Stream states do not match: %+v vs %+v", si.State, nsi.State)
|
||||
}
|
||||
|
||||
@@ -11701,3 +11701,38 @@ func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) {
|
||||
t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) })
|
||||
t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) })
|
||||
}
|
||||
|
||||
func TestJetStreamDirectGetFromLeafnode(t *testing.T) {
|
||||
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: CORE, store_dir:", 1)
|
||||
c := createJetStreamCluster(t, tmpl, "CORE", _EMPTY_, 3, 19022, true)
|
||||
defer c.shutdown()
|
||||
|
||||
tmpl = strings.Replace(jsClusterTemplWithSingleLeafNode, "store_dir:", "domain: SPOKE, store_dir:", 1)
|
||||
ln := c.createLeafNodeWithTemplate("LN-SPOKE", tmpl)
|
||||
defer ln.Shutdown()
|
||||
|
||||
checkLeafNodeConnectedCount(t, ln, 2)
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KV"})
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = kv.PutString("age", "22")
|
||||
require_NoError(t, err)
|
||||
|
||||
// Now connect to the ln and make sure we can do a domain direct get.
|
||||
nc, _ = jsClientConnect(t, ln)
|
||||
defer nc.Close()
|
||||
|
||||
js, err = nc.JetStream(nats.Domain("CORE"))
|
||||
require_NoError(t, err)
|
||||
|
||||
kv, err = js.KeyValue("KV")
|
||||
require_NoError(t, err)
|
||||
|
||||
entry, err := kv.Get("age")
|
||||
require_NoError(t, err)
|
||||
require_True(t, string(entry.Value()) == "22")
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -1835,7 +1836,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
|
||||
si, err = js.StreamInfo("MOVE")
|
||||
require_NoError(t, err)
|
||||
|
||||
if si.State != initialState {
|
||||
if !reflect.DeepEqual(si.State, initialState) {
|
||||
t.Fatalf("States do not match after migration:\n%+v\nvs\n%+v", si.State, initialState)
|
||||
}
|
||||
|
||||
@@ -2035,7 +2036,7 @@ func TestJetStreamSuperClusterMovingStreamsWithMirror(t *testing.T) {
|
||||
mi, err := js.StreamInfo("MIRROR")
|
||||
require_NoError(t, err)
|
||||
|
||||
if si.State != mi.State {
|
||||
if !reflect.DeepEqual(si.State, mi.State) {
|
||||
return fmt.Errorf("Expected mirror to be the same, got %+v vs %+v", mi.State, si.State)
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -2289,6 +2289,12 @@ func (c *client) processInboundLeafMsg(msg []byte) {
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
flag |= pmrCollectQueueNames
|
||||
}
|
||||
// If this is a mapped subject that means the mapped interest
|
||||
// is what got us here, but this might not have a queue designation
|
||||
// If that is the case, make sure we ignore to process local queue subscribers.
|
||||
if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
|
||||
flag |= pmrIgnoreEmptyQueueFilter
|
||||
}
|
||||
_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user