Merge pull request #1711 from nats-io/stream_lookup

Allow stream lookup by subject.
This commit is contained in:
Derek Collison
2020-11-16 05:59:13 -08:00
committed by GitHub
5 changed files with 190 additions and 13 deletions

View File

@@ -169,6 +169,15 @@ var (
// ErrNoTransforms signals no subject transforms are available to map this subject.
ErrNoTransforms = errors.New("no matching transforms available")
// ErrJetStreamNotEnabled is returned when JetStream is not enabled.
ErrJetStreamNotEnabled = errors.New("jetstream not enabled")
// ErrJetStreamStreamNotFound is returned when a stream can not be found.
ErrJetStreamStreamNotFound = errors.New("stream not found")
// ErrJetStreamNotEnabledForAccount is returned JetStream is not enabled for this account.
ErrJetStreamNotEnabledForAccount = errors.New("jetstream not enabled for account")
)
// configErr is a configuration error.

View File

@@ -343,7 +343,7 @@ func (s *Server) JetStreamNumAccounts() int {
func (s *Server) JetStreamReservedResources() (int64, int64, error) {
js := s.getJetStream()
if js == nil {
return -1, -1, fmt.Errorf("jetstream not enabled")
return -1, -1, ErrJetStreamNotEnabled
}
js.mu.RLock()
defer js.mu.RUnlock()
@@ -369,7 +369,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
// FIXME(dlc) - cluster mode
js := s.getJetStream()
if js == nil {
return fmt.Errorf("jetstream not enabled")
return ErrJetStreamNotEnabled
}
if s.SystemAccount() == a {
return fmt.Errorf("jetstream can not be enabled on the system account")
@@ -622,23 +622,51 @@ func (a *Account) Streams() []*Stream {
return msets
}
// LookupStream will lookup a stream by name.
func (a *Account) LookupStream(name string) (*Stream, error) {
a.mu.RLock()
jsa := a.js
a.mu.RUnlock()
if jsa == nil {
return nil, fmt.Errorf("jetstream not enabled")
return nil, ErrJetStreamNotEnabled
}
jsa.mu.Lock()
mset, ok := jsa.streams[name]
jsa.mu.Unlock()
if !ok {
return nil, fmt.Errorf("stream not found")
return nil, ErrJetStreamStreamNotFound
}
return mset, nil
}
// LookupStreamBySubject will lookup a stream by a subject.
// The subject needs to be an exact match or a subset. Supersets will not match.
func (a *Account) LookupStreamBySubject(subj string) (*Stream, bool, error) {
a.mu.RLock()
jsa := a.js
a.mu.RUnlock()
if jsa == nil {
return nil, false, ErrJetStreamNotEnabled
}
jsa.mu.Lock()
defer jsa.mu.Unlock()
for _, mset := range jsa.streams {
for _, tsubj := range mset.config.Subjects {
if subj == tsubj {
return mset, len(mset.config.Subjects) > 1, nil
}
if subjectIsSubsetMatch(subj, tsubj) {
return mset, true, nil
}
}
}
return nil, false, ErrJetStreamStreamNotFound
}
// UpdateJetStreamLimits will update the account limits for a JetStream enabled account.
func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
a.mu.RLock()
@@ -651,10 +679,10 @@ func (a *Account) UpdateJetStreamLimits(limits *JetStreamAccountLimits) error {
}
js := s.getJetStream()
if js == nil {
return fmt.Errorf("jetstream not enabled")
return ErrJetStreamNotEnabled
}
if jsa == nil {
return fmt.Errorf("jetstream not enabled for account")
return ErrJetStreamNotEnabledForAccount
}
if limits == nil {
@@ -724,7 +752,7 @@ func (a *Account) DisableJetStream() error {
js := s.getJetStream()
if js == nil {
return fmt.Errorf("jetstream not enabled")
return ErrJetStreamNotEnabled
}
// Remove service imports.
@@ -738,7 +766,7 @@ func (a *Account) DisableJetStream() error {
// Disable JetStream for the account.
func (js *jetStream) disableJetStream(jsa *jsAccount) error {
if jsa == nil {
return fmt.Errorf("jetstream not enabled for account")
return ErrJetStreamNotEnabledForAccount
}
js.mu.Lock()
@@ -966,7 +994,7 @@ func (a *Account) checkForJetStream() (*Server, *jsAccount, error) {
}
if jsa == nil {
return nil, nil, fmt.Errorf("jetstream not enabled for account")
return nil, nil, ErrJetStreamNotEnabledForAccount
}
return s, jsa, nil
@@ -1177,7 +1205,7 @@ func (t *StreamTemplate) Delete() error {
t.mu.Unlock()
if jsa == nil {
return fmt.Errorf("jetstream not enabled")
return ErrJetStreamNotEnabled
}
jsa.mu.Lock()

View File

@@ -75,6 +75,10 @@ const (
JSApiStreamInfo = "$JS.API.STREAM.INFO.*"
JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
// JSApiStreamLookup is for obtaining a stream by a target subject.
// Will return JSON response.
JSApiStreamLookup = "$JS.API.STREAM.LOOKUP"
// JSApiStreamDelete is the endpoint to delete streams.
// Will return JSON response.
JSApiStreamDelete = "$JS.API.STREAM.DELETE.*"
@@ -112,7 +116,7 @@ const (
JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"
JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
// JSApiDurableCreate is the endpoint to create ephemeral consumers for streams.
// JSApiDurableCreate is the endpoint to create durable consumers for streams.
// You need to include the stream and consumer name in the subject.
JSApiDurableCreate = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
@@ -262,6 +266,15 @@ type JSApiStreamInfoResponse struct {
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
// JSApiStreamLookupResponse.
type JSApiStreamLookupResponse struct {
ApiResponse
Stream string `json:"stream"`
Filtered bool `json:"is_filtered"`
}
const JSApiStreamLookupResponseType = "io.nats.jetstream.api.v1.stream_lookup_response"
// Maximum entries we will return for streams or consumers lists.
// TODO(dlc) - with header or request support could request chunked response.
const JSApiNamesLimit = 1024
@@ -480,6 +493,7 @@ var allJsExports = []string{
JSApiStreams,
JSApiStreamList,
JSApiStreamInfo,
JSApiStreamLookup,
JSApiStreamDelete,
JSApiStreamPurge,
JSApiStreamSnapshot,
@@ -509,6 +523,7 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiStreams, s.jsStreamNamesRequest},
{JSApiStreamList, s.jsStreamListRequest},
{JSApiStreamInfo, s.jsStreamInfoRequest},
{JSApiStreamLookup, s.jsStreamLookupRequest},
{JSApiStreamDelete, s.jsStreamDeleteRequest},
{JSApiStreamPurge, s.jsStreamPurgeRequest},
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
@@ -953,6 +968,44 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
}
// Request to lookup a stream by target subject.
func (s *Server) jsStreamLookupRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c == nil || c.acc == nil {
return
}
var resp = JSApiStreamLookupResponse{ApiResponse: ApiResponse{Type: JSApiStreamLookupResponseType}}
if !c.acc.JetStreamEnabled() {
resp.Error = jsNotEnabledErr
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if isEmptyRequest(msg) {
resp.Error = &ApiError{Code: 400, Description: "subject required"}
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
subj := string(msg)
if !IsValidSubject(subj) {
resp.Error = &ApiError{Code: 400, Description: "subject argument is not a valid subject"}
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Lookup our stream.
mset, filtered, err := c.acc.LookupStreamBySubject(subj)
if err != nil {
resp.Error = jsNotFoundError(err)
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Stream = mset.Name()
resp.Filtered = filtered
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(resp))
}
func isEmptyRequest(req []byte) bool {
if len(req) == 0 {
return true

View File

@@ -491,7 +491,7 @@ func (mset *Stream) Delete() error {
jsa := mset.jsa
mset.mu.Unlock()
if jsa == nil {
return fmt.Errorf("jetstream not enabled for account")
return ErrJetStreamNotEnabledForAccount
}
jsa.mu.Lock()
delete(jsa.streams, mset.config.Name)

View File

@@ -404,6 +404,93 @@ func TestJetStreamPubAck(t *testing.T) {
}
}
func TestJetStreamLookupStreamBySubject(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
acc := s.GlobalAccount()
if _, err := acc.AddStream(&server.StreamConfig{Name: "1", Subjects: []string{"foo"}}); err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
if _, err := acc.AddStream(&server.StreamConfig{Name: "2", Subjects: []string{"bar", "baz", "boo"}}); err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
if _, err := acc.AddStream(&server.StreamConfig{Name: "3", Subjects: []string{"foo.*", "bar.*"}}); err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
if _, err := acc.AddStream(&server.StreamConfig{Name: "4", Subjects: []string{"baz.*.*.>"}}); err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
// Check some errors first.
checkError := func(subj string) {
t.Helper()
if _, _, err := acc.LookupStreamBySubject(subj); err != server.ErrJetStreamStreamNotFound {
t.Fatalf("Expected to get a stream not found error, got %v", err)
}
}
checkError("zzz")
checkError("*")
checkError("baz.>")
checkLookup := func(subj, stream string, filtered bool) {
t.Helper()
s, f, err := acc.LookupStreamBySubject(subj)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if s.Name() != stream {
t.Fatalf("Expected stream name of %q, got %q", stream, s.Name())
}
if f != filtered {
t.Fatalf("Expected filtered to be %v, got %v", filtered, f)
}
}
checkLookup("foo", "1", false)
checkLookup("boo", "2", true)
checkLookup("foo.*", "3", true)
checkLookup("foo.1", "3", true)
checkLookup("baz.*.*.>", "4", false)
checkLookup("baz.2.*.>", "4", true)
// Now test API
nc := clientConnectToServer(t, s)
defer nc.Close()
checkAPILookup := func(subj, stream string, filtered bool) {
resp, err := nc.Request(server.JSApiStreamLookup, []byte(subj), time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var lresp server.JSApiStreamLookupResponse
if err := json.Unmarshal(resp.Data, &lresp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if lresp.Error != nil {
t.Fatalf("Got an API error: %+v", lresp.Error)
}
if lresp.Stream != stream {
t.Fatalf("Expected stream name of %q, got %q", stream, lresp.Stream)
}
if lresp.Filtered != filtered {
t.Fatalf("Expected filtered to be %v, got %v", filtered, lresp.Filtered)
}
}
checkAPILookup("foo", "1", false)
checkAPILookup("boo", "2", true)
checkAPILookup("foo.*", "3", true)
checkAPILookup("foo.1", "3", true)
checkAPILookup("baz.*.*.>", "4", false)
checkAPILookup("baz.2.*.>", "4", true)
}
func TestJetStreamConsumerWithStartTime(t *testing.T) {
subj := "my_stream"
cases := []struct {
@@ -7039,7 +7126,7 @@ func TestJetStreamRequestAPI(t *testing.T) {
}
checkNotFound(bResp.Error, "stream not found")
// Now create an consumer.
// Now create a consumer.
delivery := nats.NewInbox()
obsReq := server.CreateConsumerRequest{
Stream: msetCfg.Name,