mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Reworked gateway processing of RS+ and RS-
Invoke updateInterestForAccountOnGateway() as a defer after all locks have been released. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1545,8 +1545,22 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
|
||||
}
|
||||
|
||||
var e *outsie
|
||||
var useSl, newe bool
|
||||
var (
|
||||
e *outsie
|
||||
useSl bool
|
||||
newe bool
|
||||
callUpdate bool
|
||||
srv *Server
|
||||
sub *subscription
|
||||
)
|
||||
|
||||
// Possibly execute this on exit after all locks have been released.
|
||||
// If callUpdate is true, srv and sub will be not nil.
|
||||
defer func() {
|
||||
if callUpdate {
|
||||
srv.updateInterestForAccountOnGateway(accName, sub, -1)
|
||||
}
|
||||
}()
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -1554,6 +1568,8 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
ei, _ := c.gw.outsim.Load(accName)
|
||||
if ei != nil {
|
||||
e = ei.(*outsie)
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// If there is an entry, for plain sub we need
|
||||
// to know if we should store the sub
|
||||
useSl = queue != nil || e.mode != modeOptimistic
|
||||
@@ -1566,14 +1582,13 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
e = &outsie{ni: make(map[string]struct{}), sl: NewSublist()}
|
||||
newe = true
|
||||
}
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
// This is when a sub or queue sub is supposed to be in
|
||||
// the sublist. Look for it and remove.
|
||||
if useSl {
|
||||
var ok bool
|
||||
key := arg
|
||||
// m[string()] does not cause mem allocation
|
||||
sub, ok := c.subs[string(key)]
|
||||
sub, ok = c.subs[string(key)]
|
||||
// if RS- for a sub that we don't have, just ignore.
|
||||
if !ok {
|
||||
return nil
|
||||
@@ -1591,10 +1606,9 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
|
||||
c.gw.outsim.Delete(accName)
|
||||
}
|
||||
}
|
||||
srv := c.srv
|
||||
c.mu.Unlock()
|
||||
srv.updateInterestForAccountOnGateway(accName, sub, -1)
|
||||
c.mu.Lock()
|
||||
// We are going to call updateInterestForAccountOnGateway on exit.
|
||||
srv = c.srv
|
||||
callUpdate = true
|
||||
} else {
|
||||
e.ni[string(subject)] = struct{}{}
|
||||
if newe {
|
||||
@@ -1633,8 +1647,22 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
accName := args[0]
|
||||
subject := args[1]
|
||||
|
||||
var e *outsie
|
||||
var useSl, newe bool
|
||||
var (
|
||||
e *outsie
|
||||
useSl bool
|
||||
newe bool
|
||||
callUpdate bool
|
||||
srv *Server
|
||||
sub *subscription
|
||||
)
|
||||
|
||||
// Possibly execute this on exit after all locks have been released.
|
||||
// If callUpdate is true, srv and sub will be not nil.
|
||||
defer func() {
|
||||
if callUpdate {
|
||||
srv.updateInterestForAccountOnGateway(string(accName), sub, 1)
|
||||
}
|
||||
}()
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -1646,6 +1674,8 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
// getting many RS- from the remote..
|
||||
if ei != nil {
|
||||
e = ei.(*outsie)
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
useSl = queue != nil || e.mode != modeOptimistic
|
||||
} else if queue == nil {
|
||||
return nil
|
||||
@@ -1654,8 +1684,6 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
newe = true
|
||||
useSl = true
|
||||
}
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
if useSl {
|
||||
var key []byte
|
||||
// We store remote subs by account/subject[/queue].
|
||||
@@ -1685,7 +1713,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
csubject = make([]byte, len(subject))
|
||||
copy(csubject, subject)
|
||||
}
|
||||
sub := &subscription{client: c, subject: csubject, queue: cqueue, qw: qw}
|
||||
sub = &subscription{client: c, subject: csubject, queue: cqueue, qw: qw}
|
||||
// If no error inserting in sublist...
|
||||
if e.sl.Insert(sub) == nil {
|
||||
c.subs[string(key)] = sub
|
||||
@@ -1697,10 +1725,9 @@ func (c *client) processGatewayRSub(arg []byte) error {
|
||||
c.gw.outsim.Store(string(accName), e)
|
||||
}
|
||||
}
|
||||
srv := c.srv
|
||||
c.mu.Unlock()
|
||||
srv.updateInterestForAccountOnGateway(string(accName), sub, 1)
|
||||
c.mu.Lock()
|
||||
// We are going to call updateInterestForAccountOnGateway on exit.
|
||||
srv = c.srv
|
||||
callUpdate = true
|
||||
} else {
|
||||
subj := string(subject)
|
||||
// If this is an RS+ for a wc subject, then
|
||||
|
||||
Reference in New Issue
Block a user