diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 973f65d9..985fbc0a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1190,6 +1190,10 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error { s, cc := js.srv, js.cluster + if cc == nil || cc.meta == nil { + return ErrJetStreamNotClustered + } + // If this is a single peer raft group or we are not a member return. if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) { // Nothing to do here. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 52f1ba9f..ffa08064 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6863,6 +6863,45 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) { }) } +// https://github.com/nats-io/nats-server/issues/2242 +func TestJetStreamClusterMsgIdDuplicateBug(t *testing.T) { + c := createJetStreamClusterExplicit(t, "MSL", 3) + defer c.shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sendMsgID := func(id string) (*nats.PubAck, error) { + t.Helper() + m := nats.NewMsg("foo") + m.Header.Add(JSMsgId, id) + m.Data = []byte("HELLO WORLD") + return js.PublishMsg(m) + } + + if _, err := sendMsgID("1"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // This should fail with duplicate detected. + if pa, _ := sendMsgID("1"); pa == nil || !pa.Duplicate { + t.Fatalf("Expected duplicate but got none: %+v", pa) + } + // This should be fine. + if _, err := sendMsgID("2"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + func TestJetStreamClusterNilMsgWithHeaderThroughSourcedStream(t *testing.T) { tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1) c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 12232, true) diff --git a/server/stream.go b/server/stream.go index 8d74a163..208c6463 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2480,7 +2480,10 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subje } } -var errLastSeqMismatch = errors.New("last sequence mismatch") +var ( + errLastSeqMismatch = errors.New("last sequence mismatch") + errMsgIdDuplicate = errors.New("msgid is duplicate") +) // processJetStreamMsg is where we try to actually process the stream msg. func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error { @@ -2555,7 +2558,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, response = append(response, ",\"duplicate\": true}"...) outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil}) } - return errors.New("msgid is duplicate") + return errMsgIdDuplicate } // Expected stream. @@ -2681,6 +2684,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Assume this will succeed. olmsgId := mset.lmsgId mset.lmsgId = msgId + clfs := mset.clfs mset.lseq++ // We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions. @@ -2692,7 +2696,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if lseq == 0 && ts == 0 { seq, ts, err = store.StoreMsg(subject, hdr, msg) } else { - seq = lseq + 1 + // Make sure to take into account any message assignments that we had to skip (clfs). + seq = lseq + 1 - clfs err = store.StoreRawMsg(subject, hdr, msg, seq, ts) }