mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #1533 from nats-io/silent_sub_loss_in_cluster
Adding unit test demonstrating silent subscription loss
This commit is contained in:
@@ -1691,6 +1691,163 @@ func TestAccountURLResolverNoFetchOnReload(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
t.SkipNow()
|
||||
const subj = "subscription.loss.test"
|
||||
// Create Operator
|
||||
op, _ := nkeys.CreateOperator()
|
||||
opub, _ := op.PublicKey()
|
||||
oc := jwt.NewOperatorClaims(opub)
|
||||
oc.Subject = opub
|
||||
ojwt, err := oc.Encode(op)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating operator JWT: %v", err)
|
||||
}
|
||||
// Create Account
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
ajwt, err := nac.Encode(op)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create User
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
uSeed, _ := nkp.Seed()
|
||||
upub, _ := nkp.PublicKey()
|
||||
nuc := newJWTTestUserClaims()
|
||||
nuc.Subject = upub
|
||||
uJwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
creds := genCredsFile(t, uJwt, uSeed)
|
||||
defer os.Remove(creds)
|
||||
// Simulate an account server that drops the first request to /B/acc
|
||||
chanA := make(chan string, 4)
|
||||
defer close(chanA)
|
||||
chanB := make(chan string, 4)
|
||||
defer close(chanB)
|
||||
bCnt := int32(0)
|
||||
errCnt := int32(1)
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/A/" {
|
||||
// Server A startup
|
||||
w.Write([]byte(ajwt))
|
||||
chanA <- "/A/"
|
||||
} else if r.URL.Path == "/B/" {
|
||||
// Server B startup
|
||||
w.Write([]byte(ajwt))
|
||||
chanB <- "/B/"
|
||||
} else if r.URL.Path == "/A/"+apub {
|
||||
// First Client connecting to Server A
|
||||
w.Write([]byte(ajwt))
|
||||
chanA <- "/A/"
|
||||
} else if r.URL.Path == "/B/"+apub {
|
||||
// This will be observed twice:
|
||||
// on a route connect (which is not responded to) then once the second client logs on
|
||||
if atomic.AddInt32(&bCnt, 1) > errCnt {
|
||||
w.Write([]byte(ajwt))
|
||||
}
|
||||
chanB <- "/B/"
|
||||
} else {
|
||||
t.Fatal("not expected")
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
// Create seed server A
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
resolver: URL("%s/A/")
|
||||
cluster {
|
||||
name: clust
|
||||
listen: -1
|
||||
}
|
||||
`, ojwt, ts.URL)))
|
||||
defer os.Remove(confA)
|
||||
sA, _ := RunServerWithConfig(confA)
|
||||
defer sA.Shutdown()
|
||||
// Create Server B
|
||||
confB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
resolver: URL("%s/B/")
|
||||
cluster {
|
||||
name: clust
|
||||
listen: -1
|
||||
routes [
|
||||
nats-route://localhost:%d
|
||||
]
|
||||
}
|
||||
`, ojwt, ts.URL, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confB)
|
||||
sB, _ := RunServerWithConfig(confB)
|
||||
defer sB.Shutdown()
|
||||
// startup cluster
|
||||
checkClusterFormed(t, sA, sB)
|
||||
// Both server observed one fetch on startup
|
||||
if path := <-chanA; path != "/A/" {
|
||||
t.Fatal("Expect one fetch on A", path)
|
||||
}
|
||||
if path := <-chanB; path != "/B/" {
|
||||
t.Fatal("Expect one fetch on B", path)
|
||||
}
|
||||
// Create first client, directly connects to A
|
||||
urlA := fmt.Sprintf("nats://%s:%d", sA.opts.Host, sA.opts.Port)
|
||||
ncA, err := nats.Connect(urlA, nats.UserCredentials(creds),
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}
|
||||
}),
|
||||
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v %s", err, urlA)
|
||||
}
|
||||
defer ncA.Close()
|
||||
// create a test subscription
|
||||
subA, err := ncA.SubscribeSync(subj)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error during subscribe: %v", err)
|
||||
}
|
||||
defer subA.Unsubscribe()
|
||||
// Both server observed one fetch for the connecting client
|
||||
if path := <-chanA; path != "/A/" {
|
||||
t.Fatal("Expect one fetch on A")
|
||||
}
|
||||
if path := <-chanB; path != "/B/" {
|
||||
t.Fatal("Expect one fetch on B")
|
||||
}
|
||||
// create second client, directly connect to B
|
||||
urlB := fmt.Sprintf("nats://%s:%d", sB.opts.Host, sB.opts.Port)
|
||||
ncB, err := nats.Connect(urlB, nats.UserCredentials(creds))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v %s", err, urlB)
|
||||
}
|
||||
defer ncB.Close()
|
||||
// Because the first request was a failure, a second request is observed as a result of connect to B
|
||||
for i := int32(0); i < errCnt; i++ {
|
||||
if path := <-chanB; path != "/B/" {
|
||||
t.Fatal("Expect one fetch on B")
|
||||
}
|
||||
}
|
||||
checkClusterFormed(t, sA, sB)
|
||||
// the route subscription was lost due to the failed fetch
|
||||
// Now we test if some recover mechanism is in play
|
||||
checkSubInterest(t, sB, apub, subj, 10*time.Second) // Will fail as a result of this issue
|
||||
if err := ncB.Publish(subj, []byte("msg")); err != nil {
|
||||
t.Fatalf("Expected to publish %v", err)
|
||||
}
|
||||
// expect the message from B to flow to A
|
||||
if _, err := subA.NextMsg(10 * time.Second); err != nil {
|
||||
t.Fatalf("Expected to receive a message %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJWTUserSigningKey(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user