Add test case for concurrent expected last subject sequence

Signed-off-by: Byron Ruth <byron@nats.io>
This commit is contained in:
Byron Ruth
2023-07-18 11:10:51 -04:00
committed by Derek Collison
parent 3173d435b1
commit 16a336ead3

View File

@@ -11282,6 +11282,93 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) {
}
}
func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc0, js0 := jsClientConnect(t, c.randomServer())
defer nc0.Close()
nc1, js1 := jsClientConnect(t, c.randomServer())
defer nc1.Close()
cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.>"},
Storage: st,
Replicas: 3,
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Do manually for now.
m, err := nc0.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
require_NoError(t, err)
si, err := js0.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data))
}
if si == nil || si.Config.Name != "KV" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
pub := func(js nats.JetStreamContext, subj, data, seq string) {
t.Helper()
m := nats.NewMsg(subj)
m.Data = []byte(data)
m.Header.Set(JSExpectedLastSubjSeq, seq)
js.PublishMsg(m)
}
ready := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
<-ready
pub(js0, "kv.foo", "0-0", "0")
pub(js0, "kv.foo", "0-1", "1")
pub(js0, "kv.foo", "0-2", "2")
wg.Done()
}()
go func() {
<-ready
pub(js1, "kv.foo", "1-0", "0")
pub(js1, "kv.foo", "1-1", "1")
pub(js1, "kv.foo", "1-2", "2")
wg.Done()
}()
time.Sleep(10 * time.Millisecond)
close(ready)
wg.Wait()
// Read the messages.
sub, err := js0.PullSubscribe("", "", nats.BindStream("KV"))
require_NoError(t, err)
msgs, err := sub.Fetch(10)
require_NoError(t, err)
if len(msgs) != 3 {
t.Errorf("Expected 3 messages, got %d", len(msgs))
}
for i, m := range msgs {
md, _ := m.Metadata()
t.Logf("Seq: %d", md.Sequence.Stream)
t.Logf("Data: %s", m.Data)
t.Logf("Header: %v", m.Header)
if m.Header.Get(JSExpectedLastSubjSeq) != fmt.Sprint(i) {
t.Errorf("Expected %d for last sequence, got %q", i, m.Header.Get(JSExpectedLastSubjSeq))
}
}
})
}
}
func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()