From 308355a2fd1d1d42b28ad152a1b7d7d4a388b80f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 24 May 2021 08:21:41 -0700 Subject: [PATCH] Fix for #2242. When we had a duplicate detected in R>1 mode we set the skip sequence indicator but were not using that when dealing with underlying store. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 4 ++++ server/jetstream_cluster_test.go | 39 ++++++++++++++++++++++++++++++++ server/stream.go | 11 ++++++--- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 973f65d9..985fbc0a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1190,6 +1190,10 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return ErrJetStreamNotClustered + } + // If this is a single peer raft group or we are not a member return. if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) { // Nothing to do here. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index f5b94cf2..60558f2b 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6863,6 +6863,45 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) { }) } +// https://github.com/nats-io/nats-server/issues/2242 +func TestJetStreamClusterMsgIdDuplicateBug(t *testing.T) { + c := createJetStreamClusterExplicit(t, "MSL", 3) + defer c.shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sendMsgID := func(id string) (*nats.PubAck, error) { + t.Helper() + m := nats.NewMsg("foo") + m.Header.Add(JSMsgId, id) + m.Data = []byte("HELLO WORLD") + return js.PublishMsg(m) + } + + if _, err := sendMsgID("1"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // This should fail with duplicate detected. + if pa, _ := sendMsgID("1"); pa == nil || !pa.Duplicate { + t.Fatalf("Expected duplicate but got none: %+v", pa) + } + // This should be fine. + if _, err := sendMsgID("2"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + // Support functions // Used to setup superclusters for tests. diff --git a/server/stream.go b/server/stream.go index 8d74a163..208c6463 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2480,7 +2480,10 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subje } } -var errLastSeqMismatch = errors.New("last sequence mismatch") +var ( + errLastSeqMismatch = errors.New("last sequence mismatch") + errMsgIdDuplicate = errors.New("msgid is duplicate") +) // processJetStreamMsg is where we try to actually process the stream msg. func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error { @@ -2555,7 +2558,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, response = append(response, ",\"duplicate\": true}"...) outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } - return errors.New("msgid is duplicate") + return errMsgIdDuplicate } // Expected stream. @@ -2681,6 +2684,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Assume this will succeed. olmsgId := mset.lmsgId mset.lmsgId = msgId + clfs := mset.clfs mset.lseq++ // We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions. @@ -2692,7 +2696,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if lseq == 0 && ts == 0 { seq, ts, err = store.StoreMsg(subject, hdr, msg) } else { - seq = lseq + 1 + // Make sure to take into account any message assignments that we had to skip (clfs). + seq = lseq + 1 - clfs err = store.StoreRawMsg(subject, hdr, msg, seq, ts) }