diff --git a/server/consumer.go b/server/consumer.go index 7fc1c9c9..fa593c43 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -116,6 +116,59 @@ type SequenceInfo struct { type CreateConsumerRequest struct { Stream string `json:"stream_name"` Config ConsumerConfig `json:"config"` + Action ConsumerAction `json:"action"` +} + +type ConsumerAction int + +const ( + ActionCreateOrUpdate ConsumerAction = iota + ActionUpdate + ActionCreate +) + +const ( + actionUpdateString = "update" + actionCreateString = "create" + actionCreateOrUpdateString = "" +) + +func (a ConsumerAction) String() string { + switch a { + case ActionCreateOrUpdate: + return actionCreateOrUpdateString + case ActionCreate: + return actionCreateString + case ActionUpdate: + return actionUpdateString + } + return actionCreateOrUpdateString +} +func (a ConsumerAction) MarshalJSON() ([]byte, error) { + switch a { + case ActionCreate: + return json.Marshal(actionCreateString) + case ActionUpdate: + return json.Marshal(actionUpdateString) + case ActionCreateOrUpdate: + return json.Marshal(actionCreateOrUpdateString) + default: + return nil, fmt.Errorf("can not marshal %v", a) + } +} + +func (a *ConsumerAction) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString("create"): + *a = ActionCreate + case jsonString("update"): + *a = ActionUpdate + case jsonString(""): + *a = ActionCreateOrUpdate + default: + return fmt.Errorf("unknown consumer action: %v", string(data)) + } + return nil } // ConsumerNakOptions is for optional NAK values, e.g. delay. @@ -620,11 +673,15 @@ func checkConsumerCfg( return nil } -func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { - return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false) +func (mset *stream) addConsumerWithAction(config *ConsumerConfig, action ConsumerAction) (*consumer, error) { + return mset.addConsumerWithAssignment(config, _EMPTY_, nil, false, action) } -func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool) (*consumer, error) { +func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { + return mset.addConsumerWithAction(config, ActionCreateOrUpdate) +} + +func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) { mset.mu.RLock() s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc retention := cfg.Retention @@ -692,6 +749,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if cName != _EMPTY_ { if eo, ok := mset.consumers[cName]; ok { mset.mu.Unlock() + if action == ActionCreate && !reflect.DeepEqual(*config, eo.config()) { + return nil, NewJSConsumerAlreadyExistsError() + } err := eo.updateConfig(config) if err == nil { return eo, nil @@ -699,6 +759,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerCreateError(err, Unless(err)) } } + if action == ActionUpdate { + mset.mu.Unlock() + return nil, NewJSConsumerDoesNotExistError() + } // Check for any limits, if the config for the consumer sets a limit we check against that // but if not we use the value from account limits, if account limits is more restrictive diff --git a/server/errors.json b/server/errors.json index a2d525f2..84c4b338 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1458,5 +1458,25 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerAlreadyExists", + "code": 400, + "error_code": 10148, + "description": "consumer already exists", + "comment": "action CREATE is used for a existing consumer with a different config", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerDoesNotExist", + "code": 400, + "error_code": 10149, + "description": "consumer does not exist", + "comment": "action UPDATE is used for a nonexisting consumer", + "help": "", + "url": "", + "deprecates": "" } -] \ No newline at end of file +] diff --git a/server/jetstream.go b/server/jetstream.go index d7e2b33f..2bdccdc6 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1397,7 +1397,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro // the consumer can reconnect. We will create it as a durable and switch it. cfg.ConsumerConfig.Durable = ofi.Name() } - obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true) + obs, err := e.mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate) if err != nil { s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) continue diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 1fcded12..e66e94ab 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3869,9 +3869,9 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun // during this call, so place in Go routine to not block client. // Router and Gateway API calls already in separate context. if c.kind != ROUTER && c.kind != GATEWAY { - go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config) + go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action) } else { - s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config) + s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action) } return } @@ -3890,7 +3890,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun return } - o, err := stream.addConsumer(&req.Config) + o, err := stream.addConsumerWithAction(&req.Config, req.Action) if err != nil { if IsNatsErr(err, JSConsumerStoreFailedErrF) { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 749c4c94..89a23257 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4058,7 +4058,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. - if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false, ActionCreateOrUpdate); err == nil { didCreate = true } } else { @@ -6793,7 +6793,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre } // jsClusteredConsumerRequest is first point of entry to create a consumer with R > 1. -func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig) { +func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return @@ -6894,6 +6894,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec oname = cfg.Durable } if ca = sa.consumers[oname]; ca != nil && !ca.deleted { + if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { + resp.Error = NewJSConsumerAlreadyExistsError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + } // Do quick sanity check on new cfg to prevent here if possible. if err := acc.checkNewConsumerConfig(ca.Config, cfg); err != nil { resp.Error = NewJSConsumerCreateError(err, Unless(err)) @@ -6905,6 +6909,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // If this is new consumer. if ca == nil { + if action == ActionUpdate { + resp.Error = NewJSConsumerDoesNotExistError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } rg := cc.createGroupForConsumer(cfg, sa) if rg == nil { resp.Error = NewJSInsufficientResourcesError() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 1ec12116..f99c0956 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4399,6 +4399,93 @@ func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) { // Make sure no other errors showed up require_True(t, len(errCh) == 0) } +func TestJetStreamClusterConsumerActions(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + var err error + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + ecSubj := fmt.Sprintf(JSApiConsumerCreateExT, "TEST", "CONSUMER", "test") + crReq := CreateConsumerRequest{ + Stream: "TEST", + Config: ConsumerConfig{ + DeliverPolicy: DeliverLast, + FilterSubject: "test", + AckPolicy: AckExplicit, + }, + } + + // A new consumer. Should not be an error. + crReq.Action = ActionCreate + req, err := json.Marshal(crReq) + require_NoError(t, err) + resp, err := nc.Request(ecSubj, req, 500*time.Millisecond) + require_NoError(t, err) + var ccResp JSApiConsumerCreateResponse + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error != nil { + t.Fatalf("Unexpected error: %v", ccResp.Error) + } + ccResp.Error = nil + + // Consumer exists, but config is the same, so should be ok + resp, err = nc.Request(ecSubj, req, 500*time.Millisecond) + require_NoError(t, err) + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error != nil { + t.Fatalf("Unexpected er response: %v", ccResp.Error) + } + ccResp.Error = nil + // Consumer exists. Config is different, so should error + crReq.Config.Description = "changed" + req, err = json.Marshal(crReq) + require_NoError(t, err) + resp, err = nc.Request(ecSubj, req, 500*time.Millisecond) + require_NoError(t, err) + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error == nil { + t.Fatalf("Unexpected ok response") + } + + ccResp.Error = nil + // Consumer update, so update should be ok + crReq.Action = ActionUpdate + crReq.Config.Description = "changed again" + req, err = json.Marshal(crReq) + require_NoError(t, err) + resp, err = nc.Request(ecSubj, req, 500*time.Millisecond) + require_NoError(t, err) + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error != nil { + t.Fatalf("Unexpected error response: %v", ccResp.Error) + } + + ecSubj = fmt.Sprintf(JSApiConsumerCreateExT, "TEST", "NEW", "test") + ccResp.Error = nil + // Updating new consumer, so should error + crReq.Config.Name = "NEW" + req, err = json.Marshal(crReq) + require_NoError(t, err) + resp, err = nc.Request(ecSubj, req, 500*time.Millisecond) + require_NoError(t, err) + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error == nil { + t.Fatalf("Unexpected ok response") + } +} func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 6c030f73..161f83d2 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -17,6 +17,7 @@ package server import ( + "encoding/json" "fmt" "math/rand" "sort" @@ -449,3 +450,171 @@ func TestJetStreamConsumerMultipleFiltersSequence(t *testing.T) { require_True(t, string(msg.Data) == fmt.Sprintf("%d", i)) } } + +func TestJetStreamConsumerActions(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Name: "TEST", + Retention: LimitsPolicy, + Subjects: []string{"one", "two", "three", "four", "five.>"}, + MaxAge: time.Second * 90, + }) + require_NoError(t, err) + + // Create Consumer. No consumers existed before, so should be fine. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "DUR", + FilterSubjects: []string{"one", "two"}, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverAll, + AckWait: time.Second * 30, + }, ActionCreate) + require_NoError(t, err) + // Create consumer again. Should be ok if action is CREATE but config is exactly the same. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "DUR", + FilterSubjects: []string{"one", "two"}, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverAll, + AckWait: time.Second * 30, + }, ActionCreate) + require_NoError(t, err) + // Create consumer again. Should error if action is CREATE. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "DUR", + FilterSubjects: []string{"one"}, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverAll, + AckWait: time.Second * 30, + }, ActionCreate) + require_Error(t, err) + + // Update existing consumer. Should be fine, as consumer exists. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "DUR", + FilterSubjects: []string{"one"}, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverAll, + AckWait: time.Second * 30, + }, ActionUpdate) + require_NoError(t, err) + + // Update consumer. Should error, as this consumer does not exist. + _, err = mset.addConsumerWithAction(&ConsumerConfig{ + Durable: "NEW", + FilterSubjects: []string{"one"}, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverAll, + AckWait: time.Second * 30, + }, ActionUpdate) + require_Error(t, err) +} + +func TestJetStreamConsumerActionsViaAPI(t *testing.T) { + + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + _, err := acc.addStream(&StreamConfig{ + Name: "TEST", + Retention: LimitsPolicy, + Subjects: []string{"one"}, + MaxAge: time.Second * 90, + }) + require_NoError(t, err) + + // Update non-existing consumer, which should fail. + request, err := json.Marshal(&CreateConsumerRequest{ + Action: ActionUpdate, + Config: ConsumerConfig{ + Durable: "hello", + }, + Stream: "TEST", + }) + require_NoError(t, err) + + resp, err := nc.Request("$JS.API.CONSUMER.DURABLE.CREATE.TEST.hello", []byte(request), time.Second*6) + require_NoError(t, err) + var ccResp JSApiConsumerCreateResponse + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + require_Error(t, ccResp.Error) + + // create non existing consumer - which should be fine. + ccResp.Error = nil + request, err = json.Marshal(&CreateConsumerRequest{ + Action: ActionCreate, + Config: ConsumerConfig{ + Durable: "hello", + }, + Stream: "TEST", + }) + require_NoError(t, err) + + resp, err = nc.Request("$JS.API.CONSUMER.DURABLE.CREATE.TEST.hello", []byte(request), time.Second*6) + require_NoError(t, err) + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error != nil { + t.Fatalf("expected nil, got %v", ccResp.Error) + } + + // re-create existing consumer - which should be an error. + ccResp.Error = nil + request, err = json.Marshal(&CreateConsumerRequest{ + Action: ActionCreate, + Config: ConsumerConfig{ + Durable: "hello", + FilterSubject: "one", + }, + Stream: "TEST", + }) + require_NoError(t, err) + resp, err = nc.Request("$JS.API.CONSUMER.DURABLE.CREATE.TEST.hello", []byte(request), time.Second*6) + require_NoError(t, err) + err = json.Unmarshal(resp.Data, &ccResp) + require_NoError(t, err) + if ccResp.Error == nil { + t.Fatalf("expected err, got nil") + } + +} + +func TestJetStreamConsumerActionsUnmarshal(t *testing.T) { + tests := []struct { + name string + given []byte + expected ConsumerAction + expectErr bool + }{ + {name: "action create", given: []byte(`{"action": "create"}`), expected: ActionCreate}, + {name: "action update", given: []byte(`{"action": "update"}`), expected: ActionUpdate}, + {name: "no action", given: []byte("{}"), expected: ActionCreateOrUpdate}, + {name: "unknown", given: []byte(`{"action": "unknown"}`), expected: ActionCreateOrUpdate, expectErr: true}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + var request CreateConsumerRequest + err := json.Unmarshal(test.given, &request) + fmt.Printf("given: %v, expecetd: %v\n", test.expectErr, err) + if !test.expectErr { + require_NoError(t, err) + } else { + require_Error(t, err) + } + require_True(t, test.expected == request.Action) + }) + } +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index de7f2fbe..2ca8a6d3 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -44,6 +44,9 @@ const ( // JSClusterUnSupportFeatureErr not currently supported in clustered mode JSClusterUnSupportFeatureErr ErrorIdentifier = 10036 + // JSConsumerAlreadyExists action CREATE is used for a existing consumer with a different config (consumer already exists) + JSConsumerAlreadyExists ErrorIdentifier = 10148 + // JSConsumerBadDurableNameErr durable name can not contain '.', '*', '>' JSConsumerBadDurableNameErr ErrorIdentifier = 10103 @@ -74,6 +77,9 @@ const ( // JSConsumerDirectRequiresPushErr consumer direct requires a push based consumer JSConsumerDirectRequiresPushErr ErrorIdentifier = 10090 + // JSConsumerDoesNotExist action UPDATE is used for a nonexisting consumer (consumer does not exist) + JSConsumerDoesNotExist ErrorIdentifier = 10149 + // JSConsumerDuplicateFilterSubjects consumer cannot have both FilterSubject and FilterSubjects specified JSConsumerDuplicateFilterSubjects ErrorIdentifier = 10136 @@ -459,6 +465,7 @@ var ( JSClusterServerNotMemberErr: {Code: 400, ErrCode: 10044, Description: "server is not a member of the cluster"}, JSClusterTagsErr: {Code: 400, ErrCode: 10011, Description: "tags placement not supported for operation"}, JSClusterUnSupportFeatureErr: {Code: 503, ErrCode: 10036, Description: "not currently supported in clustered mode"}, + JSConsumerAlreadyExists: {Code: 400, ErrCode: 10148, Description: "consumer already exists"}, JSConsumerBadDurableNameErr: {Code: 400, ErrCode: 10103, Description: "durable name can not contain '.', '*', '>'"}, JSConsumerConfigRequiredErr: {Code: 400, ErrCode: 10078, Description: "consumer config required"}, JSConsumerCreateDurableAndNameMismatch: {Code: 400, ErrCode: 10132, Description: "Consumer Durable and Name have to be equal if both are provided"}, @@ -469,6 +476,7 @@ var ( JSConsumerDescriptionTooLongErrF: {Code: 400, ErrCode: 10107, Description: "consumer description is too long, maximum allowed is {max}"}, JSConsumerDirectRequiresEphemeralErr: {Code: 400, ErrCode: 10091, Description: "consumer direct requires an ephemeral consumer"}, JSConsumerDirectRequiresPushErr: {Code: 400, ErrCode: 10090, Description: "consumer direct requires a push based consumer"}, + JSConsumerDoesNotExist: {Code: 400, ErrCode: 10149, Description: "consumer does not exist"}, JSConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10136, Description: "consumer cannot have both FilterSubject and FilterSubjects specified"}, JSConsumerDurableNameNotInSubjectErr: {Code: 400, ErrCode: 10016, Description: "consumer expected to be durable but no durable name set in subject"}, JSConsumerDurableNameNotMatchSubjectErr: {Code: 400, ErrCode: 10017, Description: "consumer name in subject does not match durable name in request"}, @@ -753,6 +761,16 @@ func NewJSClusterUnSupportFeatureError(opts ...ErrorOption) *ApiError { return ApiErrors[JSClusterUnSupportFeatureErr] } +// NewJSConsumerAlreadyExistsError creates a new JSConsumerAlreadyExists error: "consumer already exists" +func NewJSConsumerAlreadyExistsError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerAlreadyExists] +} + // NewJSConsumerBadDurableNameError creates a new JSConsumerBadDurableNameErr error: "durable name can not contain '.', '*', '>'" func NewJSConsumerBadDurableNameError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -865,6 +883,16 @@ func NewJSConsumerDirectRequiresPushError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerDirectRequiresPushErr] } +// NewJSConsumerDoesNotExistError creates a new JSConsumerDoesNotExist error: "consumer does not exist" +func NewJSConsumerDoesNotExistError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerDoesNotExist] +} + // NewJSConsumerDuplicateFilterSubjectsError creates a new JSConsumerDuplicateFilterSubjects error: "consumer cannot have both FilterSubject and FilterSubjects specified" func NewJSConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts)