diff --git a/server/consumer.go b/server/consumer.go index 05cd6991..3866166f 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -63,6 +63,9 @@ type ConsumerConfig struct { MaxAckPending int `json:"max_ack_pending,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` FlowControl bool `json:"flow_control,omitempty"` + + // Don't add to general clients. + Direct bool `json:"direct,omitempty"` } type CreateConsumerRequest struct { @@ -302,6 +305,19 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } + // Direct need to be non-mapped ephemerals. + if config.Direct { + if config.DeliverSubject == _EMPTY_ { + return nil, fmt.Errorf("consumer direct requires a push based consumer") + } + if isDurableConsumer(config) { + return nil, fmt.Errorf("consumer direct requires an ephemeral consumer") + } + if ca != nil { + return nil, fmt.Errorf("consumer direct on a mapped consumer") + } + } + // Setup proper default for ack wait if we are in explicit ack mode. if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) { config.AckWait = JsAckWaitDefault @@ -585,7 +601,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri mset.setConsumer(o) mset.mu.Unlock() - if !s.JetStreamIsClustered() && s.standAloneMode() { + if config.Direct || (!s.JetStreamIsClustered() && s.standAloneMode()) { o.setLeader(true) } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b9ae1899..4ebf3489 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2842,23 +2842,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - // Determine if we should proceed here when we are in clustered mode. - if s.JetStreamIsClustered() { - js, cc := s.getJetStreamCluster() - if js == nil || cc == nil { - return - } - if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { - resp.Error = jsClusterNotAvailErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Make sure we are meta leader. - if !s.JetStreamIsLeader() { - return - } - } - var streamName string if expectDurable { streamName = tokenAt(subject, 6) @@ -2866,17 +2849,42 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s streamName = tokenAt(subject, 5) } - if !acc.JetStreamEnabled() { - resp.Error = jsNotEnabledErr - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } var req CreateConsumerRequest if err := json.Unmarshal(msg, &req); err != nil { resp.Error = jsInvalidJSONErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + + // Determine if we should proceed here when we are in clustered mode. + if s.JetStreamIsClustered() { + if req.Config.Direct { + // Check to see if we have this stream and are the stream leader. + if !acc.JetStreamIsStreamLeader(streamName) { + return + } + } else { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return + } + if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ { + resp.Error = jsClusterNotAvailErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + } + + if !acc.JetStreamEnabled() { + resp.Error = jsNotEnabledErr + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } if streamName != req.Stream { resp.Error = jsStreamMismatchErr s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -2914,7 +2922,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s } } - if s.JetStreamIsClustered() { + if s.JetStreamIsClustered() && !req.Config.Direct { s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config) return } diff --git a/server/stream.go b/server/stream.go index 3618b0f4..178bb41e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -992,7 +992,7 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo { return mset.sourceInfo(mset.mirror) } -const sourceHealthCheckInterval = 10 * time.Second +const sourceHealthCheckInterval = 2 * time.Second // Will run as a Go routine to process mirror consumer messages. func (mset *stream) processMirrorMsgs() { @@ -1196,6 +1196,7 @@ func (mset *stream) setupMirrorConsumer() error { } mset.mirror.sub = sub + mset.mirror.last = time.Now() if !mset.mirror.grr { mset.mirror.grr = true @@ -1216,8 +1217,9 @@ func (mset *stream) setupMirrorConsumer() error { AckPolicy: AckNone, AckWait: 48 * time.Hour, MaxDeliver: 1, - Heartbeat: 10 * time.Second, + Heartbeat: sourceHealthCheckInterval, FlowControl: true, + Direct: true, }, } @@ -1275,7 +1277,7 @@ func (mset *stream) setupMirrorConsumer() error { mset.mu.Unlock() } mset.setMirrorErr(ccr.Error) - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): mset.resetMirrorConsumer() } }() @@ -1332,6 +1334,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { mset.removeInternalConsumer(si) si.sseq, si.dseq = 0, 0 + si.last = time.Now() ssi := mset.streamSource(sname) // Determine subjects etc. @@ -1367,8 +1370,9 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { AckPolicy: AckNone, AckWait: 48 * time.Hour, MaxDeliver: 1, - Heartbeat: 10 * time.Second, + Heartbeat: sourceHealthCheckInterval, FlowControl: true, + Direct: true, }, } // If starting, check any configs.