Handle msg delete and stream purge for R=1

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-03 16:30:03 -08:00
parent 0dfac6b52f
commit 1c9258f91c
3 changed files with 89 additions and 16 deletions

View File

@@ -1832,7 +1832,7 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply
}
if s.JetStreamIsClustered() {
s.jsClusteredMsgDeleteRequest(ci, stream, subject, reply, req.Seq, rmsg)
s.jsClusteredMsgDeleteRequest(ci, mset, stream, subject, reply, &req, rmsg)
return
}
@@ -1984,7 +1984,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
}
if s.JetStreamIsClustered() {
s.jsClusteredStreamPurgeRequest(ci, stream, subject, reply, rmsg)
s.jsClusteredStreamPurgeRequest(ci, mset, stream, subject, reply, rmsg)
return
}

View File

@@ -122,6 +122,7 @@ type streamMsgDelete struct {
Client *ClientInfo `json:"client,omitempty"`
Stream string `json:"stream"`
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
Subject string `json:"subject"`
Reply string `json:"reply"`
}
@@ -1340,7 +1341,13 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool,
panic(err.Error())
}
s, cc := js.server(), js.cluster
removed, err := mset.EraseMsg(md.Seq)
var removed bool
if md.NoErase {
removed, err = mset.RemoveMsg(md.Seq)
} else {
removed, err = mset.EraseMsg(md.Seq)
}
if err != nil {
s.Warnf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err)
}
@@ -2724,7 +2731,7 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, subject,
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
}
func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject, reply string, rmsg []byte) {
func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, mset *Stream, stream, subject, reply string, rmsg []byte) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
@@ -2734,7 +2741,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject,
defer js.mu.Unlock()
sa := js.streamAssignment(ci.Account, stream)
if sa == nil || sa.Group == nil || sa.Group.node == nil {
if sa == nil {
resp := JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
acc, err := s.LookupAccount(ci.Account)
if err != nil {
@@ -2746,9 +2753,20 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject,
return
}
n := sa.Group.node
sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeStreamPurge(sp))
if n := sa.Group.node; n != nil {
sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeStreamPurge(sp))
} else if mset != nil {
var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
purged, err := mset.Purge()
if err != nil {
resp.Error = jsError(err)
} else {
resp.Purged = purged
resp.Success = true
}
s.sendAPIResponse(ci, mset.account(), subject, reply, string(rmsg), s.jsonResponse(resp))
}
}
func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) {
@@ -3067,7 +3085,7 @@ func decodeMsgDelete(buf []byte) (*streamMsgDelete, error) {
return &md, err
}
func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, stream, subject, reply string, seq uint64, rmsg []byte) {
func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, mset *Stream, stream, subject, reply string, req *JSApiMsgDeleteRequest, rmsg []byte) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
@@ -3077,13 +3095,32 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, stream, subject, re
defer js.mu.Unlock()
sa := js.streamAssignment(ci.Account, stream)
if sa == nil || sa.Group == nil || sa.Group.node == nil {
// TODO(dlc) - Should respond? Log?
if sa == nil {
s.Debugf("Message delete failed, could not locate stream '%s > %s'", ci.Account, stream)
return
}
n := sa.Group.node
md := &streamMsgDelete{Seq: seq, Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeMsgDelete(md))
// Check for single replica items.
if n := sa.Group.node; n != nil {
md := &streamMsgDelete{Seq: req.Seq, NoErase: req.NoErase, Stream: stream, Subject: subject, Reply: reply, Client: ci}
n.Propose(encodeMsgDelete(md))
} else if mset != nil {
var err error
var removed bool
if req.NoErase {
removed, err = mset.RemoveMsg(req.Seq)
} else {
removed, err = mset.EraseMsg(req.Seq)
}
var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
if err != nil {
resp.Error = jsError(err)
} else if !removed {
resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("sequence [%d] not found", req.Seq)}
} else {
resp.Success = true
}
s.sendAPIResponse(ci, mset.account(), subject, reply, string(rmsg), s.jsonResponse(resp))
}
}
func encodeAddStreamAssignment(sa *streamAssignment) []byte {

View File

@@ -1277,9 +1277,8 @@ func TestJetStreamClusterStreamNormalCatchup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.randomServer()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
@@ -1403,6 +1402,43 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) {
c.waitOnStreamCurrent(sl, "$G", "TEST")
}
func TestJetStreamClusterDeleteMsg(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
// R=1 make sure delete works.
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
toSend := 10
for i := 1; i <= toSend; i++ {
msg := []byte(fmt.Sprintf("HELLO JSC-%d", i))
if _, err = js.Publish("TEST", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
deleteMsg := func(seq uint64) {
if err := js.DeleteMsg("TEST", seq); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
deleteMsg(1)
// Also make sure purge of R=1 works too.
if err := js.PurgeStream("TEST"); err != nil {
t.Fatalf("Unexpected purge error: %v", err)
}
}
func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()