mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Allow interest propagation with daisy chained leafnodes
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1075,8 +1075,9 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
|
||||
c.mu.Lock()
|
||||
|
||||
// If we are solicited make sure this is a local client.
|
||||
if c.isSolicitedLeafNode() && sub.client.kind != CLIENT {
|
||||
// If we are solicited make sure this is a local client or a non-solicited leaf node
|
||||
skind := sub.client.kind
|
||||
if c.isSolicitedLeafNode() && !(skind == CLIENT || (skind == LEAF && !sub.client.isSolicitedLeafNode())) {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -1279,9 +1280,11 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes. We understand solicited
|
||||
// and non-solicited state in this call so we will do the right thing.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/jwt"
|
||||
"github.com/nats-io/nats-server/v2/logger"
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nkeys"
|
||||
@@ -460,10 +461,15 @@ func TestLeafNodeAndRoutes(t *testing.T) {
|
||||
|
||||
// Helper function to check that a leaf node has connected to our server.
|
||||
func checkLeafNodeConnected(t *testing.T, s *server.Server) {
|
||||
t.Helper()
|
||||
checkLeafNodeConnections(t, s, 1)
|
||||
}
|
||||
|
||||
func checkLeafNodeConnections(t *testing.T, s *server.Server, expected int) {
|
||||
t.Helper()
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
if nln := s.NumLeafNodes(); nln != 1 {
|
||||
return fmt.Errorf("Expected a connected leafnode for server %q, got none", s.ID())
|
||||
if nln := s.NumLeafNodes(); nln != expected {
|
||||
return fmt.Errorf("Expected a connected leafnode for server %q, got %d", s.ID(), nln)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@@ -2902,3 +2908,56 @@ func TestLeafNodeNoRaceGeneratingNonce(t *testing.T) {
|
||||
close(quitCh)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func runSolicitAndAcceptLeafServer(lso *server.Options) (*server.Server, *server.Options) {
|
||||
surl := fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port)
|
||||
o := testDefaultOptionsForLeafNodes()
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(surl)
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
return RunServer(o), o
|
||||
}
|
||||
|
||||
func TestLeafNodeDaisyChain(t *testing.T) {
|
||||
// To quickly enable trace and debug logging
|
||||
// doLog, doTrace, doDebug = true, true, true
|
||||
s1, opts1 := runLeafServer()
|
||||
defer s1.Shutdown()
|
||||
|
||||
s2, opts2 := runSolicitAndAcceptLeafServer(opts1)
|
||||
defer s2.Shutdown()
|
||||
checkLeafNodeConnected(t, s1)
|
||||
|
||||
s3, _ := runSolicitLeafServer(opts2)
|
||||
defer s3.Shutdown()
|
||||
checkLeafNodeConnections(t, s2, 2)
|
||||
|
||||
// Make so we can tell the two apart since in same PID.
|
||||
if doLog {
|
||||
s1.SetLogger(logger.NewTestLogger("[S-1] - ", false), true, true)
|
||||
s2.SetLogger(logger.NewTestLogger("[S-2] - ", false), true, true)
|
||||
s3.SetLogger(logger.NewTestLogger("[S-3] - ", false), true, true)
|
||||
}
|
||||
|
||||
nc1, err := nats.Connect(s1.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer nc1.Close()
|
||||
|
||||
nc1.Subscribe("ngs.usage", func(msg *nats.Msg) {
|
||||
msg.Respond([]byte("22 msgs"))
|
||||
})
|
||||
nc1.Flush()
|
||||
|
||||
nc2, err := nats.Connect(s3.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
if _, err = nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
|
||||
t.Fatalf("Expected a response")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user