From 5c99355fa552bbe7593fd9de4e6f47a3dde6651b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 20 Jan 2020 10:34:57 -0800 Subject: [PATCH] Separate out consumer API for Durable vs Ephemeral Signed-off-by: Derek Collison --- server/jetstream.go | 67 +++++++++++++++++++++++++++++++++++++++--- test/jetstream_test.go | 53 +++++++++++++++++++++++++++++++-- 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 4c567eb9..083240ba 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -126,10 +126,16 @@ const ( JetStreamDeleteMsg = "$JS.STREAM.*.MSG.DELETE" JetStreamDeleteMsgT = "$JS.STREAM.%s.MSG.DELETE" - // JetStreamCreateConsumer is the endpoint to create consumers for streams. + // JetStreamCreateConsumer is the endpoint to create durable consumers for streams. + // You need to include the stream and consumer name in the subject. // Will return +OK on success and -ERR on failure. - JetStreamCreateConsumer = "$JS.STREAM.*.CONSUMER.CREATE" - JetStreamCreateConsumerT = "$JS.STREAM.%s.CONSUMER.CREATE" + JetStreamCreateConsumer = "$JS.STREAM.*.CONSUMER.*.CREATE" + JetStreamCreateConsumerT = "$JS.STREAM.%s.CONSUMER.%s.CREATE" + + // JetStreamCreateEphemeralConsumer is the endpoint to create ephemeral consumers for streams. + // Will return +OK on success and -ERR on failure. + JetStreamCreateEphemeralConsumer = "$JS.STREAM.*.EPHEMERAL.CONSUMER.CREATE" + JetStreamCreateEphemeralConsumerT = "$JS.STREAM.%s.EPHEMERAL.CONSUMER.CREATE" // JetStreamConsumers is the endpoint to list all consumers for the stream. // Will return json list of string on success and -ERR on failure. @@ -214,6 +220,7 @@ var allJsExports = []string{ JetStreamPurgeStream, JetStreamDeleteMsg, JetStreamCreateConsumer, + JetStreamCreateEphemeralConsumer, JetStreamConsumers, JetStreamConsumerInfo, JetStreamDeleteConsumer, @@ -313,6 +320,9 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { if _, err := s.sysSubscribe(JetStreamCreateConsumer, s.jsCreateConsumerRequest); err != nil { return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) } + if _, err := s.sysSubscribe(JetStreamCreateEphemeralConsumer, s.jsCreateEphemeralConsumerRequest); err != nil { + return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) + } if _, err := s.sysSubscribe(JetStreamConsumers, s.jsConsumersRequest); err != nil { return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) } @@ -1270,7 +1280,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep s.sendInternalAccountMsg(c.acc, reply, OK) } -// Request to create an consumer. +// Request to create a durable consumer. func (s *Server) jsCreateConsumerRequest(sub *subscription, c *client, subject, reply string, msg []byte) { if c == nil || c.acc == nil { return @@ -1294,6 +1304,55 @@ func (s *Server) jsCreateConsumerRequest(sub *subscription, c *client, subject, s.sendInternalAccountMsg(c.acc, reply, protoErr(err)) return } + // Now check we do not have a durable. + if req.Config.Durable == _EMPTY_ { + s.sendInternalAccountMsg(c.acc, reply, protoErr("consumer expected to be durable but a durable name was not set")) + return + } + consumerName := subjectToken(subject, 4) + if consumerName != req.Config.Durable { + s.sendInternalAccountMsg(c.acc, reply, protoErr("consumer name in subject does not match durable name in request")) + return + } + var response = OK + if o, err := stream.AddConsumer(&req.Config); err != nil { + response = protoErr(err) + } else if !o.isDurable() { + // If the consumer is ephemeral add in the name + response = OK + " " + o.Name() + } + s.sendInternalAccountMsg(c.acc, reply, response) +} + +// Request to create an ephemeral consumer. +func (s *Server) jsCreateEphemeralConsumerRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c == nil || c.acc == nil { + return + } + if !c.acc.JetStreamEnabled() { + s.sendInternalAccountMsg(c.acc, reply, JetStreamNotEnabled) + return + } + var req CreateConsumerRequest + if err := json.Unmarshal(msg, &req); err != nil { + s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest) + return + } + streamName := subjectToken(subject, 2) + if streamName != req.Stream { + s.sendInternalAccountMsg(c.acc, reply, protoErr("stream name in subject does not match request")) + return + } + stream, err := c.acc.LookupStream(req.Stream) + if err != nil { + s.sendInternalAccountMsg(c.acc, reply, protoErr(err)) + return + } + // Now check we do not have a durable. + if req.Config.Durable != _EMPTY_ { + s.sendInternalAccountMsg(c.acc, reply, protoErr("consumer expected to be ephemeral but a durable name was set")) + return + } var response = OK if o, err := stream.AddConsumer(&req.Config); err != nil { response = protoErr(err) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 28dc5119..16d4e82b 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -3509,7 +3509,7 @@ func TestJetStreamRequestAPI(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name), req, time.Second) + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, msetCfg.Name), req, time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3521,7 +3521,7 @@ func TestJetStreamRequestAPI(t *testing.T) { sub, _ := nc.SubscribeSync(delivery) nc.Flush() - resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name), req, time.Second) + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, msetCfg.Name), req, time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3537,7 +3537,7 @@ func TestJetStreamRequestAPI(t *testing.T) { }) // Check that we get an error if the stream name in the subject does not match the config. - resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, "BOB"), req, time.Second) + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, "BOB"), req, time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3591,6 +3591,53 @@ func TestJetStreamRequestAPI(t *testing.T) { resp, _ = nc.Request(fmt.Sprintf(server.JetStreamDeleteConsumerT, msetCfg.Name, onames[0]), nil, time.Second) expectOKResponse(t, resp) + // Make sure we can't create a durable using the ephemeral API endpoint. + obsReq = server.CreateConsumerRequest{ + Stream: msetCfg.Name, + Config: server.ConsumerConfig{Durable: "myd", Delivery: delivery, DeliverAll: true}, + } + req, err = json.Marshal(obsReq) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, msetCfg.Name), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Since we are using the ephemeral endpoint we can not configure a durable name. + if !strings.HasPrefix(string(resp.Data), "-ERR 'consumer expected to be ephemeral but a durable name was set'") { + t.Fatalf("Got wrong error response: %q", resp.Data) + } + + // Now make sure we can create a durable on the subject with the proper name. + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name, obsReq.Config.Durable), req, time.Second) + expectOKResponse(t, resp) + + // Make sure empty in cfg does not work + obsReq2 := server.CreateConsumerRequest{ + Stream: msetCfg.Name, + Config: server.ConsumerConfig{Delivery: delivery, DeliverAll: true}, + } + req2, err := json.Marshal(obsReq2) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name, obsReq.Config.Durable), req2, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.HasPrefix(string(resp.Data), "-ERR 'consumer expected to be durable but a durable name was not set'") { + t.Fatalf("Got wrong error response: %q", resp.Data) + } + // Now make sure we can't fake the consumer name. + resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name, "WRONG_CONSUMER_NAME"), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.HasPrefix(string(resp.Data), "-ERR 'consumer name in subject does not match durable name in request'") { + t.Fatalf("Got wrong error response: %q", resp.Data) + } + // Now delete a msg. resp, _ = nc.Request(fmt.Sprintf(server.JetStreamDeleteMsgT, msetCfg.Name), []byte("2"), time.Second) expectOKResponse(t, resp)