mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] JetStream: possible routing issues through gateways
Internally jetstream may subscribe to some subject and then send a request with a reply subject matching that subscription. Due to interest propagation through a super cluster, it is possible that the reply comes back to a node that is not yet aware of the subscription interest which would cause the reply to be dropped. Some code detects that the subscription is recent and "map" the reply subject so that it can be routed back to the origin server. However, this was done with the use of the connection object that created the subscription, but at the time of the send, a different internal "*client" object may be used which would then cause the code to not be aware of the recent subscription and not do the mapping. This code was changed to scope at the account level instead of connection. A recent change in PR #3412 is no longer needed and was reverted in favor of changes in this PR. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -3761,3 +3761,77 @@ func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *tes
|
||||
waitForOutboundGateways(t, s, 2, 5*time.Second)
|
||||
check(s)
|
||||
}
|
||||
|
||||
type captureGWRewriteLogger struct {
|
||||
DummyLogger
|
||||
ch chan string
|
||||
}
|
||||
|
||||
func (l *captureGWRewriteLogger) Tracef(format string, args ...interface{}) {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
if strings.Contains(msg, "$JS.SNAPSHOT.ACK.TEST") && strings.Contains(msg, gwReplyPrefix) {
|
||||
select {
|
||||
case l.ch <- msg:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamSuperClusterGWReplyRewrite(t *testing.T) {
|
||||
sc := createJetStreamSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, sc.serverByName("C1-S1"))
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
sc.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
sendStreamMsg(t, nc, "foo", "msg")
|
||||
}
|
||||
|
||||
nc2, _ := jsClientConnect(t, sc.serverByName("C2-S2"))
|
||||
defer nc2.Close()
|
||||
|
||||
s := sc.clusters[0].streamLeader(globalAccountName, "TEST")
|
||||
var gws []*client
|
||||
s.getOutboundGatewayConnections(&gws)
|
||||
for _, gw := range gws {
|
||||
gw.mu.Lock()
|
||||
gw.trace = true
|
||||
gw.mu.Unlock()
|
||||
}
|
||||
l := &captureGWRewriteLogger{ch: make(chan string, 1)}
|
||||
s.SetLogger(l, false, true)
|
||||
|
||||
// Send a request through the gateway
|
||||
sreq := &JSApiStreamSnapshotRequest{
|
||||
DeliverSubject: nats.NewInbox(),
|
||||
ChunkSize: 512,
|
||||
}
|
||||
natsSub(t, nc2, sreq.DeliverSubject, func(m *nats.Msg) {
|
||||
m.Respond(nil)
|
||||
})
|
||||
natsFlush(t, nc2)
|
||||
req, _ := json.Marshal(sreq)
|
||||
rmsg, err := nc2.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
|
||||
require_NoError(t, err)
|
||||
var resp JSApiStreamSnapshotResponse
|
||||
err = json.Unmarshal(rmsg.Data, &resp)
|
||||
require_NoError(t, err)
|
||||
if resp.Error != nil {
|
||||
t.Fatalf("Did not get correct error response: %+v", resp.Error)
|
||||
}
|
||||
|
||||
// Now we just want to make sure that the reply has the gateway prefix
|
||||
select {
|
||||
case <-l.ch:
|
||||
case <-time.After(10 * time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user