From a23ef5b740115346a4e70b107edab35103440d41 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 30 Nov 2018 19:08:15 -0700 Subject: [PATCH] Switch to send-all-subs when number of RS- gets too big Signed-off-by: Ivan Kozlovic --- server/client.go | 22 +- server/gateway.go | 1029 +++++++++++++++++++++++++++------------- server/gateway_test.go | 509 +++++++++++++++++--- server/parser.go | 4 +- server/route.go | 54 +-- server/server.go | 9 +- test/gateway_test.go | 45 +- 7 files changed, 1215 insertions(+), 457 deletions(-) diff --git a/server/client.go b/server/client.go index 9289aa26..2ebece09 100644 --- a/server/client.go +++ b/server/client.go @@ -629,6 +629,9 @@ func (c *client) readLoop() { s := c.srv c.in.rsz = startBufSize defer s.grWG.Done() + if c.gw != nil && c.gw.outbound { + defer c.gatewayOutboundConnectionReadLoopExited() + } c.mu.Unlock() if nc == nil { @@ -1489,6 +1492,7 @@ func (c *client) processSub(argo []byte) (err error) { sid := string(sub.sid) acc := c.acc + updateGWs := false // Subscribe here. if c.subs[sid] == nil { c.subs[sid] = sub @@ -1496,6 +1500,8 @@ func (c *client) processSub(argo []byte) (err error) { err = acc.sl.Insert(sub) if err != nil { delete(c.subs, sid) + } else { + updateGWs = c.srv.gateway.enabled } } } @@ -1515,6 +1521,9 @@ func (c *client) processSub(argo []byte) (err error) { // If we are routing and this is a local sub, add to the route map for the associated account. if kind == CLIENT || kind == SYSTEM { c.srv.updateRouteSubscriptionMap(acc, sub, 1) + if updateGWs { + c.srv.gatewayUpdateSubInterest(acc.Name, sub, 1) + } } } @@ -1739,6 +1748,7 @@ func (c *client) processUnsub(arg []byte) error { kind := c.kind var acc *Account + updateGWs := false if sub, ok = c.subs[string(sid)]; ok { acc = c.acc if max > 0 { @@ -1748,6 +1758,7 @@ func (c *client) processUnsub(arg []byte) error { sub.max = 0 unsub = true } + updateGWs = c.srv.gateway.enabled } c.mu.Unlock() @@ -1759,6 +1770,9 @@ func (c *client) processUnsub(arg []byte) error { c.unsubscribe(acc, sub, false) if acc != nil && kind == CLIENT || kind == SYSTEM { c.srv.updateRouteSubscriptionMap(acc, sub, -1) + if updateGWs { + c.srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } } @@ -2089,7 +2103,8 @@ func (c *client) processInboundClientMsg(msg []byte) { // mode and the remote gateways have queue subs, then we need to // collect the queue groups this message was sent to so that we // exclude them when sending to gateways. - if len(r.qsubs) > 0 && c.srv.gateway.enabled && atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { + if len(r.qsubs) > 0 && c.srv.gateway.enabled && + atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { qnames = &queues } c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames) @@ -2619,7 +2634,7 @@ func (c *client) closeConnection(reason ClosedState) { // Remove clients subscriptions. if kind == CLIENT { acc.sl.RemoveBatch(subs) - } else { + } else if kind == ROUTER { go c.removeRemoteSubs() } @@ -2653,6 +2668,9 @@ func (c *client) closeConnection(reason ClosedState) { qsubs[key] = &qsub{sub, 1} } } + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } // Process any qsubs here. for _, esub := range qsubs { diff --git a/server/gateway.go b/server/gateway.go index dadd8504..45220164 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -29,38 +29,65 @@ import ( ) const ( - defaultSolicitGatewaysDelay = time.Second - defaultGatewayConnectDelay = time.Second - defaultGatewayReconnectDelay = time.Second + defaultSolicitGatewaysDelay = time.Second + defaultGatewayConnectDelay = time.Second + defaultGatewayReconnectDelay = time.Second + defaultGatewayMaxRUnsubBeforeSwitch = 1000 ) var ( - gatewayConnectDelay = defaultGatewayConnectDelay - gatewayReconnectDelay = defaultGatewayReconnectDelay + gatewayConnectDelay = defaultGatewayConnectDelay + gatewayReconnectDelay = defaultGatewayReconnectDelay + gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch ) const ( - gatewayCmdGossip byte = 1 + gatewayCmdGossip byte = 1 + gatewayCmdAllSubsStart byte = 2 + gatewayCmdAllSubsComplete byte = 3 ) type srvGateway struct { totalQSubs int64 //total number of queue subs in all remote gateways (used with atomic operations) sync.RWMutex - enabled bool // Immutable, true if both a name and port are configured - name string // Name of the Gateway on this server - out map[string]*client // outbound gateways - outo []*client // outbound gateways maintained in an order suitable for sending msgs (currently based on RTT) - in map[uint64]*client // inbound gateways - remotes map[string]*gatewayCfg // Config of remote gateways - URLs map[string]struct{} // Set of all Gateway URLs in the cluster - URL string // This server gateway URL (after possible random port is resolved) - info *Info // Gateway Info protocol - infoJSON []byte // Marshal'ed Info protocol - defPerms *GatewayPermissions // Default permissions (when accepting an unknown remote gateway) - rqs map[string]*subscription // Map of remote queue subscriptions (key is account+subject+queue) - runknown bool // Rejects unknown (not configured) gateway connections - resolver netResolver // Used to resolve host name before calling net.Dial() - sqbsz int // Max buffer size to send queue subs protocol. Used for testing. + enabled bool // Immutable, true if both a name and port are configured + name string // Name of the Gateway on this server + out map[string]*client // outbound gateways + outo []*client // outbound gateways maintained in an order suitable for sending msgs (currently based on RTT) + in map[uint64]*client // inbound gateways + remotes map[string]*gatewayCfg // Config of remote gateways + URLs map[string]struct{} // Set of all Gateway URLs in the cluster + URL string // This server gateway URL (after possible random port is resolved) + info *Info // Gateway Info protocol + infoJSON []byte // Marshal'ed Info protocol + defPerms *GatewayPermissions // Default permissions (when accepting an unknown remote gateway) + runknown bool // Rejects unknown (not configured) gateway connections + + // We maintain the interest of subjects and queues per account. + // For a given account, entries in the map could be something like this: + // foo.bar {n: 3} // 3 subs on foo.bar + // foo.> {n: 6} // 6 subs on foo.> + // foo bar {n: 1, q: true} // 1 qsub on foo, queue bar + // foo baz {n: 3, q: true} // 3 qsubs on foo, queue baz + pasi struct { + // Protect map since accessed from different go-routine and avoid + // possible race resulting in RS+ being sent before RS- resulting + // in incorrect interest suppression. + // Will use while sending QSubs (on GW connection accept) and when + // switching to the send-all-subs mode. + sync.Mutex + m map[string]map[string]*sitally + } + + resolver netResolver // Used to resolve host name before calling net.Dial() + sqbsz int // Max buffer size to send queue subs protocol. Used for testing. +} + +// Subject interest tally. Also indicates if the key in the map is a +// queue or not. +type sitally struct { + n int32 // number of subscriptions directly matching + q bool // indicate that this is a queue } type gatewayCfg struct { @@ -74,14 +101,44 @@ type gatewayCfg struct { // Struct for client's gateway related fields type gateway struct { - name string - outbound bool - cfg *gatewayCfg - connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote - infoJSON []byte // Needed when sending INFO after receiving INFO from remote - noInterest sync.Map // For outbound connection, record no-interest for account/subject - sentNoInterest map[string]map[string]struct{} // For inbound connection, record that no-interest was sent for account/subject - qsubsInterest sync.Map // Queue subscriptions interest for this gateway. + name string + outbound bool + cfg *gatewayCfg + connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote + infoJSON []byte // Needed when sending INFO after receiving INFO from remote + outsim *sync.Map // Per-account subject interest (or no-interest) (outbound conn) + insim map[string]*insie // Per-account subject no-interest sent or send-all-subs mode (inbound conn) +} + +// Outbound subject interest entry. +type outsie struct { + sync.RWMutex + // Indicate that all subs should be stored. This is + // set to true when receiving the command from the + // remote that we are about to receive all its subs. + sas bool + // If not nil, used for no-interest for plain subs. + // If a subject is present in this map, it means that + // the remote is not interested in that subject. + // When we have received the command that says that + // the remote has sent all its subs, this is set to nil. + ni map[string]struct{} + // Contains queue subscriptions when in optimistic mode, + // and all subs when pk is > 0. + sl *Sublist +} + +// Inbound subject interest entry. +// If `ni` is not nil, it stores the subjects for which an +// RS- was sent to the remote gateway. When a subscription +// is created, this is used to know if we need to send +// an RS+ to clear the no-interest in the remote. +// When an account is switched to perfect knowledge (we send +// all subs of an account to the remote), then `ni` is nil and +// when all subs have been sent, `sap` is set to true. +type insie struct { + ni map[string]struct{} // Record if RS- was sent for given subject + sas bool // Send-all-subs mode (always send RS+/-) } // clone returns a deep copy of the RemoteGatewayOpts object @@ -134,13 +191,14 @@ func newGateway(opts *Options) (*srvGateway, error) { in: make(map[uint64]*client), remotes: make(map[string]*gatewayCfg), URLs: make(map[string]struct{}), - rqs: make(map[string]*subscription), resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, } gateway.Lock() defer gateway.Unlock() + gateway.pasi.m = make(map[string]map[string]*sitally) + if gateway.resolver == nil { gateway.resolver = netResolver(net.DefaultResolver) } @@ -491,10 +549,11 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { // the remote's INFO protocol, save the URL we need to connect to. c.gw.connectURL = url c.gw.infoJSON = infoJSON + c.gw.outsim = &sync.Map{} c.Noticef("Creating outbound gateway connection to %q", cfg.Name) } else { - c.gw.sentNoInterest = make(map[string]map[string]struct{}) // Inbound gateway connection + c.gw.insim = make(map[string]*insie) c.Noticef("Processing inbound gateway connection") } @@ -615,6 +674,8 @@ func (c *client) sendGatewayConnect() { // Returns an error to the connection if the CONNECT is not from a gateway // (for instance a client or route connecting to the gateway port), or // if the destination does not match the gateway name of this server. +// +// func (c *client) processGatewayConnect(arg []byte) error { connect := &connectInfo{} if err := json.Unmarshal(arg, connect); err != nil { @@ -659,6 +720,8 @@ func (c *client) processGatewayConnect(arg []byte) error { // // For an inbound gateway, the gateway is simply registered and the info protocol // is saved to be used after processing the CONNECT. +// +// func (c *client) processGatewayInfo(info *Info) { var ( gwName string @@ -678,7 +741,9 @@ func (c *client) processGatewayInfo(info *Info) { } else if isFirstINFO { c.gw.name = info.Gateway } - c.opts.Name = info.ID + if isFirstINFO { + c.opts.Name = info.ID + } c.mu.Unlock() // For an outbound connection... @@ -717,7 +782,9 @@ func (c *client) processGatewayInfo(info *Info) { } // Possibly add URLs that we get from the INFO protocol. - cfg.updateURLs(info.GatewayURLs) + if len(info.GatewayURLs) > 0 { + cfg.updateURLs(info.GatewayURLs) + } // If this is the first INFO, send our connect if isFirstINFO { @@ -738,6 +805,18 @@ func (c *client) processGatewayInfo(info *Info) { c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID) // Now that the outbound gateway is registered, we can remove from temp map. s.removeFromTempClients(cid) + } else if info.GatewayCmd > 0 { + switch info.GatewayCmd { + case gatewayCmdAllSubsStart: + c.gatewayAllSubsReceiveStart(info) + return + case gatewayCmdAllSubsComplete: + c.gatewayAllSubsReceiveComplete(info) + return + default: + s.Warnf("Received unknown command %v from gateway %q", info.GatewayCmd, gwName) + return + } } // Flood local cluster with information about this gateway. @@ -754,8 +833,14 @@ func (c *client) processGatewayInfo(info *Info) { // Now that it is registered, we can remove from temp map. s.removeFromTempClients(cid) - // Send our QSubs - s.sendQueueSubsToGateway(c) + // Send our QSubs, since this may take some time, execute + // in a separate go-routine so that if there is incoming + // data from the otherside, we don't cause a slow consumer + // error. + s.startGoRoutine(func() { + s.sendQueueSubsToGateway(c) + s.grWG.Done() + }) // Initiate outbound connection. This function will behave correctly if // we have already one. @@ -824,110 +909,126 @@ func (s *Server) forwardNewGatewayToLocalCluster(oinfo *Info) { } // Sends queue subscriptions interest to remote gateway. +// This is sent from the inbound side, that is, the side that receives +// messages from the remote's outbound connection. This side is +// the one sending the subscription interest. func (s *Server) sendQueueSubsToGateway(c *client) { - const subsStackSize = 4096 + s.sendSubsToGateway(c, nil) +} + +// Sends all subscriptions for the given account to the remove gateway +// This is sent from the inbound side, that is, the side that receives +// messages from the remote's outbound connection. This side is +// the one sending the subscription interest. +func (s *Server) sendAccountSubsToGateway(c *client, accName []byte) { + s.sendSubsToGateway(c, accName) +} + +// Sends subscriptions to remote gateway. +func (s *Server) sendSubsToGateway(c *client, accountName []byte) { var ( - accsa = [256]*Account{} - accs = accsa[:0] - rqsa = [subsStackSize]*subscription{} - rqs = rqsa[:0] - bufa = [32 * 1024]byte{} - buf = bufa[:0] - epa = [1024]int{} - ep = epa[:0] + bufa = [32 * 1024]byte{} + buf = bufa[:0] + epa = [1024]int{} + ep = epa[:0] ) - // Collect accounts - s.mu.Lock() - for _, acc := range s.accounts { - accs = append(accs, acc) - } - s.mu.Unlock() - // Collect remote queue subs - s.gateway.RLock() - for _, sub := range s.gateway.rqs { - rqs = append(rqs, sub) - } - s.gateway.RUnlock() - // Print debug statement if we get more queue subs - // that we allocate on the stack. - if cap(rqs) > subsStackSize { - s.Debugf("High number of remote queue subscriptions (%v, %v)", len(rqs), cap(rqs)) - } + gw := s.gateway - // Build proto for local subs - for _, acc := range accs { - acc.mu.RLock() - for subAndQueue, rm := range acc.rm { - if rm.qi > 0 { + // This needs to run under this lock for the whole duration + gw.pasi.Lock() + defer gw.pasi.Unlock() + + // Build the protocols + buildProto := func(accName []byte, acc map[string]*sitally, doQueues bool) { + for saq, si := range acc { + if doQueues && si.q || !doQueues && !si.q { buf = append(buf, rSubBytes...) - buf = append(buf, acc.Name...) + buf = append(buf, accName...) buf = append(buf, ' ') - buf = append(buf, subAndQueue...) - buf = append(buf, ' ', '1') + // For queue subs (si.q is true), saq will be + // subject + ' ' + queue, for plain subs, this is + // just the subject. + buf = append(buf, saq...) + if doQueues { + buf = append(buf, ' ', '1') + } buf = append(buf, CR_LF...) ep = append(ep, len(buf)) } } - acc.mu.RUnlock() } - // Now with the remote queue subs - for _, qsub := range rqs { - buf = append(buf, rSubBytes...) - // The remote queue sub sid is account+' '+subject+' '+queue name - buf = append(buf, qsub.sid...) - buf = append(buf, ' ', '1') - buf = append(buf, CR_LF...) - ep = append(ep, len(buf)) - } - - // Send - if len(buf) > 0 { - mbs := s.gateway.sqbsz - mp := int(c.out.mp / 2) - if mbs > mp { - mbs = mp - } - le := 0 - li := 0 - for i := 0; i < len(ep); i++ { - if ep[i]-le > mbs { - var end int - // If single proto is bigger than our max buffer size, - // send anyway. If it reaches a max in queueOutbound, - // that function will close the connection. - if i-li <= 1 { - end = ep[i] - } else { - end = ep[i-1] - i-- - } - c.mu.Lock() - c.queueOutbound(buf[le:end]) - c.flushOutbound() - closed := c.flags.isSet(clearConnection) - c.mu.Unlock() - if closed { - return - } - le = ep[i] - li = i - } - } + // If account is specified... + if accountName != nil { + // Simply send all plain subs (no queues) for this specific account + buildProto(accountName, gw.pasi.m[string(accountName)], false) + // Instruct to send all subs (RS+/-) for this account from now on. c.mu.Lock() - c.queueOutbound(buf[le:]) - c.flushOutbound() - closed := c.flags.isSet(clearConnection) - if !closed { - c.Debugf("Sent queue subscriptions to gateway") + e := c.gw.insim[string(accountName)] + if e != nil { + e.sas = true } c.mu.Unlock() + } else { + // Send queues for all accounts + for accName, acc := range gw.pasi.m { + buildProto([]byte(accName), acc, true) + } } + + // Nothing to send. + if len(buf) == 0 { + return + } + if len(buf) > cap(bufa) { + s.Debugf("Sending subscriptions to %q, buffer size: %v", c.gw.name, len(buf)) + } + // Send + mbs := gw.sqbsz + mp := int(c.out.mp / 2) + if mbs > mp { + mbs = mp + } + le := 0 + li := 0 + for i := 0; i < len(ep); i++ { + if ep[i]-le > mbs { + var end int + // If single proto is bigger than our max buffer size, + // send anyway. If it reaches a max in queueOutbound, + // that function will close the connection. + if i-li <= 1 { + end = ep[i] + } else { + end = ep[i-1] + i-- + } + c.mu.Lock() + c.queueOutbound(buf[le:end]) + c.flushOutbound() + closed := c.flags.isSet(clearConnection) + c.mu.Unlock() + if closed { + return + } + le = ep[i] + li = i + } + } + c.mu.Lock() + c.queueOutbound(buf[le:]) + c.flushOutbound() + closed := c.flags.isSet(clearConnection) + if !closed { + c.Debugf("Sent queue subscriptions to gateway") + } + c.mu.Unlock() } // This is invoked when getting an INFO protocol for gateway on the ROUTER port. // This function will then execute appropriate function based on the command // contained in the protocol. +// func (s *Server) processGatewayInfoFromRoute(info *Info, routeSrvID string, route *client) { switch info.GatewayCmd { case gatewayCmdGossip: @@ -1271,6 +1372,25 @@ func (s *Server) getInboundGatewayConnections(a *[]*client) { s.gateway.RUnlock() } +// Returns the inbound gateway connection for the given gateway name, +// or nil if it does not exist. +func (s *Server) getInboundGatewayConnection(name string) *client { + var c *client + s.gateway.RLock() + for _, gwc := range s.gateway.in { + gwc.mu.Lock() + if gwc.gw.name == name { + c = gwc + } + gwc.mu.Unlock() + if c != nil { + break + } + } + s.gateway.RUnlock() + return c +} + // This is invoked when a gateway connection is closed and the server // is removing this connection from its state. func (s *Server) removeRemoteGatewayConnection(c *client) { @@ -1306,6 +1426,21 @@ func (s *Server) removeRemoteGatewayConnection(c *client) { s.removeFromTempClients(cid) } +// This allows some cleanup with guarantee that readloop has +// exited and so no protocol message is being processed. +func (c *client) gatewayOutboundConnectionReadLoopExited() { + qSubsRemoved := int64(0) + c.mu.Lock() + for _, sub := range c.subs { + if sub.queue != nil { + qSubsRemoved++ + } + } + c.mu.Unlock() + // Update total count of qsubs in remote gateways. + atomic.AddInt64(&c.srv.gateway.totalQSubs, qSubsRemoved*-1) +} + // GatewayAddr returns the net.Addr object for the gateway listener. func (s *Server) GatewayAddr() *net.TCPAddr { s.mu.Lock() @@ -1316,77 +1451,84 @@ func (s *Server) GatewayAddr() *net.TCPAddr { return s.gatewayListener.Addr().(*net.TCPAddr) } -// A- protocol received when sending messages on an account that -// the remote gateway has no interest in. Mark this account +// A- protocol received from the remote after sending messages +// on an account that it has no interest in. Mark this account // with a "no interest" marker to prevent further messages send. +// func (c *client) processGatewayAccountUnsub(accName string) { // Just to indicate activity around "subscriptions" events. c.in.subs++ - c.gw.noInterest.Store(accName, nil) + c.gw.outsim.Store(accName, nil) } -// A+ protocol received by remote gateway if it had previously +// A+ protocol received from remote gateway if it had previously // sent an A-. Clear the "no interest" marker for this account. +// func (c *client) processGatewayAccountSub(accName string) error { // Just to indicate activity around "subscriptions" events. c.in.subs++ - c.gw.noInterest.Delete(accName) + c.gw.outsim.Delete(accName) return nil } -// RS- protocol received when sending messages on an subject -// that the remote gateway has no interest in (but knows about the account). -// Mark this subject with a "no interest" marker to prevent further -// messages send. -func (c *client) processGatewaySubjectUnsub(arg []byte) error { +// RS- protocol received from the remote after sending messages +// on an subject that it has no interest in (but knows about the +// account). Mark this subject with a "no interest" marker to +// prevent further messages send. +// If in perfect knowledge mode or for a queue sub, remove from +// the sublist if present. +// +func (c *client) processGatewayRUnsub(arg []byte) error { accName, subject, queue, err := c.parseUnsubProto(arg) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } - // Queue subs are treated differently. - if queue != nil { - sli, _ := c.gw.qsubsInterest.Load(accName) - if sli != nil { - sl := sli.(*Sublist) - r := sl.Match(string(subject)) - if len(r.qsubs) > 0 { - for i := 0; i < len(r.qsubs); i++ { - qsubs := r.qsubs[i] - if bytes.Equal(qsubs[0].queue, queue) { - sl.Remove(qsubs[0]) - atomic.AddInt64(&c.srv.gateway.totalQSubs, -1) - if sl.Count() == 0 { - c.gw.qsubsInterest.Delete(accName) - } - } - } + + var e *outsie + var useSl, newe bool + + ei, _ := c.gw.outsim.Load(accName) + if ei != nil { + e = ei.(*outsie) + e.Lock() + defer e.Unlock() + // If there is an entry, for plain sub we need + // to know if we should store the sub + useSl = queue != nil || e.sas + } else if queue != nil { // should not even happen... + return nil + } else { + // Plain sub, assume optimistic sends, create entry. + e = &outsie{ni: make(map[string]struct{}), sl: NewSublist()} + newe = true + } + // This is when a sub or queue sub is supposed to be in + // the sublist. Look for it and remove. + if useSl { + key := arg + // m[string()] does not cause mem allocation + sub, ok := c.subs[string(key)] + // if RS- for a sub that we don't have, either a queue + // or in perfect knowledge mode, just ignore. + if !ok { + return nil + } + if e.sl.Remove(sub) == nil { + delete(c.subs, string(key)) + if queue != nil { + atomic.AddInt64(&c.srv.gateway.totalQSubs, -1) + } + // If last, we can remove the whole entry + // only when not asked to store all subs, and if there + // is no element in the `ni` map. + if e.sl.Count() == 0 && !e.sas && len(e.ni) == 0 { + c.gw.outsim.Delete(accName) } } } else { - // For a given gateway, we will receive the A+,A-,RS+ and RS- - // from the same tcp connection/go routine. So although there - // may be different go-routines doing lookups on this map, - // we are guaranteed that there is no Store/Delete happening - // in parallel. - sni, noInterest := c.gw.noInterest.Load(accName) - // Do we have a no-interest at the account level? If so, - // don't bother setting no-interest on that subject. - if noInterest && sni == nil { - return nil - } - var ( - subjNoInterest *sync.Map - store bool - ) - if sni == nil { - subjNoInterest = &sync.Map{} - store = true - } else { - subjNoInterest = sni.(*sync.Map) - } - subjNoInterest.Store(string(subject), struct{}{}) - if store { - c.gw.noInterest.Store(accName, subjNoInterest) + e.ni[string(subject)] = struct{}{} + if newe { + c.gw.outsim.Store(accName, e) } } return nil @@ -1395,8 +1537,10 @@ func (c *client) processGatewaySubjectUnsub(arg []byte) error { // For plain subs, RS+ protocol received from remote gateway if it // had previously sent a RS-. Clear the "no interest" marker for // this subject (under this account). -// For queue subs, register interest from remote gateway. -func (c *client) processGatewaySubjectSub(arg []byte) error { +// For queue subs, or if in perfect knowledge mode, register interest +// from remote gateway. +// +func (c *client) processGatewayRSub(arg []byte) error { c.traceInOp("RS+", arg) // Indicate activity. @@ -1419,37 +1563,79 @@ func (c *client) processGatewaySubjectSub(arg []byte) error { accName := args[0] subject := args[1] - if queue != nil { - var ( - sl *Sublist - store bool - ) - sli, _ := c.gw.qsubsInterest.Load(string(accName)) - if sli != nil { - sl = sli.(*Sublist) - } else { - sl = NewSublist() - store = true - } - // Copy subject and queue to avoid referencing a possibly - // big underlying buffer. - cbuf := make([]byte, len(subject)+len(queue)) - copy(cbuf, subject) - copy(cbuf[len(subject):], queue) - sub := &subscription{client: c, subject: cbuf[:len(subject)], queue: cbuf[len(subject):], qw: qw} - sl.Insert(sub) - if store { - c.gw.qsubsInterest.Store(string(accName), sl) - } - atomic.AddInt64(&c.srv.gateway.totalQSubs, 1) + var e *outsie + var useSl, newe bool + + ei, _ := c.gw.outsim.Load(string(accName)) + // We should always have an existing entry for plain subs because + // in optimistic mode we would have received RS- first, and + // in full knowledge, we are receiving RS+ for an account after + // getting many RS- from the remote.. + if ei != nil { + e = ei.(*outsie) + e.Lock() + defer e.Unlock() + useSl = queue != nil || e.sas + } else if queue == nil { + return nil } else { - // See remark from processGatewaySubjectUnsub(). - sni, _ := c.gw.noInterest.Load(string(accName)) - // If this account is not even present, or if there is no - // specific subject with no-interest, we are done. - if sni != nil { - subjNoInterest := sni.(*sync.Map) - subjNoInterest.Delete(string(subject)) + e = &outsie{ni: make(map[string]struct{}), sl: NewSublist()} + newe = true + useSl = true + } + if useSl { + var key []byte + // We store remote subs by account/subject[/queue]. + // For queue, remove the trailing weight + if queue != nil { + key = arg[:len(arg)-len(args[3])-1] + } else { + key = arg + } + // If RS+ for a sub that we already have, ignore. + // (m[string()] does not allocate memory) + if _, ok := c.subs[string(key)]; ok { + return nil + } + // new subscription. copy subject (and queue) to + // not reference the underlying possibly big buffer. + var csubject []byte + var cqueue []byte + if queue != nil { + // make single allocation and use different slices + // to point to subject and queue name. + cbuf := make([]byte, len(subject)+1+len(queue)) + copy(cbuf, key[len(accName)+1:]) + csubject = cbuf[:len(subject)] + cqueue = cbuf[len(subject)+1:] + } else { + csubject = make([]byte, len(subject)) + copy(csubject, subject) + } + sub := &subscription{client: c, subject: csubject, queue: cqueue, qw: qw} + // If no error inserting in sublist... + if e.sl.Insert(sub) == nil { + c.subs[string(key)] = sub + if newe { + c.gw.outsim.Store(string(accName), e) + } + if queue != nil { + atomic.AddInt64(&c.srv.gateway.totalQSubs, 1) + } + } + } else { + subj := string(subject) + // If this is an RS+ for a wc subject, then + // remove from the no interest map all subjects + // that are a subset of this wc subject. + if subjectHasWildcard(subj) { + for k := range e.ni { + if subjectIsSubsetMatch(k, subj) { + delete(e.ni, k) + } + } + } else { + delete(e.ni, subj) } } return nil @@ -1459,31 +1645,41 @@ func (c *client) processGatewaySubjectSub(arg []byte) error { // given account/subject (which means, it does not have a registered // no-interest on the account and/or subject) and the sublist result // for queue subscriptions. +// func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { - sni, accountInMap := c.gw.noInterest.Load(acc) - // If there is an entry for this account and sni is nil, + ei, accountInMap := c.gw.outsim.Load(acc) + // If there is an entry for this account and ei is nil, // it means that the remote is not interested at all in // this account and we could not possibly have queue subs. - if accountInMap && sni == nil { + if accountInMap && ei == nil { return false, nil } + // Assume interest if account not in map. + psi := !accountInMap var r *SublistResult - // Get the matches for queue subs. - sli, _ := c.gw.qsubsInterest.Load(acc) - if sli != nil { - sl := sli.(*Sublist) - r = sl.Match(subj) - } - // Represents plain sub interest - psi := true if accountInMap { - // If there is a value, sni is itself a sync.Map for subject - // no-interest (in that account). - subjectNoInterest := sni.(*sync.Map) - if _, subjInMap := subjectNoInterest.Load(subj); subjInMap { - // If subject is present in map, it means a no-interest - // for this subject. - psi = false + // If in map, check for subs interest with sublist. + e := ei.(*outsie) + r = e.sl.Match(subj) + // If there is plain subs returned, we don't have to + // check if we should use the no-interest map because + // it means that we are in perfect knowledge mode. + // Only if there is nothing returned for r.psubs that + // we need to check. + if len(r.psubs) > 0 { + psi = true + } else { + e.RLock() + // We may be in transition to perfect knowledge mode + // (e.sas is true), but until e.ni is nil, use it to + // know if we should suppress interest or not. + if e.ni != nil { + if _, inMap := e.ni[subj]; !inMap { + psi = true + } + } + e.RUnlock() } } return psi, r @@ -1493,6 +1689,7 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { // to remote gateways a "no interest" in the past when receiving messages. // If we did, we send to the remote gateway an A+ protocol (see // processGatewayAccountSub()). +// func (s *Server) endAccountNoInterestForGateways(accName string) { gwsa := [4]*client{} gws := gwsa[:0] @@ -1500,15 +1697,20 @@ func (s *Server) endAccountNoInterestForGateways(accName string) { if len(gws) == 0 { return } - protoa := [256]byte{} - proto := protoa[:0] - proto = append(proto, aSubBytes...) - proto = append(proto, accName...) - proto = append(proto, CR_LF...) + var protoa [256]byte + var proto []byte for _, c := range gws { c.mu.Lock() - if _, noInterest := c.gw.sentNoInterest[accName]; noInterest { - delete(c.gw.sentNoInterest, accName) + // If value in map, it means we sent an A- and need + // to clear and send A+ now. + if _, inMap := c.gw.insim[accName]; inMap { + delete(c.gw.insim, accName) + if proto == nil { + proto = protoa[:0] + proto = append(proto, aSubBytes...) + proto = append(proto, accName...) + proto = append(proto, CR_LF...) + } if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -1518,12 +1720,14 @@ func (s *Server) endAccountNoInterestForGateways(accName string) { } } -// This is invoked when the first plain subscriber on a given subject is created. -// We check if we did send to remote gateways a "no interest" in the past when -// receiving messages on that account/subject. -// If we did, we send to the remote gateway an RS+ protocol. -func (s *Server) endSubjectNoInterestForGateways(accName string, sub *subscription) { - // This is for plain sub only +// This is invoked when registering (or unregistering) the first +// (or last) subscription on a given account/subject. For each +// GWs inbound connections, we will check if we need to send +// the protocol. In optimistic mode we would send an RS+ only +// if we had previously sent an RS-. If we are in the send-all-subs +// mode then the protocol is always sent. +// +func (s *Server) maybeSendSubOrUnsubToGateways(accName string, sub *subscription, added bool) { if sub.queue != nil { return } @@ -1534,38 +1738,50 @@ func (s *Server) endSubjectNoInterestForGateways(accName string, sub *subscripti return } var ( - protoa = [1024]byte{} - subjectsa = [4096]string{} - subject = string(sub.subject) - hasWc = subjectHasWildcard(subject) + protoa [512]byte + proto []byte + subject = string(sub.subject) + hasWc = subjectHasWildcard(subject) ) for _, c := range gws { - subjects := subjectsa[:0] - // Check that we had sent a no-interest on this account/subject. + sendProto := false c.mu.Lock() - if sni := c.gw.sentNoInterest[accName]; sni != nil { - // For wildcard subjects, we look for the subjects - // this subscription is matching. - if hasWc { - // Existing no-interest subject - for enis := range sni { - if subjectIsSubsetMatch(enis, subject) { - delete(sni, enis) - subjects = append(subjects, enis) + e := c.gw.insim[accName] + if e != nil { + // If there is a map, need to check if we had sent no-interest. + if e.ni != nil { + // For wildcard subjects, we will remove from our no-interest + // map, all subjects that are a subset of this wc subject, but we + // still send the wc subject and let the remote do its own cleanup. + if hasWc { + for enis := range e.ni { + if subjectIsSubsetMatch(enis, subject) { + delete(e.ni, enis) + sendProto = true + } } + } else if _, noInterest := e.ni[subject]; noInterest { + delete(e.ni, subject) + sendProto = true } - } else if _, noInterest := sni[string(sub.subject)]; noInterest { - delete(sni, subject) - subjects = append(subjects, subject) + } else if e.sas { + // We are in the mode where we always send RS+/- protocols. + sendProto = true } } - for _, subj := range subjects { - proto := protoa[:0] - proto = append(proto, rSubBytes...) - proto = append(proto, accName...) - proto = append(proto, ' ') - proto = append(proto, subj...) - proto = append(proto, CR_LF...) + if sendProto { + if proto == nil { + proto = protoa[:0] + if added { + proto = append(proto, rSubBytes...) + } else { + proto = append(proto, rUnsubBytes...) + } + proto = append(proto, accName...) + proto = append(proto, ' ') + proto = append(proto, sub.subject...) + proto = append(proto, CR_LF...) + } if c.trace { c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) } @@ -1575,17 +1791,13 @@ func (s *Server) endSubjectNoInterestForGateways(accName string, sub *subscripti } } -// This is invoked when a queue subscription is registered. -func (s *Server) sendQueueSubToGateways(accName string, sub *subscription, isRemote bool) { - if sub.queue == nil { +// This is invoked when the first (or last) queue subscription on a +// given subject/group is registered (or unregistered). Sent to all +// inbound gateways. +func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscription, added bool) { + if qsub.queue == nil { return } - // Keep track only of remote queue subs... - if isRemote { - s.gateway.Lock() - s.gateway.rqs[string(sub.sid)] = sub - s.gateway.Unlock() - } gwsa := [4]*client{} gws := gwsa[:0] @@ -1594,17 +1806,22 @@ func (s *Server) sendQueueSubToGateways(accName string, sub *subscription, isRem return } - protoa := [512]byte{} - proto := protoa[:0] + var protoa [512]byte + var proto []byte for _, c := range gws { - if len(proto) == 0 { - proto = append(proto, rSubBytes...) + if proto == nil { + proto = protoa[:0] + if added { + proto = append(proto, rSubBytes...) + } else { + proto = append(proto, rUnsubBytes...) + } proto = append(proto, accName...) proto = append(proto, ' ') - proto = append(proto, sub.subject...) - if sub.queue != nil { - proto = append(proto, ' ') - proto = append(proto, sub.queue...) + proto = append(proto, qsub.subject...) + proto = append(proto, ' ') + proto = append(proto, qsub.queue...) + if added { // For now, just use 1 for the weight proto = append(proto, ' ', '1') } @@ -1619,35 +1836,67 @@ func (s *Server) sendQueueSubToGateways(accName string, sub *subscription, isRem } } -func (s *Server) sendQueueUnsubToGateways(accName string, qsub *subscription, isRemote bool) { - // If this is a remote queue sub, need to remove from remote queue subs map - if isRemote { - s.gateway.Lock() - delete(s.gateway.rqs, string(qsub.sid)) - s.gateway.Unlock() - } - gwsa := [4]*client{} - gws := gwsa[:0] - s.getInboundGatewayConnections(&gws) - if len(gws) == 0 { - return - } - protoa := [512]byte{} - proto := protoa[:0] - proto = append(proto, rUnsubBytes...) - proto = append(proto, accName...) - proto = append(proto, ' ') - proto = append(proto, qsub.subject...) - proto = append(proto, ' ') - proto = append(proto, qsub.queue...) - proto = append(proto, CR_LF...) - for _, c := range gws { - c.mu.Lock() - if c.trace { - c.traceOutOp("", proto[:len(proto)-LEN_CR_LF]) +// This is invoked when a (queue) subscription is added/removed locally +// or in our cluster. We use ref counting to know when to update +// the inbound gateways. +// +func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, change int32) { + var ( + keya [1024]byte + key = keya[:0] + entry *sitally + isNew bool + ) + + s.gateway.pasi.Lock() + defer s.gateway.pasi.Unlock() + + accMap := s.gateway.pasi.m + + // First see if we have the account + st := accMap[accName] + if st == nil { + // Ignore remove of something we don't have + if change < 0 { + return + } + st = make(map[string]*sitally) + accMap[accName] = st + isNew = true + } + // Lookup: build the key as subject[+' '+queue] + key = append(key, sub.subject...) + if sub.queue != nil { + key = append(key, ' ') + key = append(key, sub.queue...) + } + if !isNew { + entry = st[string(key)] + } + first := false + last := false + if entry == nil { + // Ignore remove of something we don't have + if change < 0 { + return + } + entry = &sitally{n: 1, q: sub.queue != nil} + st[string(key)] = entry + first = true + } else { + entry.n += change + if entry.n <= 0 { + delete(st, string(key)) + last = true + } + } + if first || last { + if entry.q { + s.sendQueueSubOrUnsubToGateways(accName, sub, first) + } else { + s.maybeSendSubOrUnsubToGateways(accName, sub, first) } - c.sendProto(proto, true) - c.mu.Unlock() } } @@ -1655,6 +1904,7 @@ func (s *Server) sendQueueUnsubToGateways(accName string, qsub *subscription, is // that message is not sent to a given gateway if for instance // it is known that this gateway has no interest in account or // subject, etc.. +// func (c *client) sendMsgToGateways(msg []byte, qgroups [][]byte) { gwsa := [4]*client{} gws := gwsa[:0] @@ -1732,6 +1982,11 @@ func (c *client) sendMsgToGateways(msg []byte, qgroups [][]byte) { } } +// Process a message coming from a remote gateway. Send to any sub/qsub +// in our cluster that is matching. When receiving a message for an +// account or subject for which there is no interest in this cluster +// an A-/RS- protocol may be send back. +// func (c *client) processInboundGatewayMsg(msg []byte) { // Update statistics c.in.msgs++ @@ -1756,15 +2011,12 @@ func (c *client) processInboundGatewayMsg(msg []byte) { c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject) // Send A- only once... c.mu.Lock() - if _, sent := c.gw.sentNoInterest[string(c.pa.account)]; !sent { - // Send an A- protocol, but keep track that we have sent a "no interest" - // for that account, so that if later this account gets registered, - // we need to send an A+ to this remote gateway. - c.gw.sentNoInterest[string(c.pa.account)] = nil - var ( - protoa = [256]byte{} - proto = protoa[:0] - ) + if _, sent := c.gw.insim[string(c.pa.account)]; !sent { + // Add a nil value to indicate that we have sent an A- + // so that we know to send A+ when/if account gets registered. + c.gw.insim[string(c.pa.account)] = nil + var protoa [256]byte + proto := protoa[:0] proto = append(proto, aUnsubBytes...) proto = append(proto, c.pa.account...) if c.trace { @@ -1782,23 +2034,32 @@ func (c *client) processInboundGatewayMsg(msg []byte) { c.checkForImportServices(acc, msg) } - // If there is no interest on plain subs, possibly send an RS-. + // If there is no interest on plain subs, possibly send an RS-, + // even if there is qsubs interest. if len(r.psubs) == 0 { sendProto := false - // Send an RS- protocol, but keep track that we have said "no interest" - // for that account/subject, so that if later there is a subscription on - // this subject, we need to send an R+ to remote gateways. c.mu.Lock() - sni := c.gw.sentNoInterest[string(c.pa.account)] - if sni == nil { - sni = make(map[string]struct{}) - sni[string(c.pa.subject)] = struct{}{} - c.gw.sentNoInterest[string(c.pa.account)] = sni + // Send an RS- protocol if not already done and only if + // not in the send-all-subs mode. + e := c.gw.insim[string(c.pa.account)] + if e == nil { + e = &insie{ni: make(map[string]struct{})} + e.ni[string(c.pa.subject)] = struct{}{} + c.gw.insim[string(c.pa.account)] = e sendProto = true - } else { - if _, alreadySent := sni[string(c.pa.subject)]; !alreadySent { - sni[string(c.pa.subject)] = struct{}{} - sendProto = true + } else if e.ni != nil { + // If we are not in send-all-subs mode, check if we + // have already sent an RS- + if _, alreadySent := e.ni[string(c.pa.subject)]; !alreadySent { + // TODO(ik): pick some threshold as to when + // we need to switch mode + if len(e.ni) > gatewayMaxRUnsubBeforeSwitch { + // If too many RS-, switch to all-subs-mode. + c.gatewaySwitchAccountToSendAllSubs(e) + } else { + e.ni[string(c.pa.subject)] = struct{}{} + sendProto = true + } } } if sendProto { @@ -1817,8 +2078,6 @@ func (c *client) processInboundGatewayMsg(msg []byte) { c.sendProto(proto, true) } c.mu.Unlock() - // If we have no qsubs interest, or we have but queue filter is empty, - // then we are done. if len(r.qsubs) == 0 || len(c.pa.queues) == 0 { return } @@ -1849,3 +2108,121 @@ func (c *client) processInboundGatewayMsg(msg []byte) { c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil) } + +// Indicates that the remote which we are sending messages to +// has decided to send us all its subs interest so that we +// stop doing optimistic sends. +// +func (c *client) gatewayAllSubsReceiveStart(info *Info) { + account := getAccountFromGatewayCommand(c, info, "start") + if account == "" { + return + } + // Since the remote would send us this start command + // only after sending us too many RS- for this account, + // we should always have an entry here. + // TODO(ik): Should we close connection with protocol violation + // error if that happens? + ei, _ := c.gw.outsim.Load(account) + if ei != nil { + e := ei.(*outsie) + // Would not even need locking here since this is + // checked only from this go routine, but it's a + // one-time event, so... + e.Lock() + e.sas = true + e.Unlock() + } +} + +// Indicates that the remote has finished sending all its +// subscriptions and we should now not send unless we know +// there is explicit interest. +// +func (c *client) gatewayAllSubsReceiveComplete(info *Info) { + account := getAccountFromGatewayCommand(c, info, "complete") + if account == "" { + return + } + // Done receiving all subs from remote. Set the `ni` + // map to nil so that gatewayInterest() no longer + // uses it. + ei, _ := c.gw.outsim.Load(string(account)) + if ei != nil { + e := ei.(*outsie) + // Needs locking here since `ni` is checked by + // many go-routines calling gatewayInterest() + e.Lock() + e.ni = nil + e.Unlock() + } +} + +// small helper to get the account name from the INFO command. +func getAccountFromGatewayCommand(c *client, info *Info, cmd string) string { + if info.GatewayCmdPayload == nil { + c.sendErrAndErr(fmt.Sprintf("Account absent from receive-all-subscriptions-%s command", cmd)) + c.closeConnection(ProtocolViolation) + return "" + } + return string(info.GatewayCmdPayload) +} + +// Switch to send-all-subs mode for the given gateway and account. +// This is invoked when processing an inbound message and we +// reach a point where we had to send a lot of RS- for this +// account. We will send an INFO protocol to indicate that we +// start sending all our subs (for this account), followed by +// all subs (RS+) and finally an INFO to indicate the end of it. +// The remote will then send messages only if it finds explicit +// interest in the sublist created based on all RS+ that we just +// sent. +// The client's lock is held on entry. +// +func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie) { + // Set this map to nil so that the no-interest is + // no longer checked. + e.ni = nil + // Capture this since we are passing it to a go-routine. + account := string(c.pa.account) + s := c.srv + + // Function that will create an INFO protocol + // and set proper command. + sendCmd := func(cmd byte, useLock bool) { + // Use bare server info and simply set the + // gateway name and command + info := Info{ + Gateway: s.getGatewayName(), + GatewayCmd: cmd, + GatewayCmdPayload: []byte(account), + } + b, _ := json.Marshal(&info) + infoJSON := []byte(fmt.Sprintf(InfoProto, b)) + if useLock { + c.mu.Lock() + } + c.sendProto(infoJSON, true) + if useLock { + c.mu.Unlock() + } + } + // Send the start command. When remote receives this, + // it may continue to send optimistic messages, but + // it will start to register RS+/RS- in sublist instead + // of noInterest map. + sendCmd(gatewayCmdAllSubsStart, false) + + // Execute this in separate go-routine as to not block + // the readLoop (which may cause the otherside to close + // the connection due to slow consumer) + s.startGoRoutine(func() { + defer s.grWG.Done() + + s.sendAccountSubsToGateway(c, []byte(account)) + // Send the complete command. When the remote receives + // this, it will not send a message unless it has a + // matching sub from us. + sendCmd(gatewayCmdAllSubsComplete, true) + }) +} diff --git a/server/gateway_test.go b/server/gateway_test.go index d1e2642d..dd79300f 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -80,10 +80,13 @@ func checkForRegisteredQSubInterest(t *testing.T, s *Server, gwName, acc, subj s checkFor(t, timeout, 15*time.Millisecond, func() error { count := 0 c := s.getOutboundGatewayConnection(gwName) - qsi, _ := c.gw.qsubsInterest.Load(acc) - if qsi != nil { - qsl := qsi.(*Sublist) - count = int(qsl.Count()) + ei, _ := c.gw.outsim.Load(acc) + if ei != nil { + sl := ei.(*outsie).sl + r := sl.Match(subj) + for _, qsubs := range r.qsubs { + count += len(qsubs) + } } if count == expected { return nil @@ -92,6 +95,30 @@ func checkForRegisteredQSubInterest(t *testing.T, s *Server, gwName, acc, subj s }) } +func checkForNoInterest(t *testing.T, c *client, account, subject string, expectNoInterest bool, timeout time.Duration) { + t.Helper() + checkFor(t, timeout, 15*time.Millisecond, func() error { + ei, _ := c.gw.outsim.Load(account) + if ei == nil { + return fmt.Errorf("Did not receive subject no-interest") + } + e := ei.(*outsie) + e.RLock() + _, inMap := e.ni[subject] + e.RUnlock() + if expectNoInterest { + if inMap { + return nil + } + return fmt.Errorf("Did not receive subject no-interest on %q", subject) + } + if inMap { + return fmt.Errorf("No-interest on subject %q was not cleared", subject) + } + return nil + }) +} + func waitCh(t *testing.T, ch chan bool, errTxt string) { t.Helper() select { @@ -1165,7 +1192,7 @@ func TestGatewayAccountInterest(t *testing.T) { // S2 should have sent a protocol indicating no interest. checkFor(t, time.Second, 15*time.Millisecond, func() error { - if _, inMap := gwcb.gw.noInterest.Load("$foo"); !inMap { + if _, inMap := gwcb.gw.outsim.Load("$foo"); !inMap { return fmt.Errorf("Did not receive account no interest") } return nil @@ -1181,7 +1208,7 @@ func TestGatewayAccountInterest(t *testing.T) { // Add account to S2, this should clear the no interest for that account. s2.RegisterAccount("$foo") checkFor(t, time.Second, 15*time.Millisecond, func() error { - if _, inMap := gwcb.gw.noInterest.Load("$foo"); inMap { + if _, inMap := gwcb.gw.outsim.Load("$foo"); inMap { return fmt.Errorf("NoInterest has not been cleared") } return nil @@ -1249,27 +1276,11 @@ func TestGatewaySubjectInterest(t *testing.T) { checkCount(t, gwcb, 1) // S2 should have sent a protocol indicating no subject interest. - checkForNoInterest := func(t *testing.T, subject string, expectNoInterest bool) { + checkNoInterest := func(t *testing.T, subject string, expectedNoInterest bool) { t.Helper() - checkFor(t, time.Second, 15*time.Millisecond, func() error { - sni, _ := gwcb.gw.noInterest.Load("$foo") - if sni == nil { - return fmt.Errorf("Did not receive subject no-interest") - } - _, subjNoInterest := sni.(*sync.Map).Load(subject) - if expectNoInterest { - if subjNoInterest { - return nil - } - return fmt.Errorf("Did not receive subject no-interest on %q", subject) - } - if subjNoInterest { - return fmt.Errorf("No-interest on subject %q was not cleared", subject) - } - return nil - }) + checkForNoInterest(t, gwcb, "$foo", subject, expectedNoInterest, 2*time.Second) } - checkForNoInterest(t, "foo", true) + checkNoInterest(t, "foo", true) // Second send should not go through to B natsPub(t, nc, "foo", []byte("hello")) natsFlush(t, nc) @@ -1288,7 +1299,7 @@ func TestGatewaySubjectInterest(t *testing.T) { checkExpectedSubs(t, 0, s1) // This should clear the no interest for this subject - checkForNoInterest(t, "foo", false) + checkNoInterest(t, "foo", false) // Third send should go to B natsPub(t, nc, "foo", []byte("hello")) natsFlush(t, nc) @@ -1307,7 +1318,7 @@ func TestGatewaySubjectInterest(t *testing.T) { natsFlush(t, nc) checkCount(t, gwcb, 3) - checkForNoInterest(t, "foo", true) + checkNoInterest(t, "foo", true) // Send one more time and now it should not go to B natsPub(t, nc, "foo", []byte("hello")) @@ -1320,7 +1331,7 @@ func TestGatewaySubjectInterest(t *testing.T) { checkCount(t, gwcb, 4) // But now we should have receives an RS- on bar. - checkForNoInterest(t, "bar", true) + checkNoInterest(t, "bar", true) // Check that wildcards are supported. Create a subscription on '*' on B. // This should clear the no-interest on both "foo" and "bar" @@ -1328,8 +1339,8 @@ func TestGatewaySubjectInterest(t *testing.T) { natsFlush(t, ncb) checkExpectedSubs(t, 1, s2) checkExpectedSubs(t, 0, s1) - checkForNoInterest(t, "foo", false) - checkForNoInterest(t, "bar", false) + checkNoInterest(t, "foo", false) + checkNoInterest(t, "bar", false) // Publish on message on foo and one on bar and they should go. natsPub(t, nc, "foo", []byte("hello")) natsPub(t, nc, "bar", []byte("hello")) @@ -1351,7 +1362,7 @@ func TestGatewaySubjectInterest(t *testing.T) { natsFlush(t, nc) checkCount(t, gwcb, 1) - checkForNoInterest(t, "foo", true) + checkNoInterest(t, "foo", true) natsPub(t, nc, "foo", []byte("hello")) natsFlush(t, nc) @@ -1386,7 +1397,7 @@ func TestGatewaySubjectInterest(t *testing.T) { checkExpectedSubs(t, 1, s2, s2bis) // Check that subject no-interest on A was cleared. - checkForNoInterest(t, "foo", false) + checkNoInterest(t, "foo", false) // Now publish. Remember, s1 has outbound gateway to s2, and s2 does not // have a local subscription and has previously sent a no-interest on "foo". @@ -1710,13 +1721,14 @@ func TestGatewayQueueSub(t *testing.T) { // gwcB should have the qsubs interest map empty now. checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - gwcB.mu.Lock() - _, exists := gwcB.gw.qsubsInterest.Load(globalAccountName) - gwcB.mu.Unlock() - if exists { - return fmt.Errorf("Qsub interest for account should have been removed") + ei, _ := gwcB.gw.outsim.Load(globalAccountName) + if ei != nil { + sl := ei.(*outsie).sl + if sl.Count() == 0 { + return nil + } } - return nil + return fmt.Errorf("Qsub interest for account should have been removed") }) // Reset counters @@ -1730,17 +1742,34 @@ func TestGatewayQueueSub(t *testing.T) { } func TestGatewayTotalQSubs(t *testing.T) { - o2 := testDefaultOptionsForGateway("B") - s2 := runGatewayServer(o2) - defer s2.Shutdown() + ob1 := testDefaultOptionsForGateway("B") + sb1 := runGatewayServer(ob1) + defer sb1.Shutdown() - s2Url := fmt.Sprintf("nats://127.0.0.1:%d", o2.Port) - subnc := natsConnect(t, s2Url) - defer subnc.Close() + ob2 := testDefaultOptionsForGateway("B") + ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", sb1.ClusterAddr().Port)) + sb2 := runGatewayServer(ob2) + defer sb2.Shutdown() - o1 := testGatewayOptionsFromToWithServers(t, "A", "B", s2) - s1 := runGatewayServer(o1) - defer s1.Shutdown() + checkClusterFormed(t, sb1, sb2) + + sb1URL := fmt.Sprintf("nats://%s:%d", ob1.Host, ob1.Port) + ncb1 := natsConnect(t, sb1URL, nats.ReconnectWait(50*time.Millisecond)) + defer ncb1.Close() + + sb2URL := fmt.Sprintf("nats://%s:%d", ob2.Host, ob2.Port) + ncb2 := natsConnect(t, sb2URL, nats.ReconnectWait(50*time.Millisecond)) + defer ncb2.Close() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb1) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForOutboundGateways(t, sb1, 1, 2*time.Second) + waitForOutboundGateways(t, sb2, 1, 2*time.Second) + waitForInboundGateways(t, sa, 2, 2*time.Second) + waitForInboundGateways(t, sb1, 1, 2*time.Second) checkTotalQSubs := func(t *testing.T, s *Server, expected int) { t.Helper() @@ -1752,18 +1781,61 @@ func TestGatewayTotalQSubs(t *testing.T) { }) } - qsub1 := natsQueueSub(t, subnc, "foo", "bar", func(_ *nats.Msg) {}) - checkTotalQSubs(t, s1, 1) - qsub2 := natsQueueSub(t, subnc, "foo", "baz", func(_ *nats.Msg) {}) - checkTotalQSubs(t, s1, 2) - qsub3 := natsQueueSub(t, subnc, "bar", "bar", func(_ *nats.Msg) {}) - checkTotalQSubs(t, s1, 3) - natsUnsub(t, qsub1) - checkTotalQSubs(t, s1, 2) + cb := func(_ *nats.Msg) {} + + natsQueueSub(t, ncb1, "foo", "bar", cb) + checkTotalQSubs(t, sa, 1) + qsub2 := natsQueueSub(t, ncb1, "foo", "baz", cb) + checkTotalQSubs(t, sa, 2) + qsub3 := natsQueueSub(t, ncb1, "foo", "baz", cb) + checkTotalQSubs(t, sa, 2) + + // Shutdown sb1, there should be a failover from clients + // to sb2. sb2 will then send the queue subs to sa. + sb1.Shutdown() + + checkClientsCount(t, sb2, 2) + checkExpectedSubs(t, 3, sb2) + + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForOutboundGateways(t, sb2, 1, 2*time.Second) + waitForInboundGateways(t, sa, 1, 2*time.Second) + waitForInboundGateways(t, sb2, 1, 2*time.Second) + + // When sb1 is shutdown, the total qsubs on sa should fall + // down to 0, but will be updated as soon as sa and sb2 + // connect to each other. So instead we will verify by + // making sure that the count is 2 instead of 4 if there + // was a bug. + // (note that there are 2 qsubs on same group, so only + // 1 RS+ would have been sent for that group) + checkTotalQSubs(t, sa, 2) + + // Restart sb1 + sb1 = runGatewayServer(ob1) + defer sb1.Shutdown() + + checkClusterFormed(t, sb1, sb2) + + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForOutboundGateways(t, sb1, 1, 2*time.Second) + waitForOutboundGateways(t, sb2, 1, 2*time.Second) + waitForInboundGateways(t, sa, 2, 2*time.Second) + waitForInboundGateways(t, sb1, 0, 2*time.Second) + waitForInboundGateways(t, sb2, 1, 2*time.Second) + + // Now start unsubscribing. Start with one of the duplicate + // and check that count stays same. natsUnsub(t, qsub3) - checkTotalQSubs(t, s1, 1) + checkTotalQSubs(t, sa, 2) + // Now the other, which would cause an RS- natsUnsub(t, qsub2) - checkTotalQSubs(t, s1, 0) + checkTotalQSubs(t, sa, 1) + // Now test that if connections are closed, things are updated + // properly. + ncb1.Close() + ncb2.Close() + checkTotalQSubs(t, sa, 0) } func TestGatewaySendQSubsOnGatewayConnect(t *testing.T) { @@ -1868,10 +1940,16 @@ func TestGatewaySendRemoteQSubs(t *testing.T) { // Wait for the no interest to be received by A checkFor(t, time.Second, 15*time.Millisecond, func() error { gw := sa.getOutboundGatewayConnection("B").gw - if ni, _ := gw.noInterest.Load(globalAccountName); ni == nil { - return fmt.Errorf("No-interest still not registered") + ei, _ := gw.outsim.Load(globalAccountName) + if ei != nil { + e := ei.(*outsie) + e.RLock() + defer e.RUnlock() + if _, inMap := e.ni["foo"]; inMap { + return nil + } } - return nil + return fmt.Errorf("No-interest still not registered") }) // Unsubscribe 1 qsub @@ -1892,13 +1970,17 @@ func TestGatewaySendRemoteQSubs(t *testing.T) { // No more subs now on both sb1 and sb2 checkExpectedSubs(t, 0, sb1, sb2) - // Server sb1 should not have qsub in its rqs map + // Server sb1 should not have qsub in its sub interest map checkFor(t, time.Second, 15*time.Millisecond, func() error { - sb1.gateway.RLock() - entry := sb1.gateway.rqs[fmt.Sprintf("%s foo bar", globalAccountName)] - sb1.gateway.RUnlock() + var entry *sitally + sb1.gateway.pasi.Lock() + asim := sb1.gateway.pasi.m[globalAccountName] + if asim != nil { + entry = asim["foo bar"] + } + sb1.gateway.pasi.Unlock() if entry != nil { - return fmt.Errorf("Map rqs should not have an entry, got %#v", entry) + return fmt.Errorf("Map should not have an entry, got %#v", entry) } return nil }) @@ -1906,10 +1988,14 @@ func TestGatewaySendRemoteQSubs(t *testing.T) { // Let's wait for A to receive the unsubscribe checkFor(t, time.Second, 15*time.Millisecond, func() error { gw := sa.getOutboundGatewayConnection("B").gw - if sli, _ := gw.qsubsInterest.Load(globalAccountName); sli != nil { - return fmt.Errorf("Interest still present") + ei, _ := gw.outsim.Load(globalAccountName) + if ei != nil { + sl := ei.(*outsie).sl + if sl.Count() == 0 { + return nil + } } - return nil + return fmt.Errorf("Interest still present") }) // Now send a message, it won't be sent because A received an RS- @@ -1937,10 +2023,10 @@ func TestGatewaySendRemoteQSubs(t *testing.T) { // Check qsubs interest should be empty checkFor(t, time.Second, 15*time.Millisecond, func() error { gw := sa.getOutboundGatewayConnection("B").gw - if sli, _ := gw.qsubsInterest.Load(globalAccountName); sli != nil { - return fmt.Errorf("Interest still present") + if ei, _ := gw.outsim.Load(globalAccountName); ei == nil { + return nil } - return nil + return fmt.Errorf("Interest still present") }) } @@ -2093,12 +2179,7 @@ func TestGatewayComplexSetup(t *testing.T) { natsFlush(t, ncsb2) checkExpectedSubs(t, 1, sb2) - // TODO(ik): When server sa1 that has inbound from sb1 gets a local qsub on foo - bar, - // it sends it, but when sa1 gets a remote qsub on foo - bar, it send it too. - // We lack the optimization to suppress the remote sending RS+ since we already have - // sent for local. So expected count here is 2. If we later optimize, use "1" here - // instead. - checkForRegisteredQSubInterest(t, sb1, "A", globalAccountName, "foo", 2, time.Second) + checkForRegisteredQSubInterest(t, sb1, "A", globalAccountName, "foo", 1, time.Second) // Publish all messages. The queue sub on cluster B should receive all // messages. @@ -2487,8 +2568,11 @@ func TestGatewaySendQSubsBufSize(t *testing.T) { // Make sure we have the 4 we expected c := s1.getOutboundGatewayConnection("B") - qsi, _ := c.gw.qsubsInterest.Load(globalAccountName) - sl := qsi.(*Sublist) + ei, _ := c.gw.outsim.Load(globalAccountName) + if ei == nil { + t.Fatalf("No interest found") + } + sl := ei.(*outsie).sl r := sl.Match("foo") if len(r.qsubs) != 4 { t.Fatalf("Expected 4 groups, got %v", len(r.qsubs)) @@ -2577,6 +2661,273 @@ func TestGatewayRaceBetweenPubAndSub(t *testing.T) { wg.Wait() } +func TestGatewaySendAllSubs(t *testing.T) { + gatewayMaxRUnsubBeforeSwitch = 100 + defer func() { gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch }() + + ob := testDefaultOptionsForGateway("B") + sb := runGatewayServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb) + sc := runGatewayServer(oc) + defer sc.Shutdown() + + waitForOutboundGateways(t, sa, 2, time.Second) + waitForOutboundGateways(t, sb, 2, time.Second) + waitForOutboundGateways(t, sc, 2, time.Second) + waitForInboundGateways(t, sa, 2, time.Second) + waitForInboundGateways(t, sb, 2, time.Second) + waitForInboundGateways(t, sc, 2, time.Second) + + // On B, create a sub that will reply to requests + bURL := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) + ncB := natsConnect(t, bURL) + defer ncB.Close() + natsSub(t, ncB, "foo", func(m *nats.Msg) { + ncB.Publish(m.Reply, m.Data) + }) + natsFlush(t, ncB) + checkExpectedSubs(t, 1, sb) + + // On C, have some delayed activity while it receives + // unwanted messages and switches to sendAllSubs. + cURL := fmt.Sprintf("nats://%s:%d", oc.Host, oc.Port) + ncC := natsConnect(t, cURL) + defer ncC.Close() + wg := sync.WaitGroup{} + wg.Add(2) + done := make(chan bool) + consCount := 0 + accsCount := 0 + go func() { + defer wg.Done() + for i := 0; ; i++ { + // Create subs and qsubs on same subject + natsSub(t, ncC, fmt.Sprintf("foo.%d", i+1), func(_ *nats.Msg) {}) + natsQueueSub(t, ncC, fmt.Sprintf("foo.%d", i+1), fmt.Sprintf("bar.%d", i+1), func(_ *nats.Msg) {}) + // Create psubs and qsubs on unique subjects + natsSub(t, ncC, fmt.Sprintf("foox.%d", i+1), func(_ *nats.Msg) {}) + natsQueueSub(t, ncC, fmt.Sprintf("fooy.%d", i+1), fmt.Sprintf("bar.%d", i+1), func(_ *nats.Msg) {}) + consCount += 4 + // Register account + sc.RegisterAccount(fmt.Sprintf("acc.%d", i+1)) + accsCount++ + select { + case <-done: + return + case <-time.After(15 * time.Millisecond): + } + } + }() + + go func() { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + for { + for i := 0; i < 10; i++ { + natsPub(t, ncB, fmt.Sprintf("foo.%d", i+1), []byte("hello")) + } + select { + case <-done: + return + case <-time.After(5 * time.Millisecond): + } + } + }() + + // From A, send a lot of requests. + aURL := fmt.Sprintf("nats://%s:%d", oa.Host, oa.Port) + ncA := natsConnect(t, aURL) + defer ncA.Close() + // TODO(ik): May need to change that if we change the threshold + // for when the switch happens. + total := 300 + for i := 0; i < total; i++ { + req := fmt.Sprintf("%d", i) + reply, err := ncA.Request("foo", []byte(req), time.Second) + if err != nil { + t.Fatalf("Error waiting for reply: %v", err) + } + if string(reply.Data) != req { + t.Fatalf("Expected reply %q, got %q", req, reply.Data) + } + } + close(done) + + // Normally, C would receive a message for each req inbox and + // would send and RS- on that to B, making both have an unbounded + // growth of the no-interest map. But after a certain amount + // of RS-, C will send all its sub for the given account and + // instruct B to send only if there is explicit interest. + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + // Check C inbound connection from B + c := sc.getInboundGatewayConnection("B") + c.mu.Lock() + var switchedMode bool + e := c.gw.insim[globalAccountName] + if e != nil { + switchedMode = e.ni == nil && e.sas + } + c.mu.Unlock() + if !switchedMode { + return fmt.Errorf("C has still not switched mode") + } + switchedMode = false + // Now check B outbound connection to C + c = sb.getOutboundGatewayConnection("C") + ei, _ := c.gw.outsim.Load(globalAccountName) + if ei != nil { + e := ei.(*outsie) + e.RLock() + switchedMode = e.ni == nil && e.sas + e.RUnlock() + } + if !switchedMode { + return fmt.Errorf("C has still not switched mode") + } + return nil + }) + wg.Wait() + + // Check consCount and accsCount on C + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + sc.gateway.pasi.Lock() + scount := len(sc.gateway.pasi.m[globalAccountName]) + sc.gateway.pasi.Unlock() + if scount != consCount { + return fmt.Errorf("Expected %v consumers for global account, got %v", consCount, scount) + } + sc.mu.Lock() + acount := len(sc.accounts) + sc.mu.Unlock() + if acount != accsCount+1 { + return fmt.Errorf("Expected %v accounts, got %v", accsCount+1, acount) + } + return nil + }) + + for i := 0; i < total; i++ { + req := fmt.Sprintf("%d", i) + reply, err := ncA.Request("foo", []byte(req), time.Second) + if err != nil { + t.Fatalf("Error waiting for reply: %v", err) + } + if string(reply.Data) != req { + t.Fatalf("Expected reply %q, got %q", req, reply.Data) + } + } + + // Also, after all that, if a sub is created on C, it should + // be sent to B (but not A). Check that this is the case. + // So first send from A on the subject that we are going to + // use for this new sub. + natsPub(t, ncA, "newsub", []byte("hello")) + natsFlush(t, ncA) + aOutboundToC := sa.getOutboundGatewayConnection("C") + checkForNoInterest(t, aOutboundToC, globalAccountName, "newsub", true, 2*time.Second) + + newSubSub := natsSub(t, ncC, "newsub", func(_ *nats.Msg) {}) + natsFlush(t, ncC) + checkExpectedSubs(t, consCount+1) + checkFor(t, time.Second, 15*time.Millisecond, func() error { + c := sb.getOutboundGatewayConnection("C") + ei, _ := c.gw.outsim.Load(globalAccountName) + if ei != nil { + sl := ei.(*outsie).sl + r := sl.Match("newsub") + if len(r.psubs) == 1 { + return nil + } + } + return fmt.Errorf("Newsub not registered on B") + }) + checkForNoInterest(t, aOutboundToC, globalAccountName, "newsub", false, 2*time.Second) + + natsUnsub(t, newSubSub) + natsFlush(t, ncC) + checkExpectedSubs(t, consCount) + checkFor(t, time.Second, 15*time.Millisecond, func() error { + c := sb.getOutboundGatewayConnection("C") + ei, _ := c.gw.outsim.Load(globalAccountName) + if ei != nil { + sl := ei.(*outsie).sl + r := sl.Match("newsub") + if len(r.psubs) == 0 { + return nil + } + } + return fmt.Errorf("Newsub still registered on B") + }) +} + +func TestGatewaySendAllSubsBadProtocol(t *testing.T) { + ob := testDefaultOptionsForGateway("B") + sb := runGatewayServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + waitForOutboundGateways(t, sa, 1, time.Second) + waitForOutboundGateways(t, sb, 1, time.Second) + waitForInboundGateways(t, sa, 1, time.Second) + waitForInboundGateways(t, sb, 1, time.Second) + + c := sa.getInboundGatewayConnection("B") + // Mock an invalid protocol (account name missing) + info := &Info{ + Gateway: "A", + GatewayCmd: gatewayCmdAllSubsStart, + } + b, _ := json.Marshal(info) + c.mu.Lock() + c.sendProto([]byte(fmt.Sprintf("INFO %s\r\n", b)), true) + c.mu.Unlock() + + orgConn := c + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { + curConn := sa.getInboundGatewayConnection("B") + if orgConn == curConn { + return fmt.Errorf("Not reconnected") + } + return nil + }) + + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForOutboundGateways(t, sb, 1, 2*time.Second) + + // Refresh + c = sa.getInboundGatewayConnection("B") + // Do correct start + info.GatewayCmdPayload = []byte(globalAccountName) + b, _ = json.Marshal(info) + c.mu.Lock() + c.sendProto([]byte(fmt.Sprintf("INFO %s\r\n", b)), true) + c.mu.Unlock() + // But incorrect end. + info.GatewayCmd = gatewayCmdAllSubsComplete + info.GatewayCmdPayload = nil + b, _ = json.Marshal(info) + c.mu.Lock() + c.sendProto([]byte(fmt.Sprintf("INFO %s\r\n", b)), true) + c.mu.Unlock() + + orgConn = c + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { + curConn := sa.getInboundGatewayConnection("B") + if orgConn == curConn { + return fmt.Errorf("Not reconnected") + } + return nil + }) +} + /* func TestGatewayPermissions(t *testing.T) { bo := testDefaultOptionsForGateway("B") diff --git a/server/parser.go b/server/parser.go index d841ec09..0714373c 100644 --- a/server/parser.go +++ b/server/parser.go @@ -395,7 +395,7 @@ func (c *client) parse(buf []byte) error { case ROUTER: err = c.processRemoteSub(arg) case GATEWAY: - err = c.processGatewaySubjectSub(arg) + err = c.processGatewayRSub(arg) } if err != nil { return err @@ -487,7 +487,7 @@ func (c *client) parse(buf []byte) error { case ROUTER: err = c.processRemoteUnsub(arg) case GATEWAY: - err = c.processGatewaySubjectUnsub(arg) + err = c.processGatewayRUnsub(arg) } if err != nil { return err diff --git a/server/route.go b/server/route.go index 792dd102..1a3f0181 100644 --- a/server/route.go +++ b/server/route.go @@ -657,6 +657,9 @@ func (c *client) removeRemoteSubs() { } else { ase.subs = append(ase.subs, sub) } + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(accountName, sub, -1) + } } // Now remove the subs by batch for each account sublist. @@ -715,7 +718,7 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { return nil } - sendToGWs := false + updateGWs := false // We store local subs by account and subject and optionally queue name. // RS- will have the arg exactly as the key. key := string(arg) @@ -724,13 +727,12 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { delete(c.subs, key) acc.sl.Remove(sub) c.removeReplySubTimeout(sub) - // Send only for queue subs - sendToGWs = srv.gateway.enabled && sub.queue != nil + updateGWs = srv.gateway.enabled } c.mu.Unlock() - if sendToGWs { - srv.sendQueueUnsubToGateways(accountName, sub, true) + if updateGWs { + srv.gatewayUpdateSubInterest(accountName, sub, -1) } if c.opts.Verbose { @@ -809,7 +811,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { } key := string(sub.sid) osub := c.subs[key] - sendToGWs := false + updateGWs := false if osub == nil { c.subs[string(key)] = sub // Now place into the account sl. @@ -820,7 +822,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { c.sendErr("Invalid Subscription") return nil } - sendToGWs = srv.gateway.enabled + updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. atomic.StoreInt32(&osub.qw, sub.qw) @@ -831,18 +833,8 @@ func (c *client) processRemoteSub(argo []byte) (err error) { if c.opts.Verbose { c.sendOK() } - if sendToGWs { - // For queue subs, we will send an RS+, but if we are here, we - // know there is a single qsub per account/subject/queue: - // sendToGWs is true only if we did not find that key before. - if sub.queue != nil { - srv.sendQueueSubToGateways(acc.Name, sub, true) - } else { - // For a plain sub, this will send an RS+ to gateways only if - // we had previously sent an RS-. In other words, we don't send - // an RS+ per plain sub. - srv.endSubjectNoInterestForGateways(acc.Name, sub) - } + if updateGWs { + srv.gatewayUpdateSubInterest(acc.Name, sub, 1) } return nil } @@ -1260,11 +1252,8 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del } // We always update for a queue subscriber since we need to send our relative weight. - var ( - entry *rme - ok bool - added bool - ) + var entry *rme + var ok bool // Always update if a queue subscriber. update := qi > 0 @@ -1283,7 +1272,6 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del entry = &rme{qi, delta} rm[string(key)] = entry update = true // Adding for normal sub means update. - added = true } if entry != nil { entryN = entry.n @@ -1308,24 +1296,8 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del // subscribes with a smaller weight. if entryN > 0 { s.broadcastSubscribe(sub) - // Here we want to send RS+ only when going from 0 to 1 - if s.gateway.enabled && added && entryN == 1 { - // Always send for queues - if sub.queue != nil { - s.sendQueueSubToGateways(acc.Name, sub, false) - } else { - // If plain sub, send an RS+ only if we had previously - // sent an RS- - s.endSubjectNoInterestForGateways(acc.Name, sub) - } - } } else { s.broadcastUnSubscribe(sub) - // Last of the queue member of this group, so send to - // gateways. - if s.gateway.enabled && sub.queue != nil { - s.sendQueueUnsubToGateways(acc.Name, sub, false) - } } } diff --git a/server/server.go b/server/server.go index 522066f6..a8fa2f3a 100644 --- a/server/server.go +++ b/server/server.go @@ -71,10 +71,11 @@ type Info struct { Export *SubjectPermission `json:"export,omitempty"` // Gateways Specific - Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) - GatewayURLs []string `json:"gateway_urls,omitempty"` // Gateway URLs in the originating cluster (sent by gateway's INFO) - GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO) - GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do + Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) + GatewayURLs []string `json:"gateway_urls,omitempty"` // Gateway URLs in the originating cluster (sent by gateway's INFO) + GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO) + GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do + GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed } // Server is our main struct. diff --git a/test/gateway_test.go b/test/gateway_test.go index 024c4256..01222032 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -249,14 +249,27 @@ func TestGatewaySubjectInterest(t *testing.T) { gAExpect(runsubRe) gASend("RMSG $foo bar 2\r\nok\r\n") gAExpect(runsubRe) - // Now have client create sub on "*", this should cause RS+ on foo - // and RS+ on bar. + // Now have client create sub on "*", this should cause RS+ on * + // The remote will have cleared its no-interest on foo and bar + // and this receiving side is supposed to be doing the same. clientSend("SUB * 5\r\nPING\r\n") clientExpect(pongRe) - expectNumberOfProtos(t, gAExpect, rsubRe, 2) + buf := gAExpect(rsubRe) + if !bytes.Contains(buf, []byte("$foo *")) { + t.Fatalf("Expected RS+ on %q, got %q", "*", buf) + } + // Check that the remote has cleared by sending from the client + // on foo and bar + clientSend("PUB foo 2\r\nok\r\n") + clientExpect(msgRe) + clientSend("PUB bar 2\r\nok\r\n") + clientExpect(msgRe) + // Check that A can send too and does not receive an RS- gASend("RMSG $foo foo 2\r\nok\r\n") + expectNothing(t, gA) clientExpect(msgRe) gASend("RMSG $foo bar 2\r\nok\r\n") + expectNothing(t, gA) clientExpect(msgRe) } @@ -361,3 +374,29 @@ func TestGatewayQueue(t *testing.T) { } expectNothing(t, gA) } + +func TestGatewaySendAllSubs(t *testing.T) { + ob := testDefaultOptionsForGateway("B") + sb := runGatewayServer(ob) + defer sb.Shutdown() + + gA := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port) + defer gA.Close() + + gASend, gAExpect := setupGatewayConn(t, gA, "A", "B") + gASend("PING\r\n") + gAExpect(pongRe) + + // Bombard B with messages on different subjects. + // TODO(ik): Adapt if/when we change the conditions for the + // switch. + for i := 0; i < 10001; i++ { + gASend(fmt.Sprintf("RMSG $G foo.%d 2\r\nok\r\n", i)) + if i < 1000 { + gAExpect(runsubRe) + } + } + // Since B has no sub, we should get 2 INFOs with start/end + // commands. + expectNumberOfProtos(t, gAExpect, infoRe, 2) +}