Merge pull request #2245 from nats-io/msgid

[FIXED] #2242.
This commit is contained in:
Derek Collison
2021-05-24 09:29:43 -07:00
committed by GitHub
3 changed files with 51 additions and 3 deletions

View File

@@ -1190,6 +1190,10 @@ func (js *jetStream) createRaftGroup(rg *raftGroup, storage StorageType) error {
s, cc := js.srv, js.cluster
if cc == nil || cc.meta == nil {
return ErrJetStreamNotClustered
}
// If this is a single peer raft group or we are not a member return.
if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) {
// Nothing to do here.

View File

@@ -6863,6 +6863,45 @@ func TestJetStreamClusterCrossAccountInterop(t *testing.T) {
})
}
// https://github.com/nats-io/nats-server/issues/2242
func TestJetStreamClusterMsgIdDuplicateBug(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSL", 3)
defer c.shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sendMsgID := func(id string) (*nats.PubAck, error) {
t.Helper()
m := nats.NewMsg("foo")
m.Header.Add(JSMsgId, id)
m.Data = []byte("HELLO WORLD")
return js.PublishMsg(m)
}
if _, err := sendMsgID("1"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// This should fail with duplicate detected.
if pa, _ := sendMsgID("1"); pa == nil || !pa.Duplicate {
t.Fatalf("Expected duplicate but got none: %+v", pa)
}
// This should be fine.
if _, err := sendMsgID("2"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
func TestJetStreamClusterNilMsgWithHeaderThroughSourcedStream(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 12232, true)

View File

@@ -2480,7 +2480,10 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, subje
}
}
var errLastSeqMismatch = errors.New("last sequence mismatch")
var (
errLastSeqMismatch = errors.New("last sequence mismatch")
errMsgIdDuplicate = errors.New("msgid is duplicate")
)
// processJetStreamMsg is where we try to actually process the stream msg.
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
@@ -2555,7 +2558,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
response = append(response, ",\"duplicate\": true}"...)
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
return errors.New("msgid is duplicate")
return errMsgIdDuplicate
}
// Expected stream.
@@ -2681,6 +2684,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Assume this will succeed.
olmsgId := mset.lmsgId
mset.lmsgId = msgId
clfs := mset.clfs
mset.lseq++
// We hold the lock to this point to make sure nothing gets between us since we check for pre-conditions.
@@ -2692,7 +2696,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
} else {
seq = lseq + 1
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}