mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -382,28 +382,36 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to formulate similar errors.
|
||||
badStart := func(dp, start string) error {
|
||||
return fmt.Errorf("consumer delivery policy is deliver %s, but optional start %s is also set", dp, start)
|
||||
}
|
||||
notSet := func(dp, notSet string) error {
|
||||
return fmt.Errorf("consumer delivery policy is deliver %s, but optional %s is not set", dp, notSet)
|
||||
}
|
||||
|
||||
// Check on start position conflicts.
|
||||
switch config.DeliverPolicy {
|
||||
case DeliverAll:
|
||||
if config.OptStartSeq > 0 {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver all, but optional start sequence is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("all", "sequence"))
|
||||
}
|
||||
if config.OptStartTime != nil {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver all, but optional start time is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("all", "time"))
|
||||
}
|
||||
case DeliverLast:
|
||||
if config.OptStartSeq > 0 {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last, but optional start sequence is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("last", "sequence"))
|
||||
}
|
||||
if config.OptStartTime != nil {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last, but optional start time is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("last", "time"))
|
||||
}
|
||||
case DeliverLastPerSubject:
|
||||
if config.OptStartSeq > 0 {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but optional start sequence is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("last per subject", "sequence"))
|
||||
}
|
||||
if config.OptStartTime != nil {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but optional start time is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("last per subject", "time"))
|
||||
}
|
||||
badConfig := config.FilterSubject == _EMPTY_
|
||||
if !badConfig {
|
||||
@@ -413,28 +421,28 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
}
|
||||
}
|
||||
if badConfig {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but filter subject is not set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(notSet("deliver last per subject", "filter subject"))
|
||||
}
|
||||
case DeliverNew:
|
||||
if config.OptStartSeq > 0 {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver new, but optional start sequence is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("new", "sequence"))
|
||||
}
|
||||
if config.OptStartTime != nil {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver new, but optional start time is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("new", "time"))
|
||||
}
|
||||
case DeliverByStartSequence:
|
||||
if config.OptStartSeq == 0 {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start sequence, but optional start sequence is not set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(notSet("deliver by start sequence", "start sequence"))
|
||||
}
|
||||
if config.OptStartTime != nil {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start sequence, but optional start time is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("deliver by start sequence", "time"))
|
||||
}
|
||||
case DeliverByStartTime:
|
||||
if config.OptStartTime == nil {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start time, but optional start time is not set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(notSet("deliver by start time", "start time"))
|
||||
}
|
||||
if config.OptStartSeq != 0 {
|
||||
return nil, NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver by start time, but optional start sequence is also set"))
|
||||
return nil, NewJSConsumerInvalidPolicyError(badStart("deliver by start time", "start sequence"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -463,23 +471,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
return nil, errors.New("invalid stream")
|
||||
}
|
||||
|
||||
// If this one is durable and already exists, we let that be ok as long as the configs match.
|
||||
// If this one is durable and already exists, we let that be ok as long as only updating what should be allowed.
|
||||
if isDurableConsumer(config) {
|
||||
if eo, ok := mset.consumers[config.Durable]; ok {
|
||||
mset.mu.Unlock()
|
||||
ocfg := eo.config()
|
||||
if reflect.DeepEqual(&ocfg, config) {
|
||||
err := eo.updateConfig(config)
|
||||
if err == nil {
|
||||
return eo, nil
|
||||
} else {
|
||||
// If we are a push mode and not active and the only difference
|
||||
// is deliver subject then update and return.
|
||||
if configsEqualSansDelivery(ocfg, *config) && eo.hasNoLocalInterest() {
|
||||
eo.updateDeliverSubject(config.DeliverSubject)
|
||||
return eo, nil
|
||||
} else {
|
||||
return nil, NewJSConsumerNameExistError()
|
||||
}
|
||||
}
|
||||
return nil, NewJSConsumerCreateError(err, Unless(err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -566,24 +566,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we have a rate limit set.
|
||||
if config.RateLimit != 0 {
|
||||
// TODO(dlc) - Make sane values or error if not sane?
|
||||
// We are configured in bits per sec so adjust to bytes.
|
||||
rl := rate.Limit(config.RateLimit / 8)
|
||||
// Burst should be set to maximum msg size for this account, etc.
|
||||
var burst int
|
||||
if mset.cfg.MaxMsgSize > 0 {
|
||||
burst = int(mset.cfg.MaxMsgSize)
|
||||
} else if mset.jsa.account.limits.mpay > 0 {
|
||||
burst = int(mset.jsa.account.limits.mpay)
|
||||
} else {
|
||||
s := mset.jsa.account.srv
|
||||
burst = int(s.getOpts().MaxPayload)
|
||||
}
|
||||
o.rlimit = rate.NewLimiter(rl, burst)
|
||||
}
|
||||
|
||||
// Check if we have filtered subject that is a wildcard.
|
||||
if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) {
|
||||
o.filterWC = true
|
||||
@@ -667,6 +649,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
o.setConsumerAssignment(ca)
|
||||
}
|
||||
|
||||
// Check if we have a rate limit set.
|
||||
if config.RateLimit != 0 {
|
||||
o.setRateLimit(config.RateLimit)
|
||||
}
|
||||
|
||||
mset.setConsumer(o)
|
||||
mset.mu.Unlock()
|
||||
|
||||
@@ -1138,13 +1125,146 @@ func (o *consumer) forceExpirePending() {
|
||||
o.signalNewMessages()
|
||||
}
|
||||
|
||||
// Acquire proper locks and update rate limit.
|
||||
// Will use what is in config.
|
||||
func (o *consumer) setRateLimitNeedsLocks() {
|
||||
o.mu.RLock()
|
||||
mset := o.mset
|
||||
o.mu.RUnlock()
|
||||
|
||||
if mset == nil {
|
||||
return
|
||||
}
|
||||
|
||||
mset.mu.RLock()
|
||||
o.mu.Lock()
|
||||
o.setRateLimit(o.cfg.RateLimit)
|
||||
o.mu.Unlock()
|
||||
mset.mu.RUnlock()
|
||||
}
|
||||
|
||||
// Set the rate limiter
|
||||
// Both mset and consumer lock should be held.
|
||||
func (o *consumer) setRateLimit(bps uint64) {
|
||||
if bps == 0 {
|
||||
o.rlimit = nil
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(dlc) - Make sane values or error if not sane?
|
||||
// We are configured in bits per sec so adjust to bytes.
|
||||
rl := rate.Limit(bps / 8)
|
||||
mset := o.mset
|
||||
|
||||
// Burst should be set to maximum msg size for this account, etc.
|
||||
var burst int
|
||||
if mset.cfg.MaxMsgSize > 0 {
|
||||
burst = int(mset.cfg.MaxMsgSize)
|
||||
} else if mset.jsa.account.limits.mpay > 0 {
|
||||
burst = int(mset.jsa.account.limits.mpay)
|
||||
} else {
|
||||
s := mset.jsa.account.srv
|
||||
burst = int(s.getOpts().MaxPayload)
|
||||
}
|
||||
|
||||
o.rlimit = rate.NewLimiter(rl, burst)
|
||||
}
|
||||
|
||||
// Check if new consumer config allowed vs old.
|
||||
func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
|
||||
if reflect.DeepEqual(cfg, ncfg) {
|
||||
return nil
|
||||
}
|
||||
// Something different, so check since we only allow certain things to be updated.
|
||||
if cfg.FilterSubject != ncfg.FilterSubject {
|
||||
return errors.New("filter subject can not be updated")
|
||||
}
|
||||
if cfg.DeliverPolicy != ncfg.DeliverPolicy {
|
||||
return errors.New("deliver policy can not be updated")
|
||||
}
|
||||
if cfg.OptStartSeq != ncfg.OptStartSeq {
|
||||
return errors.New("start sequence can not be updated")
|
||||
}
|
||||
if cfg.OptStartTime != ncfg.OptStartTime {
|
||||
return errors.New("start time can not be updated")
|
||||
}
|
||||
if cfg.AckPolicy != ncfg.AckPolicy {
|
||||
return errors.New("ack policy can not be updated")
|
||||
}
|
||||
if cfg.ReplayPolicy != ncfg.ReplayPolicy {
|
||||
return errors.New("replay policy can not be updated")
|
||||
}
|
||||
if cfg.Heartbeat != ncfg.Heartbeat {
|
||||
return errors.New("heart beats can not be updated")
|
||||
}
|
||||
if cfg.FlowControl != ncfg.FlowControl {
|
||||
return errors.New("flow control can not be updated")
|
||||
}
|
||||
|
||||
// Deliver Subject is conditional on if its bound.
|
||||
if cfg.DeliverSubject != ncfg.DeliverSubject {
|
||||
if cfg.DeliverSubject == _EMPTY_ {
|
||||
return errors.New("can not update pull consumer to push based")
|
||||
}
|
||||
rr := acc.sl.Match(cfg.DeliverSubject)
|
||||
if len(rr.psubs)+len(rr.qsubs) != 0 {
|
||||
return NewJSConsumerNameExistError()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update the config based on the new config, or error if update not allowed.
|
||||
func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
if err := o.acc.checkNewConsumerConfig(&o.cfg, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// DeliverSubject
|
||||
if cfg.DeliverSubject != o.cfg.DeliverSubject {
|
||||
o.updateDeliverSubjectLocked(cfg.DeliverSubject)
|
||||
}
|
||||
|
||||
// MaxAckPending
|
||||
if cfg.MaxAckPending != o.cfg.MaxAckPending {
|
||||
o.maxp = cfg.MaxAckPending
|
||||
o.signalNewMessages()
|
||||
}
|
||||
// AckWait
|
||||
if cfg.AckWait != o.cfg.AckWait {
|
||||
if o.ptmr != nil {
|
||||
o.ptmr.Reset(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
// Rate Limit
|
||||
if cfg.RateLimit != o.cfg.RateLimit {
|
||||
// We need both locks here so do in Go routine.
|
||||
go o.setRateLimitNeedsLocks()
|
||||
}
|
||||
|
||||
// Record new config for others that do not need special handling.
|
||||
// Allowed but considered no-op, [Description, MaxDeliver, SampleFrequency, MaxWaiting, HeadersOnly]
|
||||
o.cfg = *cfg
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a config change for the delivery subject for a
|
||||
// push based consumer.
|
||||
func (o *consumer) updateDeliverSubject(newDeliver string) {
|
||||
// Update the config and the dsubj
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
o.updateDeliverSubjectLocked(newDeliver)
|
||||
}
|
||||
|
||||
// This is a config change for the delivery subject for a
|
||||
// push based consumer.
|
||||
func (o *consumer) updateDeliverSubjectLocked(newDeliver string) {
|
||||
if o.closed || o.isPullMode() || o.cfg.DeliverSubject == newDeliver {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -30,73 +30,6 @@ import (
|
||||
"github.com/nats-io/nkeys"
|
||||
)
|
||||
|
||||
func require_True(t *testing.T, b bool) {
|
||||
t.Helper()
|
||||
if !b {
|
||||
t.Fatalf("require true, but got false")
|
||||
}
|
||||
}
|
||||
|
||||
func require_False(t *testing.T, b bool) {
|
||||
t.Helper()
|
||||
if b {
|
||||
t.Fatalf("require no false, but got true")
|
||||
}
|
||||
}
|
||||
|
||||
func require_NoError(t testing.TB, err error) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatalf("require no error, but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func require_Contains(t *testing.T, s string, subStrs ...string) {
|
||||
t.Helper()
|
||||
for _, subStr := range subStrs {
|
||||
if !strings.Contains(s, subStr) {
|
||||
t.Fatalf("require %q to be contained in %q", subStr, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func require_Error(t *testing.T, err error, expected ...error) {
|
||||
t.Helper()
|
||||
if err == nil {
|
||||
t.Fatalf("require error, but got none")
|
||||
}
|
||||
if len(expected) == 0 {
|
||||
return
|
||||
}
|
||||
for _, e := range expected {
|
||||
if err == e || strings.Contains(e.Error(), err.Error()) {
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("Expected one of %+v, got '%v'", expected, err)
|
||||
}
|
||||
|
||||
func require_Equal(t *testing.T, a, b string) {
|
||||
t.Helper()
|
||||
if strings.Compare(a, b) != 0 {
|
||||
t.Fatalf("require equal, but got: %v != %v", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func require_NotEqual(t *testing.T, a, b [32]byte) {
|
||||
t.Helper()
|
||||
if bytes.Equal(a[:], b[:]) {
|
||||
t.Fatalf("require not equal, but got: %v != %v", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func require_Len(t *testing.T, a, b int) {
|
||||
t.Helper()
|
||||
if a != b {
|
||||
t.Fatalf("require len, but got: %v != %v", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShardedDirStoreWriteAndReadonly(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir := createDir(t, "jwtstore_test")
|
||||
|
||||
@@ -1696,7 +1696,7 @@ func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) {
|
||||
js.mu.Unlock()
|
||||
|
||||
for _, ca := range consumers {
|
||||
js.processClusterCreateConsumer(ca, nil)
|
||||
js.processClusterCreateConsumer(ca, nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2544,7 +2544,6 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
|
||||
ourID = cc.meta.ID()
|
||||
}
|
||||
var isMember bool
|
||||
|
||||
if ca.Group != nil && ourID != _EMPTY_ {
|
||||
isMember = ca.Group.isMember(ourID)
|
||||
}
|
||||
@@ -2579,15 +2578,15 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
|
||||
return
|
||||
}
|
||||
|
||||
// Track if this existed already.
|
||||
var wasExisting bool
|
||||
|
||||
// Check if we have an existing consumer assignment.
|
||||
js.mu.Lock()
|
||||
if sa.consumers == nil {
|
||||
sa.consumers = make(map[string]*consumerAssignment)
|
||||
} else if oca := sa.consumers[ca.Name]; oca != nil && !oca.pending {
|
||||
// Copy over private existing state from former CA.
|
||||
ca.Group.node = oca.Group.node
|
||||
ca.responded = oca.responded
|
||||
ca.err = oca.err
|
||||
} else if oca := sa.consumers[ca.Name]; oca != nil {
|
||||
wasExisting = true
|
||||
}
|
||||
|
||||
// Capture the optional state. We will pass it along if we are a member to apply.
|
||||
@@ -2602,7 +2601,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
|
||||
|
||||
// Check if this is for us..
|
||||
if isMember {
|
||||
js.processClusterCreateConsumer(ca, state)
|
||||
js.processClusterCreateConsumer(ca, state, wasExisting)
|
||||
} else {
|
||||
// Check if we have a raft node running, meaning we are no longer part of the group but were.
|
||||
js.mu.Lock()
|
||||
@@ -2649,7 +2648,7 @@ type consumerAssignmentResult struct {
|
||||
}
|
||||
|
||||
// processClusterCreateConsumer is when we are a member of the group and need to create the consumer.
|
||||
func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState) {
|
||||
func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState, wasExisting bool) {
|
||||
if ca == nil {
|
||||
return
|
||||
}
|
||||
@@ -2683,38 +2682,47 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
|
||||
return
|
||||
}
|
||||
|
||||
// Process the raft group and make sure its running if needed.
|
||||
js.createRaftGroup(rg, mset.config().Storage)
|
||||
|
||||
// Check if we already have this consumer running.
|
||||
o := mset.lookupConsumer(ca.Name)
|
||||
if o != nil {
|
||||
if o.isDurable() && o.isPushMode() {
|
||||
ocfg := o.config()
|
||||
if ocfg == *ca.Config || (configsEqualSansDelivery(ocfg, *ca.Config) && o.hasNoLocalInterest()) {
|
||||
o.updateDeliverSubject(ca.Config.DeliverSubject)
|
||||
} else {
|
||||
// This is essentially and update that has failed.
|
||||
js.mu.Lock()
|
||||
result := &consumerAssignmentResult{
|
||||
Account: ca.Client.serviceAccount(),
|
||||
Stream: ca.Stream,
|
||||
Consumer: ca.Name,
|
||||
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
|
||||
}
|
||||
result.Response.Error = NewJSConsumerNameExistError()
|
||||
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
o.setConsumerAssignment(ca)
|
||||
s.Debugf("JetStream cluster, consumer was already running")
|
||||
if !alreadyRunning {
|
||||
// Process the raft group and make sure its running if needed.
|
||||
js.createRaftGroup(rg, mset.config().Storage)
|
||||
}
|
||||
|
||||
// Add in the consumer if needed.
|
||||
// Check if we already have this consumer running.
|
||||
var didCreate bool
|
||||
o := mset.lookupConsumer(ca.Name)
|
||||
if o == nil {
|
||||
// Add in the consumer if needed.
|
||||
o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca)
|
||||
didCreate = true
|
||||
} else {
|
||||
if err := o.updateConfig(ca.Config); err != nil {
|
||||
// This is essentially an update that has failed.
|
||||
js.mu.Lock()
|
||||
result := &consumerAssignmentResult{
|
||||
Account: ca.Client.serviceAccount(),
|
||||
Stream: ca.Stream,
|
||||
Consumer: ca.Name,
|
||||
Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}},
|
||||
}
|
||||
result.Response.Error = NewJSConsumerNameExistError()
|
||||
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result)
|
||||
js.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Check if we already had a consumer assignment and its still pending.
|
||||
cca, oca := ca, o.consumerAssignment()
|
||||
js.mu.Lock()
|
||||
if oca != nil && !oca.responded {
|
||||
// We can't over ride info for replying here otherwise leader once elected can not respond.
|
||||
// So just update Config, leave off client and reply to the originals.
|
||||
cac := *oca
|
||||
cac.Config = ca.Config
|
||||
cca = &cac
|
||||
}
|
||||
js.mu.Unlock()
|
||||
// Set CA for our consumer.
|
||||
o.setConsumerAssignment(cca)
|
||||
s.Debugf("JetStream cluster, consumer was already running")
|
||||
}
|
||||
|
||||
// If we have an initial state set apply that now.
|
||||
@@ -2763,15 +2771,27 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
|
||||
s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b)
|
||||
}
|
||||
} else {
|
||||
o.setCreatedTime(ca.Created)
|
||||
if didCreate {
|
||||
o.setCreatedTime(ca.Created)
|
||||
}
|
||||
// Start our monitoring routine.
|
||||
if rg.node != nil {
|
||||
if rg.node == nil {
|
||||
// Single replica consumer, process manually here.
|
||||
js.processConsumerLeaderChange(o, true)
|
||||
} else {
|
||||
if !alreadyRunning {
|
||||
s.startGoRoutine(func() { js.monitorConsumer(o, ca) })
|
||||
}
|
||||
} else {
|
||||
// Single replica consumer, process manually here.
|
||||
js.processConsumerLeaderChange(o, true)
|
||||
// Process if existing.
|
||||
if wasExisting && (o.isLeader() || (!didCreate && rg.node.GroupLeader() == _EMPTY_)) {
|
||||
// This is essentially an update, so make sure to respond if needed.
|
||||
js.mu.RLock()
|
||||
client, subject, reply := ca.Client, ca.Subject, ca.Reply
|
||||
js.mu.RUnlock()
|
||||
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
|
||||
resp.ConsumerInfo = o.info()
|
||||
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4266,52 +4286,71 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
|
||||
cfg.MaxAckPending = JsDefaultMaxAckPending
|
||||
}
|
||||
|
||||
rg := cc.createGroupForConsumer(sa)
|
||||
if rg == nil {
|
||||
resp.Error = NewJSInsufficientResourcesError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
// Pick a preferred leader.
|
||||
rg.setPreferred()
|
||||
|
||||
// We need to set the ephemeral here before replicating.
|
||||
var ca *consumerAssignment
|
||||
var oname string
|
||||
if !isDurableConsumer(cfg) {
|
||||
// We chose to have ephemerals be R=1 unless stream is interest or workqueue.
|
||||
if sa.Config.Retention == LimitsPolicy {
|
||||
rg.Peers = []string{rg.Preferred}
|
||||
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
|
||||
}
|
||||
// Make sure name is unique.
|
||||
for {
|
||||
oname = createConsumerName()
|
||||
if sa.consumers != nil {
|
||||
if sa.consumers[oname] != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
} else {
|
||||
|
||||
// See if we have an existing one already under same durable name.
|
||||
if isDurableConsumer(cfg) {
|
||||
oname = cfg.Durable
|
||||
if ca := sa.consumers[oname]; ca != nil && !ca.deleted {
|
||||
isPull := ca.Config.DeliverSubject == _EMPTY_
|
||||
// This can be ok if delivery subject update.
|
||||
shouldErr := isPull || ca.pending || (!reflect.DeepEqual(cfg, ca.Config) && !configsEqualSansDelivery(*cfg, *ca.Config))
|
||||
if !shouldErr {
|
||||
rr := acc.sl.Match(ca.Config.DeliverSubject)
|
||||
shouldErr = len(rr.psubs)+len(rr.qsubs) != 0
|
||||
}
|
||||
if shouldErr {
|
||||
resp.Error = NewJSConsumerNameExistError()
|
||||
if ca = sa.consumers[oname]; ca != nil && !ca.deleted {
|
||||
// Do quick sanity check on new cfg to prevent here if possible.
|
||||
if err := acc.checkNewConsumerConfig(ca.Config, cfg); err != nil {
|
||||
resp.Error = NewJSConsumerCreateError(err, Unless(err))
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
|
||||
// If this is new consumer.
|
||||
if ca == nil {
|
||||
rg := cc.createGroupForConsumer(sa)
|
||||
if rg == nil {
|
||||
resp.Error = NewJSInsufficientResourcesError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
// Pick a preferred leader.
|
||||
rg.setPreferred()
|
||||
|
||||
// We need to set the ephemeral here before replicating.
|
||||
if !isDurableConsumer(cfg) {
|
||||
// We chose to have ephemerals be R=1 unless stream is interest or workqueue.
|
||||
if sa.Config.Retention == LimitsPolicy {
|
||||
rg.Peers = []string{rg.Preferred}
|
||||
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
|
||||
}
|
||||
// Make sure name is unique.
|
||||
for {
|
||||
oname = createConsumerName()
|
||||
if sa.consumers != nil {
|
||||
if sa.consumers[oname] != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
ca = &consumerAssignment{
|
||||
Group: rg,
|
||||
Stream: stream,
|
||||
Name: oname,
|
||||
Config: cfg,
|
||||
Subject: subject,
|
||||
Reply: reply,
|
||||
Client: ci,
|
||||
Created: time.Now().UTC(),
|
||||
}
|
||||
} else {
|
||||
// Update config and client info on copy of existing.
|
||||
nca := *ca
|
||||
nca.Config = cfg
|
||||
nca.Client = ci
|
||||
nca.Subject = subject
|
||||
nca.Reply = reply
|
||||
ca = &nca
|
||||
}
|
||||
|
||||
eca := encodeAddConsumerAssignment(ca)
|
||||
|
||||
// Mark this as pending.
|
||||
|
||||
@@ -1426,9 +1426,9 @@ func TestJetStreamClusterDoubleAdd(t *testing.T) {
|
||||
if _, err := js.AddConsumer("TEST", cfg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
// Check double add fails.
|
||||
if _, err := js.AddConsumer("TEST", cfg); err == nil || err == nats.ErrTimeout {
|
||||
t.Fatalf("Expected error but got none or timeout")
|
||||
// Check double add ok.
|
||||
if _, err := js.AddConsumer("TEST", cfg); err != nil {
|
||||
t.Fatalf("Expected no error but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8437,7 +8437,7 @@ func TestJetStreamRaceOnRAFTCreate(t *testing.T) {
|
||||
t.Fatalf("Error creating stream: %v", err)
|
||||
}
|
||||
|
||||
js, err = nc.JetStream(nats.MaxWait(time.Second))
|
||||
js, err = nc.JetStream(nats.MaxWait(2 * time.Second))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -8446,12 +8446,12 @@ func TestJetStreamRaceOnRAFTCreate(t *testing.T) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(size)
|
||||
for i := 0; i < size; i++ {
|
||||
go func() {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
if _, err := js.PullSubscribe("foo", "shared"); err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
t.Errorf("Unexpected error on %v: %v", i, err)
|
||||
}
|
||||
}()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
@@ -9440,6 +9440,148 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerUpdates(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer removeDir(t, config.StoreDir)
|
||||
}
|
||||
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 5)
|
||||
defer c.shutdown()
|
||||
|
||||
testConsumerUpdate := func(t *testing.T, s *Server, replicas int) {
|
||||
nc, js := jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
// Create a stream.
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo", "bar"},
|
||||
Replicas: replicas,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
js.PublishAsync("foo", []byte("OK"))
|
||||
}
|
||||
|
||||
cfg := &nats.ConsumerConfig{
|
||||
Durable: "dlc",
|
||||
Description: "Update TEST",
|
||||
FilterSubject: "foo",
|
||||
DeliverSubject: "d.foo",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
AckWait: time.Minute,
|
||||
MaxDeliver: 5,
|
||||
MaxAckPending: 50,
|
||||
}
|
||||
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
// Update delivery subject, which worked before, but upon review had issues unless replica count == clustered size.
|
||||
cfg.DeliverSubject = "d.bar"
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
// Bind deliver subject.
|
||||
sub, err := nc.SubscribeSync("d.bar")
|
||||
require_NoError(t, err)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
ncfg := *cfg
|
||||
ncfg.DeliverSubject = "d.baz"
|
||||
|
||||
// Should fail.
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
// Description
|
||||
cfg.Description = "New Description"
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
// MaxAckPending
|
||||
checkSubsPending(t, sub, 50)
|
||||
cfg.MaxAckPending = 75
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
checkSubsPending(t, sub, 75)
|
||||
|
||||
// Drain sub, do not ack first ten though so we can test shortening AckWait.
|
||||
for i := 0; i < 100; i++ {
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
if i >= 10 {
|
||||
m.Ack()
|
||||
}
|
||||
}
|
||||
|
||||
// AckWait
|
||||
checkSubsPending(t, sub, 0)
|
||||
cfg.AckWait = 200 * time.Millisecond
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
checkSubsPending(t, sub, 10)
|
||||
|
||||
// Rate Limit
|
||||
cfg.RateLimit = 8 * 1024
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
cfg.RateLimit = 0
|
||||
_, err = js.AddConsumer("TEST", cfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
// These all should fail.
|
||||
ncfg = *cfg
|
||||
ncfg.FilterSubject = "bar"
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
ncfg.DeliverPolicy = nats.DeliverLastPolicy
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
ncfg.OptStartSeq = 22
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
now := time.Now()
|
||||
ncfg.OptStartTime = &now
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
ncfg.AckPolicy = nats.AckAllPolicy
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
ncfg.ReplayPolicy = nats.ReplayOriginalPolicy
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
ncfg.Heartbeat = time.Second
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
ncfg = *cfg
|
||||
ncfg.FlowControl = true
|
||||
_, err = js.AddConsumer("TEST", &ncfg)
|
||||
require_Error(t, err)
|
||||
|
||||
}
|
||||
|
||||
t.Run("Single", func(t *testing.T) { testConsumerUpdate(t, s, 1) })
|
||||
t.Run("Clustered", func(t *testing.T) { testConsumerUpdate(t, c.randomServer(), 2) })
|
||||
}
|
||||
|
||||
// Support functions
|
||||
|
||||
// Used to setup superclusters for tests.
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
@@ -59,6 +60,73 @@ type cluster struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func require_True(t *testing.T, b bool) {
|
||||
t.Helper()
|
||||
if !b {
|
||||
t.Fatalf("require true, but got false")
|
||||
}
|
||||
}
|
||||
|
||||
func require_False(t *testing.T, b bool) {
|
||||
t.Helper()
|
||||
if b {
|
||||
t.Fatalf("require no false, but got true")
|
||||
}
|
||||
}
|
||||
|
||||
func require_NoError(t testing.TB, err error) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatalf("require no error, but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func require_Contains(t *testing.T, s string, subStrs ...string) {
|
||||
t.Helper()
|
||||
for _, subStr := range subStrs {
|
||||
if !strings.Contains(s, subStr) {
|
||||
t.Fatalf("require %q to be contained in %q", subStr, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func require_Error(t *testing.T, err error, expected ...error) {
|
||||
t.Helper()
|
||||
if err == nil {
|
||||
t.Fatalf("require error, but got none")
|
||||
}
|
||||
if len(expected) == 0 {
|
||||
return
|
||||
}
|
||||
for _, e := range expected {
|
||||
if err == e || strings.Contains(e.Error(), err.Error()) {
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("Expected one of %+v, got '%v'", expected, err)
|
||||
}
|
||||
|
||||
func require_Equal(t *testing.T, a, b string) {
|
||||
t.Helper()
|
||||
if strings.Compare(a, b) != 0 {
|
||||
t.Fatalf("require equal, but got: %v != %v", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func require_NotEqual(t *testing.T, a, b [32]byte) {
|
||||
t.Helper()
|
||||
if bytes.Equal(a[:], b[:]) {
|
||||
t.Fatalf("require not equal, but got: %v != %v", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func require_Len(t *testing.T, a, b int) {
|
||||
t.Helper()
|
||||
if a != b {
|
||||
t.Fatalf("require len, but got: %v != %v", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func checkNatsError(t *testing.T, e *ApiError, id ErrorIdentifier) {
|
||||
t.Helper()
|
||||
ae, ok := ApiErrors[id]
|
||||
|
||||
Reference in New Issue
Block a user