diff --git a/server/opts_test.go b/server/opts_test.go index d79d001c..3714c21f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -26,6 +26,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "time" @@ -1894,24 +1895,36 @@ func TestParseExport(t *testing.T) { defer nc1.Close() nc2 := connect("u2") defer nc2.Close() - subscribe := func(nc *nats.Conn, msgs int, subj string) (*sync.WaitGroup, *nats.Subscription) { - wg := sync.WaitGroup{} - wg.Add(msgs) - sub, err := nc.Subscribe(subj, func(msg *nats.Msg) { + + // Due to the fact that above CONNECT events are generated and sent from + // a system go routine, it is possible that by the time we create the + // subscriptions below, the interest would exist and messages be sent, + // which was causing issues since wg.Done() was called too many times. + // Add a little delay to minimize risk, but also use counter to decide + // when to call wg.Done() to avoid panic due to negative number. + time.Sleep(100 * time.Millisecond) + + wg := sync.WaitGroup{} + wg.Add(1) + count := int32(0) + // We expect a total of 6 messages + expected := int32(6) + subscribe := func(nc *nats.Conn, subj string) { + t.Helper() + _, err := nc.Subscribe(subj, func(msg *nats.Msg) { if msg.Reply != _EMPTY_ { msg.Respond(msg.Data) } - wg.Done() + if atomic.AddInt32(&count, 1) == expected { + wg.Done() + } }) require_NoError(t, err) nc.Flush() - return &wg, sub } //Subscribe to CONNS events - wg1, s1 := subscribe(nc1, 2, "$SYS.SERVER.ACCOUNT.accI1.CONNS") - defer s1.Unsubscribe() - wg2, s2 := subscribe(nc2, 2, "$SYS.SERVER.ACCOUNT.accI2.CONNS") - defer s2.Unsubscribe() + subscribe(nc1, "$SYS.SERVER.ACCOUNT.accI1.CONNS") + subscribe(nc2, "$SYS.SERVER.ACCOUNT.accI2.CONNS") // Trigger 2 CONNS event nc3 := connect("u1") nc3.Close() @@ -1920,8 +1933,7 @@ func TestParseExport(t *testing.T) { // test service ncE := connect("ue") defer ncE.Close() - wge, se := subscribe(ncE, 2, "foo.*") - defer se.Unsubscribe() + subscribe(ncE, "foo.*") request := func(nc *nats.Conn, msg string) { if m, err := nc.Request("foo", []byte(msg), time.Second); err != nil { t.Fatal("Failed request ", msg, err) @@ -1933,9 +1945,7 @@ func TestParseExport(t *testing.T) { } request(nc1, "1") request(nc2, "1") - for _, wg := range []*sync.WaitGroup{wge, wg1, wg2} { - wg.Wait() - } + wg.Wait() } func TestAccountUsersLoadedProperly(t *testing.T) { diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 14b403ba..e7a4a7f2 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -1852,10 +1852,14 @@ func TestServiceLatencyMissingResults(t *testing.T) { defer sub.Unsubscribe() // Create sync listener for latency. - lsub, _ := nc1.SubscribeSync("service.weather.latency") + latSubj := "service.weather.latency" + lsub, _ := nc1.SubscribeSync(latSubj) defer lsub.Unsubscribe() nc1.Flush() + // Make sure the subscription propagates to s2 server. + checkSubInterest(t, s2, "weather", latSubj, time.Second) + // Create requestor on s2. nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle()) if err != nil {