From 846b2b5ce01883f6391074b5ab835be1fb4d3528 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 25 Aug 2020 00:20:05 -0400 Subject: [PATCH] Fix flapping test and limit channel recv when there is an error Signed-off-by: Matthias Hanel --- server/jwt_test.go | 57 +++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/server/jwt_test.go b/server/jwt_test.go index 51157174..3aaf123b 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -52,6 +52,15 @@ func init() { } } +func chanRecv(t *testing.T, recvChan <-chan struct{}, limit time.Duration) { + t.Helper() + select { + case <-recvChan: + case <-time.After(limit): + t.Fatal("Should have received from channel") + } +} + func opTrustBasicSetup() *Server { kp, _ := nkeys.FromSeed(oSeed) pub, _ := kp.PublicKey() @@ -1844,8 +1853,8 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) { // startup cluster checkClusterFormed(t, sA, sB) // Both server observed one fetch on startup - <-chanImpA - <-chanImpB + chanRecv(t, chanImpA, 10*time.Second) + chanRecv(t, chanImpB, 10*time.Second) assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB) // Create first client, directly connects to A urlA := fmt.Sprintf("nats://%s:%d", sA.opts.Host, sA.opts.Port) @@ -1870,8 +1879,8 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) { } defer subA.Unsubscribe() // Connect of client triggered a fetch by Server A - <-chanImpA - <-chanExpA + chanRecv(t, chanImpA, 10*time.Second) + chanRecv(t, chanExpA, 10*time.Second) assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB) //time.Sleep(10 * time.Second) // create second client, directly connect to B @@ -1882,8 +1891,8 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) { } defer ncB.Close() // Connect of client triggered a fetch by Server B - <-chanImpB - <-chanExpB + chanRecv(t, chanImpB, 10*time.Second) + chanRecv(t, chanExpB, 10*time.Second) assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB) checkClusterFormed(t, sA, sB) // the route subscription was lost due to the failed fetch @@ -3358,8 +3367,8 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) - <-errChan - <-disconnectChan + chanRecv(t, errChan, 10*time.Second) + chanRecv(t, disconnectChan, 10*time.Second) require_True(t, c.IsReconnecting()) require_False(t, c.IsConnected()) c.Close() @@ -3397,11 +3406,11 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) - <-errChan - <-reConnectChan + chanRecv(t, errChan, 10*time.Second) + chanRecv(t, reConnectChan, 10*time.Second) require_False(t, c.IsReconnecting()) require_True(t, c.IsConnected()) - <-errChan + chanRecv(t, errChan, 10*time.Second) c.Close() }) t.Run("lower jwt expiration overwrites time", func(t *testing.T) { @@ -3431,15 +3440,15 @@ func TestJWTTimeExpiration(t *testing.T) { errChan <- struct{}{} } })) - <-errChan - <-disconnectChan + chanRecv(t, errChan, 10*time.Second) + chanRecv(t, disconnectChan, 10*time.Second) require_True(t, c.IsReconnecting()) require_False(t, c.IsConnected()) c.Close() }) } -func TestJWTSubLimits(t *testing.T) { +func TestJWTLimits(t *testing.T) { doNotExpire := time.Now().AddDate(1, 0, 0) // create account kp, _ := nkeys.CreateAccount() @@ -3470,32 +3479,28 @@ func TestJWTSubLimits(t *testing.T) { } }), ) + defer c.Close() if _, err := c.Subscribe("foo", func(msg *nats.Msg) {}); err != nil { t.Fatalf("couldn't subscribe: %v", err) } if _, err = c.Subscribe("bar", func(msg *nats.Msg) {}); err != nil { t.Fatalf("expected error got: %v", err) } - <-errChan - c.Close() + chanRecv(t, errChan, time.Second) }) t.Run("payload", func(t *testing.T) { creds := createUserWithLimit(t, kp, doNotExpire, func(j *jwt.Limits) { j.Payload = 5 }) defer os.Remove(creds) - c := natsConnect(t, sA.ClientURL(), nats.UserCredentials(creds), - nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { - if e := conn.LastError(); e != nil && strings.Contains(e.Error(), "Maximum Payload Violation") { - errChan <- struct{}{} - } - }), - ) + c := natsConnect(t, sA.ClientURL(), nats.UserCredentials(creds)) + defer c.Close() + if err := c.Flush(); err != nil { + t.Fatalf("flush failed %v", err) + } if err := c.Publish("foo", []byte("world")); err != nil { t.Fatalf("couldn't publish: %v", err) } - if err := c.Publish("foo", []byte("worldX")); err != nil { + if err := c.Publish("foo", []byte("worldX")); err != nats.ErrMaxPayload { t.Fatalf("couldn't publish: %v", err) } - <-errChan - c.Close() }) }