From 1c9258f91c0a1f46cb4c59489ab620bfd9f9ec7a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 3 Feb 2021 16:30:03 -0800 Subject: [PATCH] Handle msg delete and stream purge for R=1 Signed-off-by: Derek Collison --- server/jetstream_api.go | 4 +-- server/jetstream_cluster.go | 61 +++++++++++++++++++++++++++------- test/jetstream_cluster_test.go | 40 ++++++++++++++++++++-- 3 files changed, 89 insertions(+), 16 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 971313dd..f2704447 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1832,7 +1832,7 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, subject, reply } if s.JetStreamIsClustered() { - s.jsClusteredMsgDeleteRequest(ci, stream, subject, reply, req.Seq, rmsg) + s.jsClusteredMsgDeleteRequest(ci, mset, stream, subject, reply, &req, rmsg) return } @@ -1984,7 +1984,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep } if s.JetStreamIsClustered() { - s.jsClusteredStreamPurgeRequest(ci, stream, subject, reply, rmsg) + s.jsClusteredStreamPurgeRequest(ci, mset, stream, subject, reply, rmsg) return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3094ec3f..2bf05e10 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -122,6 +122,7 @@ type streamMsgDelete struct { Client *ClientInfo `json:"client,omitempty"` Stream string `json:"stream"` Seq uint64 `json:"seq"` + NoErase bool `json:"no_erase,omitempty"` Subject string `json:"subject"` Reply string `json:"reply"` } @@ -1340,7 +1341,13 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool, panic(err.Error()) } s, cc := js.server(), js.cluster - removed, err := mset.EraseMsg(md.Seq) + + var removed bool + if md.NoErase { + removed, err = mset.RemoveMsg(md.Seq) + } else { + removed, err = mset.EraseMsg(md.Seq) + } if err != nil { s.Warnf("JetStream cluster failed to delete msg %d from stream %q for account %q: %v", md.Seq, md.Stream, md.Client.Account, err) } @@ -2724,7 +2731,7 @@ func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, stream, subject, cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } -func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, mset *Stream, stream, subject, reply string, rmsg []byte) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return @@ -2734,7 +2741,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject, defer js.mu.Unlock() sa := js.streamAssignment(ci.Account, stream) - if sa == nil || sa.Group == nil || sa.Group.node == nil { + if sa == nil { resp := JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} acc, err := s.LookupAccount(ci.Account) if err != nil { @@ -2746,9 +2753,20 @@ func (s *Server) jsClusteredStreamPurgeRequest(ci *ClientInfo, stream, subject, return } - n := sa.Group.node - sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci} - n.Propose(encodeStreamPurge(sp)) + if n := sa.Group.node; n != nil { + sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci} + n.Propose(encodeStreamPurge(sp)) + } else if mset != nil { + var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} + purged, err := mset.Purge() + if err != nil { + resp.Error = jsError(err) + } else { + resp.Purged = purged + resp.Success = true + } + s.sendAPIResponse(ci, mset.account(), subject, reply, string(rmsg), s.jsonResponse(resp)) + } } func (s *Server) jsClusteredStreamRestoreRequest(ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) { @@ -3067,7 +3085,7 @@ func decodeMsgDelete(buf []byte) (*streamMsgDelete, error) { return &md, err } -func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, stream, subject, reply string, seq uint64, rmsg []byte) { +func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, mset *Stream, stream, subject, reply string, req *JSApiMsgDeleteRequest, rmsg []byte) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return @@ -3077,13 +3095,32 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, stream, subject, re defer js.mu.Unlock() sa := js.streamAssignment(ci.Account, stream) - if sa == nil || sa.Group == nil || sa.Group.node == nil { - // TODO(dlc) - Should respond? Log? + if sa == nil { + s.Debugf("Message delete failed, could not locate stream '%s > %s'", ci.Account, stream) return } - n := sa.Group.node - md := &streamMsgDelete{Seq: seq, Stream: stream, Subject: subject, Reply: reply, Client: ci} - n.Propose(encodeMsgDelete(md)) + // Check for single replica items. + if n := sa.Group.node; n != nil { + md := &streamMsgDelete{Seq: req.Seq, NoErase: req.NoErase, Stream: stream, Subject: subject, Reply: reply, Client: ci} + n.Propose(encodeMsgDelete(md)) + } else if mset != nil { + var err error + var removed bool + if req.NoErase { + removed, err = mset.RemoveMsg(req.Seq) + } else { + removed, err = mset.EraseMsg(req.Seq) + } + var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} + if err != nil { + resp.Error = jsError(err) + } else if !removed { + resp.Error = &ApiError{Code: 400, Description: fmt.Sprintf("sequence [%d] not found", req.Seq)} + } else { + resp.Success = true + } + s.sendAPIResponse(ci, mset.account(), subject, reply, string(rmsg), s.jsonResponse(resp)) + } } func encodeAddStreamAssignment(sa *streamAssignment) []byte { diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 969e4ac3..da73f0a3 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -1277,9 +1277,8 @@ func TestJetStreamClusterStreamNormalCatchup(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() - s := c.randomServer() - // Client based API + s := c.randomServer() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -1403,6 +1402,43 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) { c.waitOnStreamCurrent(sl, "$G", "TEST") } +func TestJetStreamClusterDeleteMsg(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // R=1 make sure delete works. + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + toSend := 10 + for i := 1; i <= toSend; i++ { + msg := []byte(fmt.Sprintf("HELLO JSC-%d", i)) + if _, err = js.Publish("TEST", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + deleteMsg := func(seq uint64) { + if err := js.DeleteMsg("TEST", seq); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + deleteMsg(1) + + // Also make sure purge of R=1 works too. + if err := js.PurgeStream("TEST"); err != nil { + t.Fatalf("Unexpected purge error: %v", err) + } +} + func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) { c := createJetStreamClusterExplicit(t, "R5S", 5) defer c.shutdown()