Merge pull request #1005 from nats-io/flappers

Fixes for flappers
This commit is contained in:
Derek Collison
2019-05-21 17:52:01 -07:00
committed by GitHub
3 changed files with 16 additions and 4 deletions

View File

@@ -1644,6 +1644,7 @@ func (c *client) processSub(argo []byte) (err error) {
}
updateGWs := false
// Subscribe here.
if c.subs[sid] == nil {
c.subs[sid] = sub

View File

@@ -3659,6 +3659,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
subA = natsQueueSubSync(t, clientA, "test.request", "queue")
natsFlush(t, clientA)
checkForRegisteredQSubInterest(t, sb, "A", "$foo", "test.request", 1, time.Second)
// Send 100 requests from clientB on foo.request,
for i := 0; i < 100; i++ {
@@ -3669,7 +3670,7 @@ func TestGatewayServiceImportWithQueue(t *testing.T) {
// Consume the requests, but don't reply to them...
for i := 0; i < 100; i++ {
if _, err := subA.NextMsg(time.Second); err != nil {
t.Fatalf("subA did not receive request: %v", err)
t.Fatalf("subA did not receive request %d: %v", i+1, err)
}
}
@@ -4402,12 +4403,14 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) {
clientB2 := natsConnect(t, b2URL)
defer clientB2.Close()
natsSubSync(t, clientB2, "not.used")
natsFlush(t, clientB2)
// Make A2 flood B2 with subjects that B2 is not interested in.
for i := 0; i < 1100; i++ {
natsPub(t, clientA, fmt.Sprintf("no.interest.%d", i), []byte("hello"))
}
natsFlush(t, clientA)
// Wait for B2 to switch to interest-only
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
c := sa2.getOutboundGatewayConnection("B")

View File

@@ -883,7 +883,6 @@ func TestJWTAccountImportActivationExpires(t *testing.T) {
}
addAccountToMemResolver(s, fooPub, fooJWT)
acc, _ := s.LookupAccount(fooPub)
if acc == nil {
t.Fatalf("Expected to retrieve the account")
@@ -897,8 +896,14 @@ func TestJWTAccountImportActivationExpires(t *testing.T) {
activation := jwt.NewActivationClaims(barPub)
activation.ImportSubject = "foo"
activation.ImportType = jwt.Stream
activation.IssuedAt = time.Now().Add(-10 * time.Second).Unix()
activation.Expires = time.Now().Add(time.Second).Unix()
now := time.Now()
activation.IssuedAt = now.Add(-10 * time.Second).Unix()
// These are second resolution. So check that we actually expire in a second and adjust if needed.
expires := now.Add(time.Second).Unix()
if expires == now.Unix() {
expires++
}
activation.Expires = expires
actJWT, err := activation.Encode(fooKP)
if err != nil {
t.Fatalf("Error generating activation token: %v", err)
@@ -910,6 +915,9 @@ func TestJWTAccountImportActivationExpires(t *testing.T) {
t.Fatalf("Error generating account JWT: %v", err)
}
addAccountToMemResolver(s, barPub, barJWT)
if acc, _ := s.LookupAccount(barPub); acc == nil {
t.Fatalf("Expected to retrieve the account")
}
expectPong := func(cr *bufio.Reader) {
t.Helper()