mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #139 from nats-io/issue-138
Fix possible non-propagation of subscription interest to routed server.
This commit is contained in:
@@ -50,7 +50,8 @@ type client struct {
|
||||
parseState
|
||||
stats
|
||||
|
||||
route *route
|
||||
route *route
|
||||
sendLocalSubs bool
|
||||
}
|
||||
|
||||
func (c *client) String() (id string) {
|
||||
|
||||
@@ -88,6 +88,18 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
b := bytes.Buffer{}
|
||||
|
||||
s.mu.Lock()
|
||||
if s.routes[route.cid] == nil {
|
||||
|
||||
// We are too early, let createRoute call this function.
|
||||
route.mu.Lock()
|
||||
route.sendLocalSubs = true
|
||||
route.mu.Unlock()
|
||||
|
||||
s.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for _, client := range s.clients {
|
||||
client.mu.Lock()
|
||||
subs := client.subs.All()
|
||||
@@ -182,6 +194,17 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
s.routes[c.cid] = c
|
||||
s.mu.Unlock()
|
||||
|
||||
// Now that the route is registered, we need to make sure that
|
||||
// the send of the local subs was not done too early (from
|
||||
// processRouteInfo). If it was, then send again.
|
||||
c.mu.Lock()
|
||||
sendLocalSubs := c.sendLocalSubs
|
||||
c.mu.Unlock()
|
||||
|
||||
if sendLocalSubs {
|
||||
s.sendLocalSubsToRoute(c)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user