Make source and mirror consumers direct, meaning they are not mapped by the metaleader.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-10 07:12:48 -05:00
parent 5a3c5720b1
commit f95b6481d2
3 changed files with 56 additions and 28 deletions

View File

@@ -63,6 +63,9 @@ type ConsumerConfig struct {
MaxAckPending int `json:"max_ack_pending,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
// Don't add to general clients.
Direct bool `json:"direct,omitempty"`
}
type CreateConsumerRequest struct {
@@ -302,6 +305,19 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}
// Direct need to be non-mapped ephemerals.
if config.Direct {
if config.DeliverSubject == _EMPTY_ {
return nil, fmt.Errorf("consumer direct requires a push based consumer")
}
if isDurableConsumer(config) {
return nil, fmt.Errorf("consumer direct requires an ephemeral consumer")
}
if ca != nil {
return nil, fmt.Errorf("consumer direct on a mapped consumer")
}
}
// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
@@ -585,7 +601,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
mset.setConsumer(o)
mset.mu.Unlock()
if !s.JetStreamIsClustered() && s.standAloneMode() {
if config.Direct || (!s.JetStreamIsClustered() && s.standAloneMode()) {
o.setLeader(true)
}

View File

@@ -2842,23 +2842,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
}
}
var streamName string
if expectDurable {
streamName = tokenAt(subject, 6)
@@ -2866,17 +2849,42 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
streamName = tokenAt(subject, 5)
}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req CreateConsumerRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
if req.Config.Direct {
// Check to see if we have this stream and are the stream leader.
if !acc.JetStreamIsStreamLeader(streamName) {
return
}
} else {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
}
}
}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if streamName != req.Stream {
resp.Error = jsStreamMismatchErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -2914,7 +2922,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
}
}
if s.JetStreamIsClustered() {
if s.JetStreamIsClustered() && !req.Config.Direct {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
return
}

View File

@@ -992,7 +992,7 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo {
return mset.sourceInfo(mset.mirror)
}
const sourceHealthCheckInterval = 10 * time.Second
const sourceHealthCheckInterval = 2 * time.Second
// Will run as a Go routine to process mirror consumer messages.
func (mset *stream) processMirrorMsgs() {
@@ -1196,6 +1196,7 @@ func (mset *stream) setupMirrorConsumer() error {
}
mset.mirror.sub = sub
mset.mirror.last = time.Now()
if !mset.mirror.grr {
mset.mirror.grr = true
@@ -1216,8 +1217,9 @@ func (mset *stream) setupMirrorConsumer() error {
AckPolicy: AckNone,
AckWait: 48 * time.Hour,
MaxDeliver: 1,
Heartbeat: 10 * time.Second,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
Direct: true,
},
}
@@ -1275,7 +1277,7 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
case <-time.After(5 * time.Second):
case <-time.After(10 * time.Second):
mset.resetMirrorConsumer()
}
}()
@@ -1332,6 +1334,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
mset.removeInternalConsumer(si)
si.sseq, si.dseq = 0, 0
si.last = time.Now()
ssi := mset.streamSource(sname)
// Determine subjects etc.
@@ -1367,8 +1370,9 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
AckPolicy: AckNone,
AckWait: 48 * time.Hour,
MaxDeliver: 1,
Heartbeat: 10 * time.Second,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
Direct: true,
},
}
// If starting, check any configs.