Fix some flaky tests ("flappers") by relaxing timing-sensitive assertions

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-05-21 13:49:13 -06:00
committed by Derek Collison
parent f0bfbd176e
commit e976e63099
7 changed files with 76 additions and 30 deletions

View File

@@ -1629,7 +1629,7 @@ func TestGatewayAccountInterest(t *testing.T) {
// S2 and S3 should have sent a protocol indicating no account interest.
checkForAccountNoInterest(t, gwcb, "$foo", true, 2*time.Second)
checkForAccountNoInterest(t, gwcc, "$foo", true, 2*time.Second)
// Second send should not go through to B
// Second send should not go to B nor C.
natsPub(t, nc, "foo", []byte("hello"))
natsFlush(t, nc)
checkCount(t, gwcb, 1)
@@ -1649,12 +1649,9 @@ func TestGatewayAccountInterest(t *testing.T) {
defer ncS2.Close()
// Any subscription should cause s2 to send an A+
natsSubSync(t, ncS2, "asub")
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if _, inMap := gwcb.gw.outsim.Load("$foo"); inMap {
return fmt.Errorf("NoInterest has not been cleared")
}
return nil
})
// Wait for the A+
checkForAccountNoInterest(t, gwcb, "$foo", false, 2*time.Second)
// Now publish a message that should go to B
natsPub(t, nc, "foo", []byte("hello"))
natsFlush(t, nc)
@@ -1662,12 +1659,22 @@ func TestGatewayAccountInterest(t *testing.T) {
// Still won't go to C since there is no sub interest
checkCount(t, gwcc, 1)
// By closing the client from S2, the sole subscription for this
// account will disappear and since S2 sent an A+, it will send
// an A-.
ncS2.Close()
// We should have received a subject no interest for foo
checkForSubjectNoInterest(t, gwcb, "$foo", "foo", true, 2*time.Second)
// Now if we close the client, which removed the sole subscription,
// and publish to a new subject, we should then get an A-
ncS2.Close()
// Wait a bit...
time.Sleep(20 * time.Millisecond)
// Publish on new subject
natsPub(t, nc, "bar", []byte("hello"))
natsFlush(t, nc)
// It should go out to B...
checkCount(t, gwcb, 3)
// But then we should get a A-
checkForAccountNoInterest(t, gwcb, "$foo", true, 2*time.Second)
// Restart C and that should reset the no-interest
s3.Shutdown()
s3 = runGatewayServer(o3)
@@ -1685,7 +1692,7 @@ func TestGatewayAccountInterest(t *testing.T) {
natsPub(t, nc, "foo", []byte("hello"))
natsFlush(t, nc)
// it should not go to B (no sub interest)
checkCount(t, gwcb, 2)
checkCount(t, gwcb, 3)
// but will go to C
checkCount(t, gwcc, 1)
}

View File

@@ -64,5 +64,5 @@ MY_SERVICE = {
# and maximum allowed.
MY_STREAM_SERVICE = {
subscribe = "my.service.req"
allow_responses = {max: 10, ttl: "10ms"}
allow_responses = {max: 10, ttl: "50ms"}
}

View File

@@ -2257,7 +2257,7 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
checkSubPending := func(numExpected int) {
t.Helper()
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != numExpected {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
}
@@ -3747,7 +3747,13 @@ func TestJetStreamConsumerReplayRate(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
now := time.Now()
if now.Sub(last) > 5*time.Millisecond {
// Delivery from AddConsumer starts in a go routine, so be
// more tolerant for the first message.
limit := 5 * time.Millisecond
if i == 0 {
limit = 10 * time.Millisecond
}
if now.Sub(last) > limit {
t.Fatalf("Expected firehose/instant delivery, got message gap of %v", now.Sub(last))
}
last = now

View File

@@ -1392,6 +1392,10 @@ func TestNewRouteServiceExportWithWildcards(t *testing.T) {
sendA("SUB ngs.update.* 1\r\nPING\r\n")
expectA(pongRe)
if err := checkExpectedSubs(2, srvA, srvB); err != nil {
t.Fatal(err.Error())
}
// Now setup client B on srvB who will do a sub from account $bar
// that should map account $foo's foo subject.
clientB := createClientConn(t, optsB.Host, optsB.Port)

View File

@@ -182,7 +182,9 @@ func checkServiceLatency(t *testing.T, sl server.ServiceLatency, start time.Time
serviceTime = serviceTime.Round(time.Millisecond)
startDelta := sl.RequestStart.Sub(start)
if startDelta > 5*time.Millisecond {
// Original test was 5ms, but got GitHub Action failure with "Bad start delta 5.033929ms",
// so be more generous.
if startDelta > 10*time.Millisecond {
t.Fatalf("Bad start delta %v", startDelta)
}
if sl.ServiceLatency < time.Duration(float64(serviceTime)*0.8) {
@@ -422,6 +424,8 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
defer nc.Close()
subsBefore := int(sc.clusters[0].servers[0].NumSubscriptions())
// The service listener.
serviceTime := 25 * time.Millisecond
nc.Subscribe("ngs.usage.*", func(msg *nats.Msg) {
@@ -431,6 +435,11 @@ func TestServiceLatencyRemoteConnect(t *testing.T) {
// Listen for metrics
rsub, _ := nc.SubscribeSync("results")
nc.Flush()
if err := checkExpectedSubs(subsBefore+2, sc.clusters[0].servers...); err != nil {
t.Fatal(err.Error())
}
// Same Cluster Requestor
nc2 := clientConnect(t, sc.clusters[0].opts[2], "bar")
@@ -685,12 +694,12 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
numResponders := 5
// Create 10 queue subscribers for the service. Randomly select the server.
// Create 5 queue subscribers for the service. Randomly select the server.
for i := 0; i < numResponders; i++ {
nc := clientConnectWithName(t, selectServer(), "foo", sname(i))
defer nc.Close()
nc.QueueSubscribe("ngs.usage.*", "SERVICE", func(msg *nats.Msg) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Millisecond)
time.Sleep(2*time.Millisecond + time.Duration(rand.Int63n(10))*time.Millisecond)
msg.Respond([]byte("22 msgs"))
})
nc.Flush()
@@ -710,8 +719,9 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
defer nc.Close()
results := make(map[string]time.Duration)
serviced := make(map[string]struct{})
var rlock sync.Mutex
ch := make(chan (bool))
ch := make(chan (bool), 1)
received := int32(0)
toSend := int32(100)
@@ -721,20 +731,28 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
json.Unmarshal(msg.Data, &sl)
rlock.Lock()
results[sl.Responder.Name] += sl.ServiceLatency
serviced[sl.Responder.Name] = struct{}{}
rlock.Unlock()
if r := atomic.AddInt32(&received, 1); r >= toSend {
ch <- true
select {
case ch <- true:
default:
}
}
})
nc.Flush()
// Send 200 requests from random locations.
for i := 0; i < 200; i++ {
// Send requests from random locations.
for i := 0; i < int(toSend); i++ {
doRequest()
}
// Wait on all results.
<-ch
select {
case <-ch:
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive all results in time")
}
rlock.Lock()
defer rlock.Unlock()
@@ -742,8 +760,12 @@ func TestServiceLatencyWithQueueSubscribersAndNames(t *testing.T) {
// Make sure each total is generally over 10ms
thresh := 10 * time.Millisecond
for i := 0; i < numResponders; i++ {
if rl := results[sname(i)]; rl < thresh {
t.Fatalf("Total for %q is less then threshold: %v vs %v", sname(i), thresh, rl)
sn := sname(i)
if _, ok := serviced[sn]; !ok {
continue
}
if rl := results[sn]; rl < thresh {
t.Fatalf("Total for %q is less then threshold: %v vs %v", sn, thresh, rl)
}
}
}
@@ -1182,6 +1204,9 @@ func TestServiceLatencyFailureReportingMultipleServers(t *testing.T) {
t.Fatalf("Test %q, Expected to get a bad request status [400], got %d", cs.desc, sl.Status)
}
// We wait here for the gateways to report no interest b/c optimistic mode.
time.Sleep(50 * time.Millisecond)
// Proper request, but no responders.
nc2.Request("ngs.usage", []byte("1h"), 10*time.Millisecond)
sl = getMetricResult()

View File

@@ -430,9 +430,13 @@ func TestServiceExportsPruningCleanup(t *testing.T) {
expectedPending := func(expected int) {
t.Helper()
if nre := acc.NumPendingResponses("foo"); nre != expected {
t.Fatalf("Expected %d entries, got %d", expected, nre)
}
// Caller is sleeping a bit before, but avoid flappers.
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if nre := acc.NumPendingResponses("foo"); nre != expected {
return fmt.Errorf("Expected %d entries, got %d", expected, nre)
}
return nil
})
}
// Requestor
@@ -461,7 +465,7 @@ func TestServiceExportsPruningCleanup(t *testing.T) {
nc2.Flush()
expectedPending(10)
time.Sleep(2 * newRt)
time.Sleep(4 * newRt)
expectedPending(0)
}

View File

@@ -309,7 +309,7 @@ func TestUserAuthorizationAllowResponses(t *testing.T) {
matches = msgRe.FindAllSubmatch(expectResult(t, c, msgRe), -1)
checkMsg(t, matches[0], "my.service.req", "1", "resp.bar.11", "2", "ok")
time.Sleep(20 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
sendProto(t, c, "PUB resp.bar.11 2\r\nok\r\n")
expectResult(t, c, errRe)