Separate out consumer API for Durable vs Ephemeral

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-01-20 10:34:57 -08:00
parent cd052a42d6
commit 5c99355fa5
2 changed files with 113 additions and 7 deletions

View File

@@ -126,10 +126,16 @@ const (
JetStreamDeleteMsg = "$JS.STREAM.*.MSG.DELETE"
JetStreamDeleteMsgT = "$JS.STREAM.%s.MSG.DELETE"
// JetStreamCreateConsumer is the endpoint to create consumers for streams.
// JetStreamCreateConsumer is the endpoint to create durable consumers for streams.
// You need to include the stream and consumer name in the subject.
// Will return +OK on success and -ERR on failure.
JetStreamCreateConsumer = "$JS.STREAM.*.CONSUMER.CREATE"
JetStreamCreateConsumerT = "$JS.STREAM.%s.CONSUMER.CREATE"
JetStreamCreateConsumer = "$JS.STREAM.*.CONSUMER.*.CREATE"
JetStreamCreateConsumerT = "$JS.STREAM.%s.CONSUMER.%s.CREATE"
// JetStreamCreateEphemeralConsumer is the endpoint to create ephemeral consumers for streams.
// Will return +OK <consumer name> on success and -ERR on failure.
JetStreamCreateEphemeralConsumer = "$JS.STREAM.*.EPHEMERAL.CONSUMER.CREATE"
JetStreamCreateEphemeralConsumerT = "$JS.STREAM.%s.EPHEMERAL.CONSUMER.CREATE"
// JetStreamConsumers is the endpoint to list all consumers for the stream.
// Will return json list of string on success and -ERR on failure.
@@ -214,6 +220,7 @@ var allJsExports = []string{
JetStreamPurgeStream,
JetStreamDeleteMsg,
JetStreamCreateConsumer,
JetStreamCreateEphemeralConsumer,
JetStreamConsumers,
JetStreamConsumerInfo,
JetStreamDeleteConsumer,
@@ -313,6 +320,9 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error {
if _, err := s.sysSubscribe(JetStreamCreateConsumer, s.jsCreateConsumerRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
if _, err := s.sysSubscribe(JetStreamCreateEphemeralConsumer, s.jsCreateEphemeralConsumerRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
if _, err := s.sysSubscribe(JetStreamConsumers, s.jsConsumersRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
@@ -1270,7 +1280,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, subject, rep
s.sendInternalAccountMsg(c.acc, reply, OK)
}
// Request to create an consumer.
// Request to create a durable consumer.
func (s *Server) jsCreateConsumerRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c == nil || c.acc == nil {
return
@@ -1294,6 +1304,55 @@ func (s *Server) jsCreateConsumerRequest(sub *subscription, c *client, subject,
s.sendInternalAccountMsg(c.acc, reply, protoErr(err))
return
}
// Now check we do not have a durable.
if req.Config.Durable == _EMPTY_ {
s.sendInternalAccountMsg(c.acc, reply, protoErr("consumer expected to be durable but a durable name was not set"))
return
}
consumerName := subjectToken(subject, 4)
if consumerName != req.Config.Durable {
s.sendInternalAccountMsg(c.acc, reply, protoErr("consumer name in subject does not match durable name in request"))
return
}
var response = OK
if o, err := stream.AddConsumer(&req.Config); err != nil {
response = protoErr(err)
} else if !o.isDurable() {
// If the consumer is ephemeral add in the name
response = OK + " " + o.Name()
}
s.sendInternalAccountMsg(c.acc, reply, response)
}
// Request to create an ephemeral consumer.
func (s *Server) jsCreateEphemeralConsumerRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c == nil || c.acc == nil {
return
}
if !c.acc.JetStreamEnabled() {
s.sendInternalAccountMsg(c.acc, reply, JetStreamNotEnabled)
return
}
var req CreateConsumerRequest
if err := json.Unmarshal(msg, &req); err != nil {
s.sendInternalAccountMsg(c.acc, reply, JetStreamBadRequest)
return
}
streamName := subjectToken(subject, 2)
if streamName != req.Stream {
s.sendInternalAccountMsg(c.acc, reply, protoErr("stream name in subject does not match request"))
return
}
stream, err := c.acc.LookupStream(req.Stream)
if err != nil {
s.sendInternalAccountMsg(c.acc, reply, protoErr(err))
return
}
// Now check we do not have a durable.
if req.Config.Durable != _EMPTY_ {
s.sendInternalAccountMsg(c.acc, reply, protoErr("consumer expected to be ephemeral but a durable name was set"))
return
}
var response = OK
if o, err := stream.AddConsumer(&req.Config); err != nil {
response = protoErr(err)

View File

@@ -3509,7 +3509,7 @@ func TestJetStreamRequestAPI(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name), req, time.Second)
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, msetCfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -3521,7 +3521,7 @@ func TestJetStreamRequestAPI(t *testing.T) {
sub, _ := nc.SubscribeSync(delivery)
nc.Flush()
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name), req, time.Second)
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, msetCfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -3537,7 +3537,7 @@ func TestJetStreamRequestAPI(t *testing.T) {
})
// Check that we get an error if the stream name in the subject does not match the config.
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, "BOB"), req, time.Second)
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, "BOB"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -3591,6 +3591,53 @@ func TestJetStreamRequestAPI(t *testing.T) {
resp, _ = nc.Request(fmt.Sprintf(server.JetStreamDeleteConsumerT, msetCfg.Name, onames[0]), nil, time.Second)
expectOKResponse(t, resp)
// Make sure we can't create a durable using the ephemeral API endpoint.
obsReq = server.CreateConsumerRequest{
Stream: msetCfg.Name,
Config: server.ConsumerConfig{Durable: "myd", Delivery: delivery, DeliverAll: true},
}
req, err = json.Marshal(obsReq)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateEphemeralConsumerT, msetCfg.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Since we are using the ephemeral endpoint we can not configure a durable name.
if !strings.HasPrefix(string(resp.Data), "-ERR 'consumer expected to be ephemeral but a durable name was set'") {
t.Fatalf("Got wrong error response: %q", resp.Data)
}
// Now make sure we can create a durable on the subject with the proper name.
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name, obsReq.Config.Durable), req, time.Second)
expectOKResponse(t, resp)
// Make sure empty in cfg does not work
obsReq2 := server.CreateConsumerRequest{
Stream: msetCfg.Name,
Config: server.ConsumerConfig{Delivery: delivery, DeliverAll: true},
}
req2, err := json.Marshal(obsReq2)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name, obsReq.Config.Durable), req2, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.HasPrefix(string(resp.Data), "-ERR 'consumer expected to be durable but a durable name was not set'") {
t.Fatalf("Got wrong error response: %q", resp.Data)
}
// Now make sure we can't fake the consumer name.
resp, err = nc.Request(fmt.Sprintf(server.JetStreamCreateConsumerT, msetCfg.Name, "WRONG_CONSUMER_NAME"), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.HasPrefix(string(resp.Data), "-ERR 'consumer name in subject does not match durable name in request'") {
t.Fatalf("Got wrong error response: %q", resp.Data)
}
// Now delete a msg.
resp, _ = nc.Request(fmt.Sprintf(server.JetStreamDeleteMsgT, msetCfg.Name), []byte("2"), time.Second)
expectOKResponse(t, resp)