mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -26,6 +26,7 @@ import (
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -1894,24 +1895,36 @@ func TestParseExport(t *testing.T) {
|
||||
defer nc1.Close()
|
||||
nc2 := connect("u2")
|
||||
defer nc2.Close()
|
||||
subscribe := func(nc *nats.Conn, msgs int, subj string) (*sync.WaitGroup, *nats.Subscription) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(msgs)
|
||||
sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
|
||||
|
||||
// Due to the fact that above CONNECT events are generated and sent from
|
||||
// a system go routine, it is possible that by the time we create the
|
||||
// subscriptions below, the interest would exist and messages be sent,
|
||||
// which was causing issues since wg.Done() was called too many times.
|
||||
// Add a little delay to minimize risk, but also use counter to decide
|
||||
// when to call wg.Done() to avoid panic due to negative number.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
count := int32(0)
|
||||
// We expect a total of 6 messages
|
||||
expected := int32(6)
|
||||
subscribe := func(nc *nats.Conn, subj string) {
|
||||
t.Helper()
|
||||
_, err := nc.Subscribe(subj, func(msg *nats.Msg) {
|
||||
if msg.Reply != _EMPTY_ {
|
||||
msg.Respond(msg.Data)
|
||||
}
|
||||
wg.Done()
|
||||
if atomic.AddInt32(&count, 1) == expected {
|
||||
wg.Done()
|
||||
}
|
||||
})
|
||||
require_NoError(t, err)
|
||||
nc.Flush()
|
||||
return &wg, sub
|
||||
}
|
||||
//Subscribe to CONNS events
|
||||
wg1, s1 := subscribe(nc1, 2, "$SYS.SERVER.ACCOUNT.accI1.CONNS")
|
||||
defer s1.Unsubscribe()
|
||||
wg2, s2 := subscribe(nc2, 2, "$SYS.SERVER.ACCOUNT.accI2.CONNS")
|
||||
defer s2.Unsubscribe()
|
||||
subscribe(nc1, "$SYS.SERVER.ACCOUNT.accI1.CONNS")
|
||||
subscribe(nc2, "$SYS.SERVER.ACCOUNT.accI2.CONNS")
|
||||
// Trigger 2 CONNS event
|
||||
nc3 := connect("u1")
|
||||
nc3.Close()
|
||||
@@ -1920,8 +1933,7 @@ func TestParseExport(t *testing.T) {
|
||||
// test service
|
||||
ncE := connect("ue")
|
||||
defer ncE.Close()
|
||||
wge, se := subscribe(ncE, 2, "foo.*")
|
||||
defer se.Unsubscribe()
|
||||
subscribe(ncE, "foo.*")
|
||||
request := func(nc *nats.Conn, msg string) {
|
||||
if m, err := nc.Request("foo", []byte(msg), time.Second); err != nil {
|
||||
t.Fatal("Failed request ", msg, err)
|
||||
@@ -1933,9 +1945,7 @@ func TestParseExport(t *testing.T) {
|
||||
}
|
||||
request(nc1, "1")
|
||||
request(nc2, "1")
|
||||
for _, wg := range []*sync.WaitGroup{wge, wg1, wg2} {
|
||||
wg.Wait()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestAccountUsersLoadedProperly(t *testing.T) {
|
||||
|
||||
@@ -1852,10 +1852,14 @@ func TestServiceLatencyMissingResults(t *testing.T) {
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
// Create sync listener for latency.
|
||||
lsub, _ := nc1.SubscribeSync("service.weather.latency")
|
||||
latSubj := "service.weather.latency"
|
||||
lsub, _ := nc1.SubscribeSync(latSubj)
|
||||
defer lsub.Unsubscribe()
|
||||
nc1.Flush()
|
||||
|
||||
// Make sure the subscription propagates to s2 server.
|
||||
checkSubInterest(t, s2, "weather", latSubj, time.Second)
|
||||
|
||||
// Create requestor on s2.
|
||||
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%s@%s:%d", "one", "password", opts2.Host, opts2.Port), nats.UseOldRequestStyle())
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user