Provide exactly once semantics

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-07-08 16:23:31 -07:00
parent 33deee8a64
commit b74c2eb2c4
3 changed files with 300 additions and 4 deletions

View File

@@ -2292,6 +2292,150 @@ func TestJetStreamWorkQueueTerminateDelivery(t *testing.T) {
}
}
func TestJetStreamConsumerAckAck(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "ACK-ACK"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: "worker", AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
rqn := o.RequestNextMsgSubject()
defer o.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// 5 for number of ack protocols to test them all.
for i := 0; i < 5; i++ {
sendStreamMsg(t, nc, mname, "Hello World!")
}
testAck := func(ackType []byte) {
m, err := nc.Request(rqn, nil, 10*time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send a request for the ack and make sure the server "ack's" the ack.
if _, err := nc.Request(m.Reply, ackType, 10*time.Millisecond); err != nil {
t.Fatalf("Unexpected error on ack/ack: %v", err)
}
}
testAck(server.AckAck)
testAck(server.AckNak)
testAck(server.AckProgress)
testAck(server.AckNext)
testAck(server.AckTerm)
}
func TestJetStreamPublishDeDupe(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "DeDupe"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage, Subjects: []string{"foo.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
// Check Duplicates setting.
duplicates := mset.Config().Duplicates
if duplicates != server.StreamDefaultDuplicatesWindow {
t.Fatalf("Expected a default of %v, got %v", server.StreamDefaultDuplicatesWindow, duplicates)
}
nc := clientConnectToServer(t, s)
defer nc.Close()
sendMsg := func(seq uint64, id, msg string) {
t.Helper()
m := nats.NewMsg(fmt.Sprintf("foo.%d", seq))
m.Header.Add(server.JSPubId, id)
m.Data = []byte(msg)
resp, _ := nc.RequestMsg(m, 100*time.Millisecond)
if resp == nil {
t.Fatalf("No response for %q, possible timeout?", msg)
}
if !bytes.HasPrefix(resp.Data, []byte("+OK {")) {
t.Fatalf("Expected a JetStreamPubAck, got %q", resp.Data)
}
var pubAck server.PubAck
if err := json.Unmarshal(resp.Data[3:], &pubAck); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if pubAck.Seq != seq {
t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pubAck.Seq)
}
}
expect := func(n uint64) {
t.Helper()
state := mset.State()
if state.Msgs != n {
t.Fatalf("Expected %d messages, got %d", n, state.Msgs)
}
}
sendMsg(1, "AA", "Hello DeDupe!")
sendMsg(2, "BB", "Hello DeDupe!")
sendMsg(3, "CC", "Hello DeDupe!")
sendMsg(4, "ZZ", "Hello DeDupe!")
expect(4)
sendMsg(1, "AA", "Hello DeDupe!")
sendMsg(2, "BB", "Hello DeDupe!")
sendMsg(4, "ZZ", "Hello DeDupe!")
expect(4)
cfg := mset.Config()
cfg.Duplicates = 25 * time.Millisecond
if err := mset.Update(&cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nmids := func(expected int) {
t.Helper()
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
if nids := mset.NumMsgIds(); nids != expected {
return fmt.Errorf("Expected %d message ids, got %d", expected, nids)
}
return nil
})
}
nmids(4)
time.Sleep(cfg.Duplicates * 2)
sendMsg(5, "AAA", "Hello DeDupe!")
sendMsg(6, "BBB", "Hello DeDupe!")
sendMsg(7, "CCC", "Hello DeDupe!")
sendMsg(8, "ZZZ", "Hello DeDupe!")
nmids(4)
// Eventually will drop to zero.
nmids(0)
}
func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()