mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Make direct message retrieval from stream cluster aware
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1863,7 +1863,56 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
|
||||
return
|
||||
}
|
||||
|
||||
stream := tokenAt(subject, 6)
|
||||
|
||||
var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
|
||||
|
||||
// If we are in clustered mode we need to be the stream leader to proceed.
|
||||
if s.JetStreamIsClustered() {
|
||||
// Check to make sure the stream is assigned.
|
||||
js, cc := s.getJetStreamCluster()
|
||||
if js == nil || cc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
|
||||
resp.Error = jsClusterNotAvailErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
js.mu.RLock()
|
||||
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
|
||||
js.mu.RUnlock()
|
||||
|
||||
if isLeader && sa == nil {
|
||||
// We can't find the stream, so mimic what would be the errors below.
|
||||
if !acc.JetStreamEnabled() {
|
||||
resp.Error = jsNotEnabledErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
// No stream present.
|
||||
resp.Error = jsNotFoundError(ErrJetStreamStreamNotFound)
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
} else if sa == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check to see if we are a member of the group and if the group has no leader.
|
||||
if js.isGroupLeaderless(sa.Group) {
|
||||
resp.Error = jsClusterNotAvailErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
// We have the stream assigned and a leader, so only the stream leader should answer.
|
||||
if !acc.JetStreamIsStreamLeader(stream) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if !acc.JetStreamEnabled() {
|
||||
resp.Error = jsNotEnabledErr
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
|
||||
@@ -1881,7 +1930,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, subject, reply st
|
||||
return
|
||||
}
|
||||
|
||||
stream := tokenAt(subject, 6)
|
||||
mset, err := acc.LookupStream(stream)
|
||||
if err != nil {
|
||||
resp.Error = jsNotFoundError(err)
|
||||
|
||||
@@ -3216,6 +3216,45 @@ func TestJetStreamClusterPurgeReplayAfterRestart(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterStreamGetMsg(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3F", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
s := c.randomServer()
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
|
||||
t.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
|
||||
mreq := &server.JSApiMsgGetRequest{Seq: 1}
|
||||
req, err := json.Marshal(mreq)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
rmsg, err := nc.Request(fmt.Sprintf(server.JSApiMsgGetT, "TEST"), req, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve stream message: %v", err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Could not retrieve stream message: %v", err)
|
||||
}
|
||||
|
||||
var resp server.JSApiMsgGetResponse
|
||||
err = json.Unmarshal(rmsg.Data, &resp)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not parse stream message: %v", err)
|
||||
}
|
||||
if resp.Message == nil || resp.Error != nil {
|
||||
t.Fatalf("Did not receive correct response: %+v", resp.Error)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterSuperClusterBasics(t *testing.T) {
|
||||
sc := createJetStreamSuperCluster(t, 3, 3)
|
||||
defer sc.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user