diff --git a/server/jwt_test.go b/server/jwt_test.go index 2046a548..0c7bdb64 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -1691,6 +1691,163 @@ func TestAccountURLResolverNoFetchOnReload(t *testing.T) { } } +func TestAccountURLResolverFetchFailureInCluster(t *testing.T) { + t.SkipNow() + const subj = "subscription.loss.test" + // Create Operator + op, _ := nkeys.CreateOperator() + opub, _ := op.PublicKey() + oc := jwt.NewOperatorClaims(opub) + oc.Subject = opub + ojwt, err := oc.Encode(op) + if err != nil { + t.Fatalf("Error generating operator JWT: %v", err) + } + // Create Account + akp, _ := nkeys.CreateAccount() + apub, _ := akp.PublicKey() + nac := jwt.NewAccountClaims(apub) + ajwt, err := nac.Encode(op) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + // Create User + nkp, _ := nkeys.CreateUser() + uSeed, _ := nkp.Seed() + upub, _ := nkp.PublicKey() + nuc := newJWTTestUserClaims() + nuc.Subject = upub + uJwt, err := nuc.Encode(akp) + if err != nil { + t.Fatalf("Error generating user JWT: %v", err) + } + creds := genCredsFile(t, uJwt, uSeed) + defer os.Remove(creds) + // Simulate an account server that drops the first request to /B/acc + chanA := make(chan string, 4) + defer close(chanA) + chanB := make(chan string, 4) + defer close(chanB) + bCnt := int32(0) + errCnt := int32(1) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/A/" { + // Server A startup + w.Write([]byte(ajwt)) + chanA <- "/A/" + } else if r.URL.Path == "/B/" { + // Server B startup + w.Write([]byte(ajwt)) + chanB <- "/B/" + } else if r.URL.Path == "/A/"+apub { + // First Client connecting to Server A + w.Write([]byte(ajwt)) + chanA <- "/A/" + } else if r.URL.Path == "/B/"+apub { + // This will be observed twice: + // on a route connect (which is not responded to) then once the second client logs on + if atomic.AddInt32(&bCnt, 1) > errCnt { + w.Write([]byte(ajwt)) + } + chanB <- "/B/" + } else { + t.Fatal("not expected") + } + })) + defer ts.Close() + // Create seed server A + confA := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + operator: %s + resolver: URL("%s/A/") + cluster { + name: clust + listen: -1 + } + `, ojwt, ts.URL))) + defer os.Remove(confA) + sA, _ := RunServerWithConfig(confA) + defer sA.Shutdown() + // Create Server B + confB := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + operator: %s + resolver: URL("%s/B/") + cluster { + name: clust + listen: -1 + routes [ + nats-route://localhost:%d + ] + } + `, ojwt, ts.URL, sA.opts.Cluster.Port))) + defer os.Remove(confB) + sB, _ := RunServerWithConfig(confB) + defer sB.Shutdown() + // startup cluster + checkClusterFormed(t, sA, sB) + // Both server observed one fetch on startup + if path := <-chanA; path != "/A/" { + t.Fatal("Expect one fetch on A", path) + } + if path := <-chanB; path != "/B/" { + t.Fatal("Expect one fetch on B", path) + } + // Create first client, directly connects to A + urlA := fmt.Sprintf("nats://%s:%d", sA.opts.Host, sA.opts.Port) + ncA, err := nats.Connect(urlA, nats.UserCredentials(creds), + nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { + if err != nil { + t.Fatal("error not expected in this test", err) + } + }), + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + t.Fatal("error not expected in this test", err) + }), + ) + if err != nil { + t.Fatalf("Expected to connect, got %v %s", err, urlA) + } + defer ncA.Close() + // create a test subscription + subA, err := ncA.SubscribeSync(subj) + if err != nil { + t.Fatalf("Expected no error during subscribe: %v", err) + } + defer subA.Unsubscribe() + // Both server observed one fetch for the connecting client + if path := <-chanA; path != "/A/" { + t.Fatal("Expect one fetch on A") + } + if path := <-chanB; path != "/B/" { + t.Fatal("Expect one fetch on B") + } + // create second client, directly connect to B + urlB := fmt.Sprintf("nats://%s:%d", sB.opts.Host, sB.opts.Port) + ncB, err := nats.Connect(urlB, nats.UserCredentials(creds)) + if err != nil { + t.Fatalf("Expected to connect, got %v %s", err, urlB) + } + defer ncB.Close() + // Because the first request was a failure, a second request is observed as a result of connect to B + for i := int32(0); i < errCnt; i++ { + if path := <-chanB; path != "/B/" { + t.Fatal("Expect one fetch on B") + } + } + checkClusterFormed(t, sA, sB) + // the route subscription was lost due to the failed fetch + // Now we test if some recover mechanism is in play + checkSubInterest(t, sB, apub, subj, 10*time.Second) // Will fail as a result of this issue + if err := ncB.Publish(subj, []byte("msg")); err != nil { + t.Fatalf("Expected to publish %v", err) + } + // expect the message from B to flow to A + if _, err := subA.NextMsg(10 * time.Second); err != nil { + t.Fatalf("Expected to receive a message %v", err) + } +} + func TestJWTUserSigningKey(t *testing.T) { s := opTrustBasicSetup() defer s.Shutdown()