diff --git a/server/gateway.go b/server/gateway.go index 886f204a..2a7de885 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -118,6 +118,14 @@ func (im GatewayInterestMode) String() string { } } +var gwDoNotForceInterestOnlyMode bool + +// GatewayDoNotForceInterestOnlyMode is used ONLY in tests. +// DO NOT USE in normal code or if you embed the NATS Server. +func GatewayDoNotForceInterestOnlyMode(doNotForce bool) { + gwDoNotForceInterestOnlyMode = doNotForce +} + type srvGateway struct { totalQSubs int64 //total number of queue subs in all remote gateways (used with atomic operations) sync.RWMutex @@ -193,16 +201,21 @@ type gatewayCfg struct { // Struct for client's gateway related fields type gateway struct { name string - outbound bool cfg *gatewayCfg connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote outsim *sync.Map // Per-account subject interest (or no-interest) (outbound conn) insim map[string]*insie // Per-account subject no-interest sent or modeInterestOnly mode (inbound conn) + // This is an outbound GW connection + outbound bool // Set/check in readLoop without lock. This is to know that an inbound has sent the CONNECT protocol first connected bool // Set to true if outbound is to a server that only knows about $GR, not $GNR useOldPrefix bool + // If true, it indicates that the inbound side will switch any account to + // interest-only mode "immediately", so the outbound should disregard + // the optimistic mode when checking for interest. + interestOnlyMode bool } // Outbound subject interest entry. @@ -510,6 +523,14 @@ func (s *Server) startGatewayAcceptLoop() { GatewayNRP: true, Headers: s.supportsHeaders(), } + // Unless in some tests we want to keep the old behavior, we are now + // (since v2.9.0) indicate that this server will switch all accounts + // to InterestOnly mode when accepting an inbound or when a new + // account is fetched. + if !gwDoNotForceInterestOnlyMode { + info.GatewayIOM = true + } + // If we have selected a random port... if port == 0 { // Write resolved port back to options. @@ -1038,6 +1059,7 @@ func (c *client) processGatewayInfo(info *Info) { // from this INFO protocol and can sign it in the CONNECT we are // going to send now. c.mu.Lock() + c.gw.interestOnlyMode = info.GatewayIOM c.sendGatewayConnect(opts) c.Debugf("Gateway connect protocol sent to %q", gwName) // Send INFO too @@ -1122,8 +1144,9 @@ func (c *client) processGatewayInfo(info *Info) { js := s.js s.mu.Unlock() - // Switch JetStream accounts to interest-only mode. - if js != nil { + // If running in some tests, maintain the original behavior. + if gwDoNotForceInterestOnlyMode && js != nil { + // Switch JetStream accounts to interest-only mode. var accounts []string js.mu.Lock() if len(js.accounts) > 0 { @@ -1140,6 +1163,15 @@ func (c *client) processGatewayInfo(info *Info) { } } } + } else if !gwDoNotForceInterestOnlyMode { + // Starting 2.9.0, we are phasing out the optimistic mode, so change + // all accounts to interest-only mode, unless instructed not to do so + // in some tests. + s.accounts.Range(func(_, v interface{}) bool { + acc := v.(*Account) + s.switchAccountToInterestMode(acc.GetName()) + return true + }) } } } @@ -2050,17 +2082,19 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { if accountInMap && ei == nil { return false, nil } - // Assume interest if account not in map. - psi := !accountInMap + // Assume interest if account not in map, unless we support + // only interest-only mode. + psi := !accountInMap && !c.gw.interestOnlyMode var r *SublistResult if accountInMap { // If in map, check for subs interest with sublist. e := ei.(*outsie) e.RLock() - // We may be in transition to modeInterestOnly + // Unless each side has agreed on interest-only mode, + // we may be in transition to modeInterestOnly // but until e.ni is nil, use it to know if we // should suppress interest or not. - if e.ni != nil { + if !c.gw.interestOnlyMode && e.ni != nil { if _, inMap := e.ni[subj]; !inMap { psi = true } diff --git a/server/gateway_test.go b/server/gateway_test.go index b8f9c5ab..d7f7cd70 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -144,6 +144,60 @@ func checkForAccountNoInterest(t *testing.T, c *client, account string, expected }) } +func checkGWInterestOnlyMode(t *testing.T, s *Server, outboundGWName, accName string) { + t.Helper() + checkGWInterestOnlyModeOrNotPresent(t, s, outboundGWName, accName, false) +} + +func checkGWInterestOnlyModeOrNotPresent(t *testing.T, s *Server, outboundGWName, accName string, notPresentOk bool) { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + gwc := s.getOutboundGatewayConnection(outboundGWName) + if gwc == nil { + return fmt.Errorf("No outbound gateway connection %q for server %v", outboundGWName, s) + } + gwc.mu.Lock() + defer gwc.mu.Unlock() + out, ok := gwc.gw.outsim.Load(accName) + if !ok { + if notPresentOk { + return nil + } else { + return fmt.Errorf("Server %v - outbound gateway connection %q: no account %q found in map", + s, outboundGWName, accName) + } + } + if out == nil { + return fmt.Errorf("Server %v - outbound gateway connection %q: interest map not found for account %q", + s, outboundGWName, accName) + } else if mode := out.(*outsie).mode; mode != InterestOnly { + return fmt.Errorf( + "Server %v - outbound gateway connection %q: account %q mode shoule be InterestOnly but is %v", + s, outboundGWName, accName, mode) + } + return nil + }) +} + +func checkGWInterestOnlyModeInterestOn(t *testing.T, s *Server, outboundGWName, accName, subject string) { + t.Helper() + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + c := s.getOutboundGatewayConnection(outboundGWName) + outsiei, _ := c.gw.outsim.Load(accName) + if outsiei == nil { + return fmt.Errorf("Server %s - outbound gateway connection %q: no map entry found for account %q", + s, outboundGWName, accName) + } + outsie := outsiei.(*outsie) + r := outsie.sl.Match(subject) + if len(r.psubs) == 0 { + return fmt.Errorf("Server %s - outbound gateway connection %q - account %q: no subject interest for %q", + s, outboundGWName, accName, subject) + } + return nil + }) +} + func waitCh(t *testing.T, ch chan bool, errTxt string) { t.Helper() select { @@ -1666,6 +1720,9 @@ func setAccountUserPassInOptions(o *Options, accName, username, password string) } func TestGatewayAccountInterest(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + o2 := testDefaultOptionsForGateway("B") // Add users to cause s2 to require auth. Will add an account with user later. o2.Users = append([]*User(nil), &User{Username: "test", Password: "pwd"}) @@ -1841,6 +1898,9 @@ func TestGatewayAccountUnsub(t *testing.T) { } func TestGatewaySubjectInterest(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + o1 := testDefaultOptionsForGateway("A") setAccountUserPassInOptions(o1, "$foo", "ivan", "password") s1 := runGatewayServer(o1) @@ -2497,6 +2557,9 @@ func TestGatewaySendQSubsOnGatewayConnect(t *testing.T) { } func TestGatewaySendRemoteQSubs(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + ob1 := testDefaultOptionsForGateway("B") sb1 := runGatewayServer(ob1) defer sb1.Shutdown() @@ -3281,6 +3344,9 @@ func getInboundGatewayConnection(s *Server, name string) *client { } func TestGatewaySendAllSubs(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + gatewayMaxRUnsubBeforeSwitch = 100 defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }() @@ -3392,21 +3458,9 @@ func TestGatewaySendAllSubs(t *testing.T) { if !switchedMode { return fmt.Errorf("C has still not switched mode") } - switchedMode = false - // Now check B outbound connection to C - c = sb.getOutboundGatewayConnection("C") - ei, _ := c.gw.outsim.Load(globalAccountName) - if ei != nil { - e := ei.(*outsie) - e.RLock() - switchedMode = e.ni == nil && e.mode == InterestOnly - e.RUnlock() - } - if !switchedMode { - return fmt.Errorf("C has still not switched mode") - } return nil }) + checkGWInterestOnlyMode(t, sb, "C", globalAccountName) wg.Wait() // Check consCount and accsCount on C @@ -3582,6 +3636,9 @@ func TestGatewayRaceOnClose(t *testing.T) { // Similar to TestNewRoutesServiceImport but with 2 GW servers instead // of a cluster of 2 servers. func TestGatewayServiceImport(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + oa := testDefaultOptionsForGateway("A") setAccountUserPassInOptions(oa, "$foo", "clientA", "password") setAccountUserPassInOptions(oa, "$bar", "yyyyyyy", "password") @@ -3809,21 +3866,7 @@ func TestGatewayServiceImport(t *testing.T) { } natsFlush(t, clientB) - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - c := sb.getOutboundGatewayConnection("A") - outsiei, _ := c.gw.outsim.Load("$foo") - if outsiei == nil { - return fmt.Errorf("Nothing found for $foo") - } - outsie := outsiei.(*outsie) - outsie.RLock() - mode := outsie.mode - outsie.RUnlock() - if mode != InterestOnly { - return fmt.Errorf("Should have switched to interest only mode") - } - return nil - }) + checkGWInterestOnlyMode(t, sb, "A", "$foo") // Go back to clientB on $bar. clientB.Close() @@ -3839,19 +3882,7 @@ func TestGatewayServiceImport(t *testing.T) { // Sine it is interest-only, B should receive an interest // on $foo test.request - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - c := sb.getOutboundGatewayConnection("A") - outsiei, _ := c.gw.outsim.Load("$foo") - if outsiei == nil { - return fmt.Errorf("Nothing found for $foo") - } - outsie := outsiei.(*outsie) - r := outsie.sl.Match("test.request") - if len(r.psubs) != 1 { - return fmt.Errorf("No registered interest on test.request") - } - return nil - }) + checkGWInterestOnlyModeInterestOn(t, sb, "A", "$foo", "test.request") // Send the request from clientB on foo.request, natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) @@ -3893,6 +3924,9 @@ func TestGatewayServiceImport(t *testing.T) { } func TestGatewayServiceImportWithQueue(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + oa := testDefaultOptionsForGateway("A") setAccountUserPassInOptions(oa, "$foo", "clientA", "password") setAccountUserPassInOptions(oa, "$bar", "yyyyyyy", "password") @@ -4124,21 +4158,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { } natsFlush(t, clientB) - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - c := sb.getOutboundGatewayConnection("A") - outsiei, _ := c.gw.outsim.Load("$foo") - if outsiei == nil { - return fmt.Errorf("Nothing found for $foo") - } - outsie := outsiei.(*outsie) - outsie.RLock() - mode := outsie.mode - outsie.RUnlock() - if mode != InterestOnly { - return fmt.Errorf("Should have switched to interest only mode") - } - return nil - }) + checkGWInterestOnlyMode(t, sb, "A", "$foo") // Go back to clientB on $bar. clientB.Close() @@ -4154,19 +4174,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) { // Sine it is interest-only, B should receive an interest // on $foo test.request - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - c := sb.getOutboundGatewayConnection("A") - outsiei, _ := c.gw.outsim.Load("$foo") - if outsiei == nil { - return fmt.Errorf("Nothing found for $foo") - } - outsie := outsiei.(*outsie) - r := outsie.sl.Match("test.request") - if len(r.psubs) != 1 { - return fmt.Errorf("No registered interest on test.request") - } - return nil - }) + checkGWInterestOnlyModeInterestOn(t, sb, "A", "$foo", "test.request") // Send the request from clientB on foo.request, natsPubReq(t, clientB, "foo.request", "reply", []byte("hi")) @@ -4517,24 +4525,7 @@ func TestGatewayServiceImportComplexSetup(t *testing.T) { } natsFlush(t, clientA) // Wait for B2 to switch to interest-only - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - c := sa2.getOutboundGatewayConnection("B") - if c == nil { - return fmt.Errorf("No outbound to B2") - } - outsiei, _ := c.gw.outsim.Load("$foo") - if outsiei == nil { - return fmt.Errorf("Nothing for $foo") - } - outsie := outsiei.(*outsie) - outsie.RLock() - mode := outsie.mode - outsie.RUnlock() - if mode != InterestOnly { - return fmt.Errorf("Not in interest-only mode yet") - } - return nil - }) + checkGWInterestOnlyMode(t, sa2, "B", "$foo") subA = natsSubSync(t, clientA, "test.request") natsFlush(t, clientA) @@ -4890,24 +4881,7 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) { natsFlush(t, clientA) // Wait for B2 to switch to interest-only - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - c := sa2.getOutboundGatewayConnection("B") - if c == nil { - return fmt.Errorf("No outbound to B2") - } - outsiei, _ := c.gw.outsim.Load("$foo") - if outsiei == nil { - return fmt.Errorf("Nothing for $foo") - } - outsie := outsiei.(*outsie) - outsie.RLock() - mode := outsie.mode - outsie.RUnlock() - if mode != InterestOnly { - return fmt.Errorf("Not in interest-only mode yet") - } - return nil - }) + checkGWInterestOnlyMode(t, sa2, "B", "$foo") subA = natsSubSync(t, clientA, "ngs.update.*") natsFlush(t, clientA) @@ -5541,6 +5515,9 @@ func TestGatewayClientsDontReceiveMsgsOnGWPrefix(t *testing.T) { } func TestGatewayNoAccInterestThenQSubThenRegularSub(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -5600,6 +5577,9 @@ func TestGatewayNoAccInterestThenQSubThenRegularSub(t *testing.T) { // Similar to TestGatewayNoAccInterestThenQSubThenRegularSub but simulate // older incorrect behavior. func TestGatewayHandleUnexpectedASubUnsub(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -5735,6 +5715,9 @@ func (l *captureGWInterestSwitchLogger) Debugf(format string, args ...interface{ } func TestGatewayLogAccountInterestModeSwitch(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -5782,21 +5765,7 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) { return nil }) - gwB := sa.getOutboundGatewayConnection("B") - checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - ei, ok := gwB.gw.outsim.Load(globalAccountName) - if !ok || ei == nil { - return fmt.Errorf("not switched yet") - } - e := ei.(*outsie) - e.RLock() - mode := e.mode - e.RUnlock() - if mode != InterestOnly { - return fmt.Errorf("not in interest mode only yet") - } - return nil - }) + checkGWInterestOnlyMode(t, sa, "B", globalAccountName) checkLog := func(t *testing.T, l *captureGWInterestSwitchLogger) { t.Helper() @@ -5835,6 +5804,9 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) { } func TestGatewayAccountInterestModeSwitchOnlyOncePerAccount(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -6780,3 +6752,72 @@ func TestGatewayNoPanicOnStartupWithMonitoring(t *testing.T) { s.Shutdown() wg.Wait() } + +func TestGatewaySwitchToInterestOnlyModeImmediately(t *testing.T) { + o2 := testDefaultOptionsForGateway("B") + // Add users to cause s2 to require auth. Will add an account with user later. + o2.Users = append([]*User(nil), &User{Username: "test", Password: "pwd"}) + s2 := runGatewayServer(o2) + defer s2.Shutdown() + + o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2) + setAccountUserPassInOptions(o1, "$foo", "ivan", "password") + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + waitForOutboundGateways(t, s1, 1, time.Second) + waitForOutboundGateways(t, s2, 1, time.Second) + + s1Url := fmt.Sprintf("nats://ivan:password@127.0.0.1:%d", o1.Port) + nc := natsConnect(t, s1Url) + defer nc.Close() + natsPub(t, nc, "foo", []byte("hello")) + natsFlush(t, nc) + + checkCount := func(t *testing.T, c *client, expected int) { + t.Helper() + c.mu.Lock() + out := c.outMsgs + c.mu.Unlock() + if int(out) != expected { + t.Fatalf("Expected %d message(s) to be sent over, got %v", expected, out) + } + } + // No message should be sent + gwcb := s1.getOutboundGatewayConnection("B") + checkCount(t, gwcb, 0) + + // Check that we are in interest-only mode, but in this case, since s2 does + // have the account, we should have the account not even present in the map. + checkGWInterestOnlyModeOrNotPresent(t, s1, "B", "$foo", true) + + // Add account to S2 and a client. + s2FooAcc, err := s2.RegisterAccount("$foo") + if err != nil { + t.Fatalf("Error registering account: %v", err) + } + s2.mu.Lock() + s2.users["ivan"] = &User{Account: s2FooAcc, Username: "ivan", Password: "password"} + s2.mu.Unlock() + s2Url := fmt.Sprintf("nats://ivan:password@127.0.0.1:%d", o2.Port) + ncS2 := natsConnect(t, s2Url) + defer ncS2.Close() + natsSubSync(t, ncS2, "asub") + // This time we will have the account in the map and it will be interest-only + checkGWInterestOnlyMode(t, s1, "B", "$foo") + + // Now publish a message, still should not go because the sub is on "asub" + natsPub(t, nc, "foo", []byte("hello")) + natsFlush(t, nc) + checkCount(t, gwcb, 0) + + natsSubSync(t, ncS2, "foo") + natsFlush(t, ncS2) + + checkGWInterestOnlyModeInterestOn(t, s1, "B", "$foo", "foo") + + // Publish on foo + natsPub(t, nc, "foo", []byte("hello")) + natsFlush(t, nc) + checkCount(t, gwcb, 1) +} diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 6d8862e0..486f0c51 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -22,6 +22,8 @@ import ( "errors" "fmt" "math/rand" + "net/http" + "net/http/httptest" "reflect" "strings" "sync" @@ -29,7 +31,9 @@ import ( "testing" "time" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" ) func TestJetStreamSuperClusterMetaPlacement(t *testing.T) { @@ -413,6 +417,9 @@ func TestJetStreamSuperClusterPeerReassign(t *testing.T) { } func TestJetStreamSuperClusterInterestOnlyMode(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + template := ` listen: 127.0.0.1:-1 server_name: %s @@ -3611,3 +3618,210 @@ func TestJetStreamSuperClusterStreamCathupLongRTT(t *testing.T) { return nil }) } + +func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyStaticConfig(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { domain: ngs, max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf: { listen: 127.0.0.1:-1 } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + accounts { + ONE { + users = [ { user: "one", pass: "pwd" } ] + jetstream: enabled + } + TWO { users = [ { user: "two", pass: "pwd" } ] } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } + ` + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 5, 3, + func(serverName, clusterName, storeDir, conf string) string { + sname := serverName[strings.Index(serverName, "-")+1:] + switch sname { + case "S4", "S5": + conf = strings.ReplaceAll(conf, "jetstream: { ", "#jetstream: { ") + default: + conf = strings.ReplaceAll(conf, "leaf: { ", "#leaf: { ") + } + return conf + }, nil) + defer sc.shutdown() + + // Connect our client to a non JS server + c := sc.randomCluster() + var s *Server + for _, as := range c.servers { + if !as.JetStreamEnabled() { + s = as + break + } + } + if s == nil { + t.Fatal("Did not find a non JS server!") + } + nc, js := jsClientConnect(t, s, nats.UserInfo("one", "pwd")) + defer nc.Close() + + // Just create a stream and then make sure that all gateways have switched + // to interest-only mode. + si, err := js.AddStream(&nats.StreamConfig{Name: "interest", Replicas: 3}) + require_NoError(t, err) + + sc.waitOnStreamLeader("ONE", "interest") + + check := func(accName string) { + t.Helper() + for _, c := range sc.clusters { + for _, s := range c.servers { + // Check only JS servers outbound GW connections + if !s.JetStreamEnabled() { + continue + } + opts := s.getOpts() + for _, gw := range opts.Gateway.Gateways { + if gw.Name == opts.Gateway.Name { + continue + } + checkGWInterestOnlyMode(t, s, gw.Name, accName) + } + } + } + } + // Starting v2.9.0, all accounts should be switched to interest-only mode + check("ONE") + check("TWO") + + var gwsa [16]*client + gws := gwsa[:0] + + s = sc.serverByName(si.Cluster.Leader) + // Get the GW outbound connections + s.getOutboundGatewayConnections(&gws) + for _, gwc := range gws { + gwc.mu.Lock() + gwc.nc.Close() + gwc.mu.Unlock() + } + waitForOutboundGateways(t, s, 2, 5*time.Second) + check("ONE") + check("TWO") +} + +func TestJetStreamSuperClusterMixedModeSwitchToInterestOnlyOperatorConfig(t *testing.T) { + kp, _ := nkeys.FromSeed(oSeed) + + skp, _ := nkeys.CreateAccount() + spub, _ := skp.PublicKey() + nac := jwt.NewAccountClaims(spub) + sjwt, err := nac.Encode(kp) + require_NoError(t, err) + + akp, _ := nkeys.CreateAccount() + apub, _ := akp.PublicKey() + nac = jwt.NewAccountClaims(apub) + // Set some limits to enable JS. + nac.Limits.JetStreamLimits.DiskStorage = 1024 * 1024 + nac.Limits.JetStreamLimits.Streams = 10 + ajwt, err := nac.Encode(kp) + require_NoError(t, err) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, spub) { + w.Write([]byte(sjwt)) + } else { + w.Write([]byte(ajwt)) + } + })) + defer ts.Close() + + operator := fmt.Sprintf(` + operator: %s + resolver: URL("%s/ngs/v1/accounts/jwt/") + `, ojwt, ts.URL) + + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { domain: ngs, max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf: { listen: 127.0.0.1:-1 } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + operator + sc := createJetStreamSuperClusterWithTemplateAndModHook(t, tmpl, 5, 3, + func(serverName, clusterName, storeDir, conf string) string { + conf = strings.ReplaceAll(conf, "system_account: \"$SYS\"", fmt.Sprintf("system_account: \"%s\"", spub)) + sname := serverName[strings.Index(serverName, "-")+1:] + switch sname { + case "S4", "S5": + conf = strings.ReplaceAll(conf, "jetstream: { ", "#jetstream: { ") + default: + conf = strings.ReplaceAll(conf, "leaf: { ", "#leaf: { ") + } + return conf + }, nil) + defer sc.shutdown() + + // Connect our client to a non JS server + c := sc.randomCluster() + var s *Server + for _, as := range c.servers { + if !as.JetStreamEnabled() { + s = as + break + } + } + if s == nil { + t.Fatal("Did not find a non JS server!") + } + nc, js := jsClientConnect(t, s, createUserCreds(t, nil, akp)) + defer nc.Close() + + // Just create a stream and then make sure that all gateways have switched + // to interest-only mode. + si, err := js.AddStream(&nats.StreamConfig{Name: "interest", Replicas: 3}) + require_NoError(t, err) + + sc.waitOnStreamLeader(apub, "interest") + + check := func(s *Server) { + opts := s.getOpts() + for _, gw := range opts.Gateway.Gateways { + if gw.Name == opts.Gateway.Name { + continue + } + checkGWInterestOnlyMode(t, s, gw.Name, apub) + } + } + s = sc.serverByName(si.Cluster.Leader) + check(s) + + // Let's cause a leadership change and verify that it still works. + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "interest"), nil, time.Second) + require_NoError(t, err) + sc.waitOnStreamLeader(apub, "interest") + + si, err = js.StreamInfo("interest") + require_NoError(t, err) + s = sc.serverByName(si.Cluster.Leader) + check(s) + + var gwsa [16]*client + gws := gwsa[:0] + // Get the GW outbound connections + s.getOutboundGatewayConnections(&gws) + for _, gwc := range gws { + gwc.mu.Lock() + gwc.nc.Close() + gwc.mu.Unlock() + } + waitForOutboundGateways(t, s, 2, 5*time.Second) + check(s) +} diff --git a/server/monitor_test.go b/server/monitor_test.go index d6f756f0..6b432ee4 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3168,6 +3168,9 @@ func TestMonitorGatewayz(t *testing.T) { } func TestMonitorGatewayzAccounts(t *testing.T) { + GatewayDoNotForceInterestOnlyMode(true) + defer GatewayDoNotForceInterestOnlyMode(false) + resetPreviousHTTPConnections() // Create bunch of Accounts diff --git a/server/server.go b/server/server.go index 3515daf4..d94968f3 100644 --- a/server/server.go +++ b/server/server.go @@ -98,6 +98,7 @@ type Info struct { GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed GatewayNRP bool `json:"gateway_nrp,omitempty"` // Uses new $GNR. prefix for mapped replies + GatewayIOM bool `json:"gateway_iom,omitempty"` // Indicate that all accounts will be switched to InterestOnly mode "right away" // LeafNode Specific LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to. @@ -1413,6 +1414,12 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { // Can not have server lock here. s.mu.Unlock() s.registerSystemImports(acc) + // Starting 2.9.0, we are phasing out the optimistic mode, so change + // the account to interest-only mode (except if instructed not to do + // it in some tests). + if s.gateway.enabled && !gwDoNotForceInterestOnlyMode { + s.switchAccountToInterestMode(acc.GetName()) + } s.mu.Lock() return nil diff --git a/test/gateway_test.go b/test/gateway_test.go index 74a1e61a..91e2cb49 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -82,6 +82,9 @@ func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, e } func TestGatewayAccountInterest(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -159,6 +162,9 @@ func TestGatewayAccountInterest(t *testing.T) { } func TestGatewaySubjectInterest(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") fooAcc := server.NewAccount("$foo") ob.Accounts = []*server.Account{fooAcc} @@ -301,6 +307,9 @@ func TestGatewaySubjectInterest(t *testing.T) { } func TestGatewayQueue(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") fooAcc := server.NewAccount("$foo") ob.Accounts = []*server.Account{fooAcc} @@ -403,6 +412,9 @@ func TestGatewayQueue(t *testing.T) { } func TestGatewaySendAllSubs(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -496,6 +508,9 @@ func TestGatewayNoPanicOnBadProtocol(t *testing.T) { } func TestGatewayNoAccUnsubAfterQSub(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -531,6 +546,9 @@ func TestGatewayNoAccUnsubAfterQSub(t *testing.T) { } func TestGatewayErrorOnRSentFromOutbound(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") sb := runGatewayServer(ob) defer sb.Shutdown() @@ -675,6 +693,9 @@ func TestGatewayTLSMixedIPAndDNS(t *testing.T) { } func TestGatewayAdvertiseInCluster(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob1 := testDefaultOptionsForGateway("B") ob1.Cluster.Name = "B" ob1.Cluster.Host = "127.0.0.1" @@ -770,6 +791,9 @@ func TestGatewayAuthTimeout(t *testing.T) { } func TestGatewayFirstPingGoesAfterConnect(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + ob := testDefaultOptionsForGateway("B") // For this test, we want the first ping to NOT be disabled. ob.DisableShortFirstPing = false diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 688007c9..abd84fd7 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -2665,6 +2665,9 @@ func TestLeafNodeSwitchGatewayToInterestModeOnly(t *testing.T) { // route connections to simulate. func TestLeafNodeResetsMSGProto(t *testing.T) { + server.GatewayDoNotForceInterestOnlyMode(true) + defer server.GatewayDoNotForceInterestOnlyMode(false) + opts := testDefaultOptionsForLeafNodes() opts.Cluster.Name = "xyz" opts.Cluster.Host = opts.Host