mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix flapping test and limit channel recv when there is an error
Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -52,6 +52,15 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
func chanRecv(t *testing.T, recvChan <-chan struct{}, limit time.Duration) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-recvChan:
|
||||
case <-time.After(limit):
|
||||
t.Fatal("Should have received from channel")
|
||||
}
|
||||
}
|
||||
|
||||
func opTrustBasicSetup() *Server {
|
||||
kp, _ := nkeys.FromSeed(oSeed)
|
||||
pub, _ := kp.PublicKey()
|
||||
@@ -1844,8 +1853,8 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
// startup cluster
|
||||
checkClusterFormed(t, sA, sB)
|
||||
// Both server observed one fetch on startup
|
||||
<-chanImpA
|
||||
<-chanImpB
|
||||
chanRecv(t, chanImpA, 10*time.Second)
|
||||
chanRecv(t, chanImpB, 10*time.Second)
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
// Create first client, directly connects to A
|
||||
urlA := fmt.Sprintf("nats://%s:%d", sA.opts.Host, sA.opts.Port)
|
||||
@@ -1870,8 +1879,8 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
}
|
||||
defer subA.Unsubscribe()
|
||||
// Connect of client triggered a fetch by Server A
|
||||
<-chanImpA
|
||||
<-chanExpA
|
||||
chanRecv(t, chanImpA, 10*time.Second)
|
||||
chanRecv(t, chanExpA, 10*time.Second)
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
//time.Sleep(10 * time.Second)
|
||||
// create second client, directly connect to B
|
||||
@@ -1882,8 +1891,8 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
}
|
||||
defer ncB.Close()
|
||||
// Connect of client triggered a fetch by Server B
|
||||
<-chanImpB
|
||||
<-chanExpB
|
||||
chanRecv(t, chanImpB, 10*time.Second)
|
||||
chanRecv(t, chanExpB, 10*time.Second)
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
checkClusterFormed(t, sA, sB)
|
||||
// the route subscription was lost due to the failed fetch
|
||||
@@ -3358,8 +3367,8 @@ func TestJWTTimeExpiration(t *testing.T) {
|
||||
errChan <- struct{}{}
|
||||
}
|
||||
}))
|
||||
<-errChan
|
||||
<-disconnectChan
|
||||
chanRecv(t, errChan, 10*time.Second)
|
||||
chanRecv(t, disconnectChan, 10*time.Second)
|
||||
require_True(t, c.IsReconnecting())
|
||||
require_False(t, c.IsConnected())
|
||||
c.Close()
|
||||
@@ -3397,11 +3406,11 @@ func TestJWTTimeExpiration(t *testing.T) {
|
||||
errChan <- struct{}{}
|
||||
}
|
||||
}))
|
||||
<-errChan
|
||||
<-reConnectChan
|
||||
chanRecv(t, errChan, 10*time.Second)
|
||||
chanRecv(t, reConnectChan, 10*time.Second)
|
||||
require_False(t, c.IsReconnecting())
|
||||
require_True(t, c.IsConnected())
|
||||
<-errChan
|
||||
chanRecv(t, errChan, 10*time.Second)
|
||||
c.Close()
|
||||
})
|
||||
t.Run("lower jwt expiration overwrites time", func(t *testing.T) {
|
||||
@@ -3431,15 +3440,15 @@ func TestJWTTimeExpiration(t *testing.T) {
|
||||
errChan <- struct{}{}
|
||||
}
|
||||
}))
|
||||
<-errChan
|
||||
<-disconnectChan
|
||||
chanRecv(t, errChan, 10*time.Second)
|
||||
chanRecv(t, disconnectChan, 10*time.Second)
|
||||
require_True(t, c.IsReconnecting())
|
||||
require_False(t, c.IsConnected())
|
||||
c.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func TestJWTSubLimits(t *testing.T) {
|
||||
func TestJWTLimits(t *testing.T) {
|
||||
doNotExpire := time.Now().AddDate(1, 0, 0)
|
||||
// create account
|
||||
kp, _ := nkeys.CreateAccount()
|
||||
@@ -3470,32 +3479,28 @@ func TestJWTSubLimits(t *testing.T) {
|
||||
}
|
||||
}),
|
||||
)
|
||||
defer c.Close()
|
||||
if _, err := c.Subscribe("foo", func(msg *nats.Msg) {}); err != nil {
|
||||
t.Fatalf("couldn't subscribe: %v", err)
|
||||
}
|
||||
if _, err = c.Subscribe("bar", func(msg *nats.Msg) {}); err != nil {
|
||||
t.Fatalf("expected error got: %v", err)
|
||||
}
|
||||
<-errChan
|
||||
c.Close()
|
||||
chanRecv(t, errChan, time.Second)
|
||||
})
|
||||
t.Run("payload", func(t *testing.T) {
|
||||
creds := createUserWithLimit(t, kp, doNotExpire, func(j *jwt.Limits) { j.Payload = 5 })
|
||||
defer os.Remove(creds)
|
||||
c := natsConnect(t, sA.ClientURL(), nats.UserCredentials(creds),
|
||||
nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
|
||||
if e := conn.LastError(); e != nil && strings.Contains(e.Error(), "Maximum Payload Violation") {
|
||||
errChan <- struct{}{}
|
||||
}
|
||||
}),
|
||||
)
|
||||
c := natsConnect(t, sA.ClientURL(), nats.UserCredentials(creds))
|
||||
defer c.Close()
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Fatalf("flush failed %v", err)
|
||||
}
|
||||
if err := c.Publish("foo", []byte("world")); err != nil {
|
||||
t.Fatalf("couldn't publish: %v", err)
|
||||
}
|
||||
if err := c.Publish("foo", []byte("worldX")); err != nil {
|
||||
if err := c.Publish("foo", []byte("worldX")); err != nats.ErrMaxPayload {
|
||||
t.Fatalf("couldn't publish: %v", err)
|
||||
}
|
||||
<-errChan
|
||||
c.Close()
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user