Merge pull request #1512 from nats-io/eoupdate

More checks on duplicate window config, signal in PubAck if duplicate
This commit is contained in:
Derek Collison
2020-07-10 07:12:33 -07:00
committed by GitHub
2 changed files with 38 additions and 7 deletions

View File

@@ -55,8 +55,9 @@ type StreamConfig struct {
// PubAck is the detail you get back from a publish to a stream that was successful.
// e.g. +OK {"stream": "Orders", "seq": 22}
type PubAck struct {
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
Duplicate bool `json:"duplicate,omitempty"`
}
// StreamInfo shows config and current state for this stream.
@@ -366,8 +367,19 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
cfg.MaxConsumers = -1
}
if cfg.Duplicates == 0 {
cfg.Duplicates = StreamDefaultDuplicatesWindow
if cfg.MaxAge != 0 && cfg.MaxAge < StreamDefaultDuplicatesWindow {
cfg.Duplicates = cfg.MaxAge
} else {
cfg.Duplicates = StreamDefaultDuplicatesWindow
}
} else if cfg.Duplicates < 0 {
return StreamConfig{}, fmt.Errorf("duplicates window can not be negative")
}
// Check that duplicates is not larger then age if set.
if cfg.MaxAge != 0 && cfg.Duplicates > cfg.MaxAge {
return StreamConfig{}, fmt.Errorf("duplicates window can not be larger then max age")
}
if len(cfg.Subjects) == 0 {
cfg.Subjects = append(cfg.Subjects, cfg.Name)
} else {
@@ -765,7 +777,7 @@ func (mset *Stream) processInboundJetStreamMsg(_ *subscription, pc *client, subj
if dde := mset.checkMsgId(msgId); dde != nil {
if doAck && len(reply) > 0 {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, '}')
response = append(response, ", \"duplicate\": true}"...)
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0}
}
mset.mu.Unlock()

View File

@@ -2353,7 +2353,7 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
}
mname := "DeDupe"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage, Subjects: []string{"foo.*"}})
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage, MaxAge: time.Hour, Subjects: []string{"foo.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
@@ -2365,10 +2365,22 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
t.Fatalf("Expected a default of %v, got %v", server.StreamDefaultDuplicatesWindow, duplicates)
}
cfg := mset.Config()
// Make sure can't be negative.
cfg.Duplicates = -25 * time.Millisecond
if err := mset.Update(&cfg); err == nil {
t.Fatalf("Expected an error but got none")
}
// Make sure can't be longer than age if its set.
cfg.Duplicates = 2 * time.Hour
if err := mset.Update(&cfg); err == nil {
t.Fatalf("Expected an error but got none")
}
nc := clientConnectToServer(t, s)
defer nc.Close()
sendMsg := func(seq uint64, id, msg string) {
sendMsg := func(seq uint64, id, msg string) *server.PubAck {
t.Helper()
m := nats.NewMsg(fmt.Sprintf("foo.%d", seq))
m.Header.Add(server.JSPubId, id)
@@ -2387,6 +2399,7 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
if pubAck.Seq != seq {
t.Fatalf("Did not get correct sequence in PubAck, expected %d, got %d", seq, pubAck.Seq)
}
return &pubAck
}
expect := func(n uint64) {
@@ -2408,7 +2421,7 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
sendMsg(4, "ZZ", "Hello DeDupe!")
expect(4)
cfg := mset.Config()
cfg = mset.Config()
cfg.Duplicates = 25 * time.Millisecond
if err := mset.Update(&cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -2478,6 +2491,12 @@ func TestJetStreamPublishDeDupe(t *testing.T) {
t.Fatalf("Expected 5 restored messages, got %d", nms)
}
nmids(5)
// Check we set duplicate properly.
pa := sendMsg(10, "AAAA", "Hello DeDupe!")
if !pa.Duplicate {
t.Fatalf("Expected duplicate to be set")
}
}
func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {