Merge pull request #1989 from nats-io/sources

Make source and mirror consumers direct.
This commit is contained in:
Ivan Kozlovic
2021-03-10 20:39:54 -07:00
committed by GitHub
6 changed files with 73 additions and 72 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-RC.7.3"
VERSION = "2.2.0-RC.7.5"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -63,6 +63,9 @@ type ConsumerConfig struct {
MaxAckPending int `json:"max_ack_pending,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
// Don't add to general clients.
Direct bool `json:"direct,omitempty"`
}
type CreateConsumerRequest struct {
@@ -233,7 +236,7 @@ const (
JsDeleteWaitTimeDefault = 5 * time.Second
// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
// outstanding.
JsFlowControlMaxPending = 32 * 1024 * 1024
JsFlowControlMaxPending = 16 * 1024 * 1024
)
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
@@ -302,6 +305,19 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}
// Direct consumers need to be non-mapped ephemerals.
if config.Direct {
if config.DeliverSubject == _EMPTY_ {
return nil, fmt.Errorf("consumer direct requires a push based consumer")
}
if isDurableConsumer(config) {
return nil, fmt.Errorf("consumer direct requires an ephemeral consumer")
}
if ca != nil {
return nil, fmt.Errorf("consumer direct on a mapped consumer")
}
}
// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
@@ -585,7 +601,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
mset.setConsumer(o)
mset.mu.Unlock()
if !s.JetStreamIsClustered() && s.standAloneMode() {
if config.Direct || (!s.JetStreamIsClustered() && s.standAloneMode()) {
o.setLeader(true)
}
@@ -2075,14 +2091,8 @@ func (o *consumer) needFlowControl() bool {
return false
}
// Decide whether to send a flow control message which we will need the user to respond.
// We send if we are at the limit or over, and at 25%, 50% and 75%.
if o.pbytes >= o.maxpb {
return true
} else if o.pfcs == 0 && o.pbytes > o.maxpb/4 {
return true
} else if o.pfcs == 1 && o.pbytes > o.maxpb/2 {
return true
} else if o.pfcs == 2 && o.pbytes > o.maxpb*3/4 {
// We send when we are over 50% of the current window.
if o.pfcs == 0 && o.pbytes > o.maxpb/2 {
return true
}
return false

View File

@@ -2018,6 +2018,12 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
}
}
e.RUnlock()
// Since callers may just check if the sublist result is nil or not,
// make sure that if what is returned by sl.Match() is the emptyResult, then
// we return nil to the caller.
if r == emptyResult {
r = nil
}
}
return psi, r
}

View File

@@ -2842,23 +2842,6 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
}
}
var streamName string
if expectDurable {
streamName = tokenAt(subject, 6)
@@ -2866,17 +2849,42 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
streamName = tokenAt(subject, 5)
}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req CreateConsumerRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = jsInvalidJSONErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
if req.Config.Direct {
// Check to see if we have this stream and are the stream leader.
if !acc.JetStreamIsStreamLeader(streamName) {
return
}
} else {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
if cc.meta != nil && cc.meta.GroupLeader() == _EMPTY_ {
resp.Error = jsClusterNotAvailErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Make sure we are meta leader.
if !s.JetStreamIsLeader() {
return
}
}
}
if !acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if streamName != req.Stream {
resp.Error = jsStreamMismatchErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
@@ -2914,7 +2922,7 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, subject, reply s
}
}
if s.JetStreamIsClustered() {
if s.JetStreamIsClustered() && !req.Config.Direct {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
return
}

View File

@@ -2615,7 +2615,10 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.store.Update(state)
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
ourID := js.cluster.meta.ID()
var ourID string
if js.cluster != nil && js.cluster.meta != nil {
ourID = js.cluster.meta.ID()
}
js.mu.RUnlock()
if peer := string(e.Data); peer == ourID {
o.stopWithFlags(true, false, false)

View File

@@ -992,7 +992,7 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo {
return mset.sourceInfo(mset.mirror)
}
const sourceHealthCheckInterval = 10 * time.Second
const sourceHealthCheckInterval = 2 * time.Second
// Will run as a Go routine to process mirror consumer messages.
func (mset *stream) processMirrorMsgs() {
@@ -1031,7 +1031,7 @@ func (mset *stream) processMirrorMsgs() {
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
mset.mu.RUnlock()
if stalled {
mset.resetMirrorConsumer()
mset.retryMirrorConsumer()
}
}
}
@@ -1196,6 +1196,7 @@ func (mset *stream) setupMirrorConsumer() error {
}
mset.mirror.sub = sub
mset.mirror.last = time.Now()
if !mset.mirror.grr {
mset.mirror.grr = true
@@ -1216,8 +1217,9 @@ func (mset *stream) setupMirrorConsumer() error {
AckPolicy: AckNone,
AckWait: 48 * time.Hour,
MaxDeliver: 1,
Heartbeat: 10 * time.Second,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
Direct: true,
},
}
@@ -1264,8 +1266,6 @@ func (mset *stream) setupMirrorConsumer() error {
case ccr := <-respCh:
if ccr.Error != nil {
mset.cancelMirrorConsumer()
// We will retry every 10 seconds or so
time.AfterFunc(10*time.Second, mset.retryMirrorConsumer)
} else {
// Capture consumer name.
mset.mu.Lock()
@@ -1275,8 +1275,8 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
case <-time.After(5 * time.Second):
mset.resetMirrorConsumer()
case <-time.After(10 * time.Second):
return
}
}()
@@ -1332,6 +1332,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
mset.removeInternalConsumer(si)
si.sseq, si.dseq = 0, 0
si.last = time.Now()
ssi := mset.streamSource(sname)
// Determine subjects etc.
@@ -1367,8 +1368,9 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
AckPolicy: AckNone,
AckWait: 48 * time.Hour,
MaxDeliver: 1,
Heartbeat: 10 * time.Second,
Heartbeat: sourceHealthCheckInterval,
FlowControl: true,
Direct: true,
},
}
// If starting, check any configs.
@@ -1422,7 +1424,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
si.err = ccr.Error
// We will retry every 10 seconds or so
mset.cancelSourceConsumer(sname)
time.AfterFunc(10*time.Second, func() { mset.retrySourceConsumer(sname) })
} else {
// Capture consumer name.
si.cname = ccr.ConsumerInfo.Name
@@ -1430,12 +1431,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
}
mset.mu.Unlock()
case <-time.After(10 * time.Second):
// Make sure things have not changed.
mset.mu.Lock()
if si := mset.sources[sname]; si != nil && si.cname == _EMPTY_ {
mset.setSourceConsumer(sname, seq)
}
mset.mu.Unlock()
return
}
}()
}
@@ -1753,24 +1749,6 @@ func (mset *stream) removeInternalConsumer(si *sourceInfo) {
if si == nil || si.cname == _EMPTY_ {
return
}
var ext *ExternalStream
if si == mset.mirror {
ext = mset.cfg.Mirror.External
} else {
ssi := mset.streamSource(si.name)
if ssi == nil {
return
}
ext = ssi.External
}
subject := fmt.Sprintf(JSApiConsumerDeleteT, si.name, si.cname)
if ext != nil {
subject = strings.Replace(subject, JSApiPrefix, ext.ApiPrefix, 1)
subject = strings.ReplaceAll(subject, "..", ".")
}
mset.outq.send(&jsPubMsg{subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
si.cname = _EMPTY_
}
@@ -2561,10 +2539,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
}
}
if deleteFlag {
mset.stopSourceConsumers()
}
// Send stream delete advisory after the consumers.
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()