From 78f549b35b5aa236655b571ad3c1e3304c1b2d67 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 5 Feb 2021 06:37:49 -0800 Subject: [PATCH] Make direct message retrieval from stream cluster aware Signed-off-by: Derek Collison --- server/jetstream_api.go | 50 +++++++++++++++++++++++++++++++++- test/jetstream_cluster_test.go | 39 ++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f2704447..ff01d073 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1863,7 +1863,56 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st return } + stream := tokenAt(subject, 6) + var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}} + + // If we are in clustered mode we need to be the stream leader to proceed. + if s.JetStreamIsClustered() { + // Check to make sure the stream is assigned. + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + js.mu.RLock() + isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream) + js.mu.RUnlock() + + if isLeader && sa == nil { + // We can't find the stream, so mimic what would be the errors below. + if !acc.JetStreamEnabled() { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // No stream present. + resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } else if sa == nil { + return + } + + // Check to see if we are a member of the group and if the group has no leader. + if js.isGroupLeaderless(sa.Group) { + resp.Error = jsClusterNotAvailErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + // We have the stream assigned and a leader, so only the stream leader should answer. + if !acc.JetStreamIsStreamLeader(stream) { + return + } + } + if !acc.JetStreamEnabled() { resp.Error = jsNotEnabledErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -1881,7 +1930,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st return } - stream := tokenAt(subject, 6) mset, err := acc.LookupStream(stream) if err != nil { resp.Error = jsNotFoundError(err) diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 1f8fc7c5..2247bfeb 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -3216,6 +3216,45 @@ func TestJetStreamClusterPurgeReplayAfterRestart(t *testing.T) { } } +func TestJetStreamClusterStreamGetMsg(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.Publish("TEST", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + + mreq := &server.JSApiMsgGetRequest{Seq: 1} + req, err := json.Marshal(mreq) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + rmsg, err := nc.Request(fmt.Sprintf(server.JSApiMsgGetT, "TEST"), req, time.Second) + if err != nil { + t.Fatalf("Could not retrieve stream message: %v", err) + } + if err != nil { + t.Fatalf("Could not retrieve stream message: %v", err) + } + + var resp server.JSApiMsgGetResponse + err = json.Unmarshal(rmsg.Data, &resp) + if err != nil { + t.Fatalf("Could not parse stream message: %v", err) + } + if resp.Message == nil || resp.Error != nil { + t.Fatalf("Did not receive correct response: %+v", resp.Error) + } +} + func TestJetStreamClusterSuperClusterBasics(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 3) defer sc.shutdown()