Merge pull request #1196 from nats-io/daisy

Allow interest propagation with daisy-chained leafnodes
This commit is contained in:
Derek Collison
2019-11-17 17:46:23 -08:00
committed by GitHub
2 changed files with 68 additions and 6 deletions

View File

@@ -29,6 +29,7 @@ import (
"time"
"github.com/nats-io/jwt"
"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
@@ -461,10 +462,15 @@ func TestLeafNodeAndRoutes(t *testing.T) {
// Helper function to check that a leaf node has connected to our server.
func checkLeafNodeConnected(t *testing.T, s *server.Server) {
t.Helper()
checkLeafNodeConnections(t, s, 1)
}
func checkLeafNodeConnections(t *testing.T, s *server.Server, expected int) {
t.Helper()
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
if nln := s.NumLeafNodes(); nln != 1 {
return fmt.Errorf("Expected a connected leafnode for server %q, got none", s.ID())
if nln := s.NumLeafNodes(); nln != expected {
return fmt.Errorf("Expected a connected leafnode for server %q, got %d", s.ID(), nln)
}
return nil
})
@@ -2915,3 +2921,56 @@ func TestLeafNodeNoRaceGeneratingNonce(t *testing.T) {
close(quitCh)
wg.Wait()
}
func runSolicitAndAcceptLeafServer(lso *server.Options) (*server.Server, *server.Options) {
surl := fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port)
o := testDefaultOptionsForLeafNodes()
o.Port = -1
rurl, _ := url.Parse(surl)
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
return RunServer(o), o
}
func TestLeafNodeDaisyChain(t *testing.T) {
// To quickly enable trace and debug logging
// doLog, doTrace, doDebug = true, true, true
s1, opts1 := runLeafServer()
defer s1.Shutdown()
s2, opts2 := runSolicitAndAcceptLeafServer(opts1)
defer s2.Shutdown()
checkLeafNodeConnected(t, s1)
s3, _ := runSolicitLeafServer(opts2)
defer s3.Shutdown()
checkLeafNodeConnections(t, s2, 2)
// Make so we can tell the two apart since in same PID.
if doLog {
s1.SetLogger(logger.NewTestLogger("[S-1] - ", false), true, true)
s2.SetLogger(logger.NewTestLogger("[S-2] - ", false), true, true)
s3.SetLogger(logger.NewTestLogger("[S-3] - ", false), true, true)
}
nc1, err := nats.Connect(s1.ClientURL())
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc1.Close()
nc1.Subscribe("ngs.usage", func(msg *nats.Msg) {
msg.Respond([]byte("22 msgs"))
})
nc1.Flush()
nc2, err := nats.Connect(s3.ClientURL())
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc2.Close()
if _, err = nc2.Request("ngs.usage", []byte("1h"), time.Second); err != nil {
t.Fatalf("Expected a response")
}
}