mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Test that shows message disappear from filestore
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -8856,3 +8856,86 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamStoredMsgsDontDisappear(t *testing.T) {
|
||||
sc := &server.StreamConfig{
|
||||
Name: "MY_STREAM",
|
||||
Storage: server.FileStorage,
|
||||
Subjects: []string{"foo.>"},
|
||||
Retention: server.InterestPolicy,
|
||||
}
|
||||
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
}
|
||||
|
||||
// mset, err := s.GlobalAccount().AddStream(sc)
|
||||
mset, err := s.GlobalAccount().AddStreamWithStore(sc, &server.FileStoreConfig{BlockSize: 128, CacheExpire: 15 * time.Millisecond})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding stream: %v", err)
|
||||
}
|
||||
defer mset.Delete()
|
||||
|
||||
nc1 := clientConnectWithOldRequest(t, s)
|
||||
defer nc1.Close()
|
||||
|
||||
// Create a durable consumers
|
||||
sub, _ := nc1.SubscribeSync(nats.NewInbox())
|
||||
defer sub.Unsubscribe()
|
||||
nc1.Flush()
|
||||
|
||||
o, err := mset.AddConsumer(&server.ConsumerConfig{
|
||||
Durable: "dur",
|
||||
DeliverSubject: sub.Subject,
|
||||
FilterSubject: "foo.bar",
|
||||
DeliverPolicy: server.DeliverNew,
|
||||
AckPolicy: server.AckExplicit,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding consumer: %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
nc2 := clientConnectWithOldRequest(t, s)
|
||||
defer nc2.Close()
|
||||
|
||||
sendStreamMsg(t, nc2, "foo.bar", "msg1")
|
||||
msg, err := sub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Did not get message: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "msg1" {
|
||||
t.Fatalf("Unexpected message: %q", msg.Data)
|
||||
}
|
||||
|
||||
nc1.Close()
|
||||
|
||||
// Get the message from the stream
|
||||
getMsgSeq := func(seq uint64) {
|
||||
t.Helper()
|
||||
mreq := &server.JSApiMsgGetRequest{Seq: seq}
|
||||
req, err := json.Marshal(mreq)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
smsgj, err := nc2.Request(fmt.Sprintf(server.JSApiMsgGetT, sc.Name), req, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve stream message: %v", err)
|
||||
}
|
||||
if strings.Contains(string(smsgj.Data), "code") {
|
||||
t.Fatalf("Error: %q", smsgj.Data)
|
||||
}
|
||||
}
|
||||
getMsgSeq(1)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
sendStreamMsg(t, nc2, "foo.bar", "msg2")
|
||||
sendStreamMsg(t, nc2, "foo.bar", "msg3")
|
||||
|
||||
getMsgSeq(1)
|
||||
getMsgSeq(2)
|
||||
getMsgSeq(3)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user