mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
Merge pull request #1013 from nats-io/fix_gw_qinterest_loss
Fixed loss of queue subscription interest across Gateways in some cases
This commit is contained in:
@@ -1580,7 +1580,27 @@ func (s *Server) GatewayAddr() *net.TCPAddr {
|
||||
func (c *client) processGatewayAccountUnsub(accName string) {
|
||||
// Just to indicate activity around "subscriptions" events.
|
||||
c.in.subs++
|
||||
c.gw.outsim.Store(accName, nil)
|
||||
// This account may have an entry because of queue subs.
|
||||
// If that's the case, we can reset the no-interest map,
|
||||
// but not set the entry to nil.
|
||||
setToNil := true
|
||||
if ei, ok := c.gw.outsim.Load(accName); ei != nil {
|
||||
e := ei.(*outsie)
|
||||
e.Lock()
|
||||
// Reset the no-interest map if we have queue subs
|
||||
// and don't set the entry to nil.
|
||||
if e.qsubs > 0 {
|
||||
e.ni = make(map[string]struct{})
|
||||
setToNil = false
|
||||
}
|
||||
e.Unlock()
|
||||
} else if ok {
|
||||
// Already set to nil, so skip
|
||||
setToNil = false
|
||||
}
|
||||
if setToNil {
|
||||
c.gw.outsim.Store(accName, nil)
|
||||
}
|
||||
}
|
||||
|
||||
// A+ protocol received from remote gateway if it had previously
|
||||
@@ -1589,7 +1609,23 @@ func (c *client) processGatewayAccountUnsub(accName string) {
|
||||
func (c *client) processGatewayAccountSub(accName string) error {
|
||||
// Just to indicate activity around "subscriptions" events.
|
||||
c.in.subs++
|
||||
c.gw.outsim.Delete(accName)
|
||||
// If this account has an entry because of queue subs, we
|
||||
// can't delete the entry.
|
||||
remove := true
|
||||
if ei, ok := c.gw.outsim.Load(accName); ei != nil {
|
||||
e := ei.(*outsie)
|
||||
e.Lock()
|
||||
if e.qsubs > 0 {
|
||||
remove = false
|
||||
}
|
||||
e.Unlock()
|
||||
} else if !ok {
|
||||
// There is no entry, so skip
|
||||
remove = false
|
||||
}
|
||||
if remove {
|
||||
c.gw.outsim.Delete(accName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2006,6 +2042,15 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio
|
||||
proto = append(proto, CR_LF...)
|
||||
}
|
||||
c.mu.Lock()
|
||||
// If we add a queue sub, and we had previously sent an A-,
|
||||
// we don't need to send an A+ here, but we need to clear
|
||||
// the fact that we did sent the A- so that we don't send
|
||||
// an A+ when we will get the first non-queue sub registered.
|
||||
if added {
|
||||
if ei, ok := c.gw.insim[accName]; ok && ei == nil {
|
||||
delete(c.gw.insim, accName)
|
||||
}
|
||||
}
|
||||
c.sendProto(proto, false)
|
||||
if c.trace {
|
||||
c.traceOutOp("", proto[:len(proto)-LEN_CR_LF])
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
@@ -125,6 +126,22 @@ func checkForSubjectNoInterest(t *testing.T, c *client, account, subject string,
|
||||
})
|
||||
}
|
||||
|
||||
func checkForAccountNoInterest(t *testing.T, c *client, account string, expectedNoInterest bool, timeout time.Duration) {
|
||||
t.Helper()
|
||||
checkFor(t, timeout, 15*time.Millisecond, func() error {
|
||||
ei, ok := c.gw.outsim.Load(account)
|
||||
if !ok && expectedNoInterest {
|
||||
return fmt.Errorf("No-interest for account %q not yet registered", account)
|
||||
} else if ok && !expectedNoInterest {
|
||||
return fmt.Errorf("Account %q should not have a no-interest", account)
|
||||
}
|
||||
if ei != nil {
|
||||
return fmt.Errorf("Account %q should have a global no-interest, not subject no-interest", account)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func waitCh(t *testing.T, ch chan bool, errTxt string) {
|
||||
t.Helper()
|
||||
select {
|
||||
@@ -162,6 +179,15 @@ func natsSubSync(t *testing.T, nc *nats.Conn, subj string) *nats.Subscription {
|
||||
return sub
|
||||
}
|
||||
|
||||
func natsNexMsg(t *testing.T, sub *nats.Subscription, timeout time.Duration) *nats.Msg {
|
||||
t.Helper()
|
||||
msg, err := sub.NextMsg(timeout)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed getting next message: %v", err)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func natsQueueSub(t *testing.T, nc *nats.Conn, subj, queue string, cb nats.MsgHandler) *nats.Subscription {
|
||||
t.Helper()
|
||||
sub, err := nc.QueueSubscribe(subj, queue, cb)
|
||||
@@ -327,13 +353,9 @@ func TestGatewayBasic(t *testing.T) {
|
||||
s1.Shutdown()
|
||||
// When s2 detects the connection is closed, it will attempt
|
||||
// to reconnect once (even if the route is implicit).
|
||||
// Wait a bit before restarting s1. For Windows, we need to wait
|
||||
// more than the dialTimeout before restarting the server.
|
||||
wait := 500 * time.Millisecond
|
||||
if runtime.GOOS == "windows" {
|
||||
wait = 1200 * time.Millisecond
|
||||
}
|
||||
time.Sleep(wait)
|
||||
// We need to wait more than a dial timeout to make sure
|
||||
// s1 does not restart too quickly and s2 can actually reconnect.
|
||||
time.Sleep(DEFAULT_ROUTE_DIAL + 250*time.Millisecond)
|
||||
// Restart s1 without gateway to B.
|
||||
o1.Gateway.Gateways = nil
|
||||
s1 = runGatewayServer(o1)
|
||||
@@ -800,10 +822,10 @@ func TestGatewayTLS(t *testing.T) {
|
||||
s1 = runGatewayServer(o1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, s1, 1, time.Second)
|
||||
waitForOutboundGateways(t, s2, 1, time.Second)
|
||||
waitForInboundGateways(t, s1, 1, time.Second)
|
||||
waitForInboundGateways(t, s2, 1, time.Second)
|
||||
waitForOutboundGateways(t, s1, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, s2, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, s1, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, s2, 1, 2*time.Second)
|
||||
|
||||
cfg = s1.getRemoteGateway("B")
|
||||
cfg.RLock()
|
||||
@@ -2445,8 +2467,8 @@ func TestGatewayComplexSetup(t *testing.T) {
|
||||
}
|
||||
natsFlush(t, ncsa1)
|
||||
|
||||
expectedLow := int(float32(total/2) * 0.7)
|
||||
expectedHigh := int(float32(total/2) * 1.3)
|
||||
expectedLow := int(float32(total/2) * 0.6)
|
||||
expectedHigh := int(float32(total/2) * 1.4)
|
||||
checkCount := func(t *testing.T, count *int32) {
|
||||
t.Helper()
|
||||
c := int(atomic.LoadInt32(count))
|
||||
@@ -4593,3 +4615,182 @@ func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) {
|
||||
t.Fatalf("Did not get replies")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayNoAccInterestThenQSubThenRegularSub(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// Connect on A and send a message
|
||||
ncA := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port))
|
||||
defer ncA.Close()
|
||||
natsPub(t, ncA, "foo", []byte("hello"))
|
||||
natsFlush(t, ncA)
|
||||
|
||||
// expect an A- on return
|
||||
gwb := sa.getOutboundGatewayConnection("B")
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, true, time.Second)
|
||||
|
||||
// Create a connection o B, and create a queue sub first
|
||||
ncB := natsConnect(t, fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port))
|
||||
defer ncB.Close()
|
||||
qsub := natsQueueSubSync(t, ncB, "bar", "queue")
|
||||
natsFlush(t, ncB)
|
||||
|
||||
// A should have received a queue interest
|
||||
checkForRegisteredQSubInterest(t, sa, "B", globalAccountName, "bar", 1, time.Second)
|
||||
|
||||
// Now on B, create a regular sub
|
||||
sub := natsSubSync(t, ncB, "baz")
|
||||
natsFlush(t, ncB)
|
||||
|
||||
// From A now, produce a message on each subject and
|
||||
// expect both subs to receive their message.
|
||||
msgForQSub := []byte("msg_qsub")
|
||||
natsPub(t, ncA, "bar", msgForQSub)
|
||||
natsFlush(t, ncA)
|
||||
|
||||
if msg := natsNexMsg(t, qsub, time.Second); !bytes.Equal(msgForQSub, msg.Data) {
|
||||
t.Fatalf("Expected msg for queue sub to be %q, got %q", msgForQSub, msg.Data)
|
||||
}
|
||||
|
||||
// Publish for the regular sub
|
||||
msgForSub := []byte("msg_sub")
|
||||
natsPub(t, ncA, "baz", msgForSub)
|
||||
natsFlush(t, ncA)
|
||||
|
||||
if msg := natsNexMsg(t, sub, time.Second); !bytes.Equal(msgForSub, msg.Data) {
|
||||
t.Fatalf("Expected msg for sub to be %q, got %q", msgForSub, msg.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// Similar to TestGatewayNoAccInterestThenQSubThenRegularSub but simulate
|
||||
// older incorrect behavior.
|
||||
func TestGatewayHandleUnexpectedASubUnsub(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// Connect on A and send a message
|
||||
ncA := natsConnect(t, fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port))
|
||||
defer ncA.Close()
|
||||
natsPub(t, ncA, "foo", []byte("hello"))
|
||||
natsFlush(t, ncA)
|
||||
|
||||
// expect an A- on return
|
||||
gwb := sa.getOutboundGatewayConnection("B")
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, true, time.Second)
|
||||
|
||||
// Create a connection o B, and create a queue sub first
|
||||
ncB := natsConnect(t, fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port))
|
||||
defer ncB.Close()
|
||||
qsub := natsQueueSubSync(t, ncB, "bar", "queue")
|
||||
natsFlush(t, ncB)
|
||||
|
||||
// A should have received a queue interest
|
||||
checkForRegisteredQSubInterest(t, sa, "B", globalAccountName, "bar", 1, time.Second)
|
||||
|
||||
// Now on B, create a regular sub
|
||||
sub := natsSubSync(t, ncB, "baz")
|
||||
natsFlush(t, ncB)
|
||||
// and reproduce old, wrong, behavior that would have resulted in sending an A-
|
||||
gwA := getInboundGatewayConnection(sb, "A")
|
||||
gwA.mu.Lock()
|
||||
gwA.sendProto([]byte("A- $G\r\n"), true)
|
||||
gwA.mu.Unlock()
|
||||
|
||||
// From A now, produce a message on each subject and
|
||||
// expect both subs to receive their message.
|
||||
msgForQSub := []byte("msg_qsub")
|
||||
natsPub(t, ncA, "bar", msgForQSub)
|
||||
natsFlush(t, ncA)
|
||||
|
||||
if msg := natsNexMsg(t, qsub, time.Second); !bytes.Equal(msgForQSub, msg.Data) {
|
||||
t.Fatalf("Expected msg for queue sub to be %q, got %q", msgForQSub, msg.Data)
|
||||
}
|
||||
|
||||
// Publish for the regular sub
|
||||
msgForSub := []byte("msg_sub")
|
||||
natsPub(t, ncA, "baz", msgForSub)
|
||||
natsFlush(t, ncA)
|
||||
|
||||
if msg := natsNexMsg(t, sub, time.Second); !bytes.Equal(msgForSub, msg.Data) {
|
||||
t.Fatalf("Expected msg for sub to be %q, got %q", msgForSub, msg.Data)
|
||||
}
|
||||
|
||||
// Remove all subs on B.
|
||||
qsub.Unsubscribe()
|
||||
sub.Unsubscribe()
|
||||
ncB.Flush()
|
||||
|
||||
// Produce a message from A expect A-
|
||||
natsPub(t, ncA, "foo", []byte("hello"))
|
||||
natsFlush(t, ncA)
|
||||
|
||||
// expect an A- on return
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, true, time.Second)
|
||||
|
||||
// Simulate B sending another A-, on A account no interest should remain same.
|
||||
gwA.mu.Lock()
|
||||
gwA.sendProto([]byte("A- $G\r\n"), true)
|
||||
gwA.mu.Unlock()
|
||||
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, true, time.Second)
|
||||
|
||||
// Create a queue sub on B
|
||||
qsub = natsQueueSubSync(t, ncB, "bar", "queue")
|
||||
natsFlush(t, ncB)
|
||||
|
||||
checkForRegisteredQSubInterest(t, sa, "B", globalAccountName, "bar", 1, time.Second)
|
||||
|
||||
// Make B send an A+ and verify that we sitll have the registered qsub interest
|
||||
gwA.mu.Lock()
|
||||
gwA.sendProto([]byte("A+ $G\r\n"), true)
|
||||
gwA.mu.Unlock()
|
||||
|
||||
// Give a chance to A to possibly misbehave when receiving this proto
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
// Now check interest is still there
|
||||
checkForRegisteredQSubInterest(t, sa, "B", globalAccountName, "bar", 1, time.Second)
|
||||
|
||||
qsub.Unsubscribe()
|
||||
natsFlush(t, ncB)
|
||||
checkForRegisteredQSubInterest(t, sa, "B", globalAccountName, "bar", 0, time.Second)
|
||||
|
||||
// Send A-, server A should set entry to nil
|
||||
gwA.mu.Lock()
|
||||
gwA.sendProto([]byte("A- $G\r\n"), true)
|
||||
gwA.mu.Unlock()
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, true, time.Second)
|
||||
|
||||
// Send A+ and entry should be removed since there is no longer reason to
|
||||
// keep the entry.
|
||||
gwA.mu.Lock()
|
||||
gwA.sendProto([]byte("A+ $G\r\n"), true)
|
||||
gwA.mu.Unlock()
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, false, time.Second)
|
||||
|
||||
// Last A+ should not change because account already removed from map.
|
||||
gwA.mu.Lock()
|
||||
gwA.sendProto([]byte("A+ $G\r\n"), true)
|
||||
gwA.mu.Unlock()
|
||||
checkForAccountNoInterest(t, gwb, globalAccountName, false, time.Second)
|
||||
}
|
||||
|
||||
@@ -461,6 +461,8 @@ func TestGatewayNoPanicOnBadProtocol(t *testing.T) {
|
||||
{"rsub", "RS+ $foo foo 2\r\n"},
|
||||
{"runsub", "RS- $foo foo 2\r\n"},
|
||||
{"pub", "PUB foo 2\r\nok\r\n"},
|
||||
{"msg", "MSG foo 2\r\nok\r\n"},
|
||||
{"rmsg", "RMSG $foo foo 2\r\nok\r\n"},
|
||||
{"anything", "xxxx\r\n"},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
@@ -479,3 +481,38 @@ func TestGatewayNoPanicOnBadProtocol(t *testing.T) {
|
||||
clientSend("PING\r\n")
|
||||
clientExpect(pongRe)
|
||||
}
|
||||
|
||||
func TestGatewayNoAccUnsubAfterQSub(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port)
|
||||
defer gA.Close()
|
||||
|
||||
gASend, gAExpect := setupGatewayConn(t, gA, "A", "B")
|
||||
gASend("PING\r\n")
|
||||
gAExpect(pongRe)
|
||||
|
||||
// Simulate a client connecting to A and publishing a message
|
||||
// so we get an A- from B since there is no interest.
|
||||
gASend("RMSG $G foo 2\r\nok\r\n")
|
||||
gAExpect(aunsubRe)
|
||||
|
||||
// Now create client on B and create queue sub.
|
||||
client := createClientConn(t, ob.Host, ob.Port)
|
||||
defer client.Close()
|
||||
clientSend, clientExpect := setupConn(t, client)
|
||||
|
||||
clientSend("SUB bar queue 1\r\nPING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
// A should receive an RS+ for this queue sub.
|
||||
gAExpect(rsubRe)
|
||||
|
||||
// On B, create a plain sub now. We should get nothing.
|
||||
clientSend("SUB baz 2\r\nPING\r\n")
|
||||
clientExpect(pongRe)
|
||||
|
||||
expectNothing(t, gA)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user