From 748890adb178b59a56ee6a9ad01d4bac8cfc1348 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 3 Aug 2022 12:05:02 -0700 Subject: [PATCH] Auto-set and upgrade AllowDirect when MaxMsgsPerSubject is set. Also allow mirrors to inherit properly. Signed-off-by: Derek Collison --- server/jetstream.go | 4 +- server/jetstream_cluster_test.go | 77 -------------------------- server/jetstream_super_cluster_test.go | 34 ++++++++++++ server/jetstream_test.go | 58 +++++++++++++++++++ server/stream.go | 30 ++++++++-- 5 files changed, 119 insertions(+), 84 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index cd2420e6..b0b068dd 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -948,9 +948,9 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) { } func (s *Server) getJetStream() *jetStream { - s.mu.Lock() + s.mu.RLock() js := s.js - s.mu.Unlock() + s.mu.RUnlock() return js } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index bd99bf09..27e764f2 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4773,83 +4773,6 @@ func TestJetStreamClusterStreamDirectGetMsg(t *testing.T) { require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_) } -func TestJetStreamClusterStreamAndMirrorDirectGetMsgUpdate(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3F", 3) - defer c.shutdown() - - // Client based API - s := c.randomServer() - nc, _ := jsClientConnect(t, s) - defer nc.Close() - - cfg := &StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Storage: MemoryStorage, - Replicas: 3, - } - addStream(t, nc, cfg) - sendStreamMsg(t, nc, "foo", "bar") - - getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST") - req := []byte(`{"last_by_subj": "foo"}`) - - _, err := nc.Request(getSubj, req, 50*time.Millisecond) - require_Error(t, err, nats.ErrTimeout) - - cfg.AllowDirect = true - jreq, err := json.Marshal(cfg) - require_NoError(t, err) - - resp, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "TEST"), jreq, time.Second) - require_NoError(t, err) - var scResp JSApiStreamCreateResponse - err = json.Unmarshal(resp.Data, &scResp) - require_NoError(t, err) - require_True(t, scResp.Error == nil) - - _, err = nc.Request(getSubj, req, 50*time.Millisecond) - require_NoError(t, err) - - // Make sure we can turn it off as well. - cfg.AllowDirect = false - jreq, err = json.Marshal(cfg) - require_NoError(t, err) - - resp, err = nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "TEST"), jreq, time.Second) - require_NoError(t, err) - err = json.Unmarshal(resp.Data, &scResp) - require_NoError(t, err) - require_True(t, scResp.Error == nil) - - _, err = nc.Request(getSubj, req, 50*time.Millisecond) - require_Error(t, err, nats.ErrTimeout) - - // Now test mirrors as well. - cfg = &StreamConfig{ - Name: "M", - Mirror: &StreamSource{Name: "TEST"}, - Storage: MemoryStorage, - } - addStream(t, nc, cfg) - - _, err = nc.Request(getSubj, req, 50*time.Millisecond) - require_Error(t, err, nats.ErrTimeout) - - cfg.MirrorDirect = true - jreq, err = json.Marshal(cfg) - require_NoError(t, err) - - resp, err = nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "M"), jreq, time.Second) - require_NoError(t, err) - err = json.Unmarshal(resp.Data, &scResp) - require_NoError(t, err) - require_True(t, scResp.Error == nil) - - _, err = nc.Request(getSubj, req, 50*time.Millisecond) - require_NoError(t, err) -} - func TestJetStreamClusterStreamPerf(t *testing.T) { // Comment out to run, holding place for now. skip(t) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 3d8086f6..6670c19a 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -3187,3 +3187,37 @@ func TestJetStreamSuperClusterPeerEvacuationAndStreamReassignment(t *testing.T) test(3, []string{"cluster:C2"}, "C2", false, true) }) } + +func TestJetStreamSuperClusterMirrorInheritsAllowDirect(t *testing.T) { + sc := createJetStreamTaggedSuperCluster(t) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "KV", + Subjects: []string{"key.*"}, + Placement: &nats.Placement{Tags: []string{"cloud:aws", "country:us"}}, + MaxMsgsPerSubject: 1, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "KV"}, + Placement: &nats.Placement{Tags: []string{"cloud:gcp", "country:uk"}}, + }) + require_NoError(t, err) + + // Do direct grab for now. + resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "M"), nil, time.Second) + require_NoError(t, err) + var si StreamInfo + err = json.Unmarshal(resp.Data, &si) + require_NoError(t, err) + + if !si.Config.MirrorDirect { + t.Fatalf("Expected MirrorDirect to be inherited as true") + } +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 6e6022e1..373e81f6 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19070,3 +19070,61 @@ func TestJetStreamDirectGetBySubject(t *testing.T) { t.Fatalf("Expected to see the mirror respond at least once") } } + +// v2.9 will move to direct gets, this tests that we autoset them and auto-promote older streams. +// This is keyed off a config setting for MaxMsgsPerSubject. +func TestJetStreamDirectGetAutoSet(t *testing.T) { + s := RunBasicJetStreamServer() + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "KV", + Subjects: []string{"key.*"}, + MaxMsgsPerSubject: 1, + }) + require_NoError(t, err) + + _, err = js.Publish("key.22", []byte("22")) + require_NoError(t, err) + + // Make sure direct get was auto turned on. + subj := fmt.Sprintf(JSDirectMsgGetT, "KV") + req := []byte(`{"last_by_subj": "key.22"}`) + m, err := nc.Request(subj, req, time.Second) + require_NoError(t, err) + require_True(t, string(m.Data) == "22") + + // New direct with key in subject. + subj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", "key.22") + m, err = nc.Request(subj, nil, time.Second) + require_NoError(t, err) + require_True(t, string(m.Data) == "22") + + // Make sure mirrors inherit. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "M", + Mirror: &nats.StreamSource{Name: "KV"}, + }) + require_NoError(t, err) + + // Now make sure mirrors are doing right thing with new way as well. + var sawMirror bool + subj = fmt.Sprintf(JSDirectGetLastBySubjectT, "KV", "key.22") + for i := 0; i < 100; i++ { + m, err := nc.Request(subj, nil, time.Second) + require_NoError(t, err) + if shdr := m.Header.Get(JSStream); shdr == "M" { + sawMirror = true + break + } + } + if !sawMirror { + t.Fatalf("Expected to see the mirror respond at least once") + } +} diff --git a/server/stream.go b/server/stream.go index 62f6cdb9..88c1aad6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -61,6 +61,11 @@ type StreamConfig struct { // Allow republish of the message after being sequenced and stored. RePublish *RePublish `json:"republish,omitempty"` + // Allow higher performance, direct access to get individual messages. E.g. KeyValue + AllowDirect bool `json:"allow_direct,omitempty"` + // Allow higher performance and unified direct access for mirrors as well. + MirrorDirect bool `json:"mirror_direct,omitempty"` + // Optional qualifiers. These can not be modified after set to true. // Sealed will seal a stream so no messages can get out or in. @@ -72,11 +77,6 @@ type StreamConfig struct { // AllowRollup allows messages to be placed into the system and purge // all older messages using a special msg header. AllowRollup bool `json:"allow_rollup_hdrs"` - - // Allow higher performance, direct access to get individual messages. E.g. KeyValue - AllowDirect bool `json:"allow_direct,omitempty"` - // Allow higher performance and unified direct access for mirrors as well. - MirrorDirect bool `json:"mirror_direct,omitempty"` } // RePublish is for republishing messages once committed to a stream. @@ -1037,6 +1037,21 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix) } } + // Determine if we are inheriting direct gets. + if exists, ocfg := getStream(cfg.Mirror.Name); exists { + cfg.MirrorDirect = ocfg.AllowDirect + } else if js := s.getJetStream(); js != nil && js.isClustered() { + // Could not find it here. If we are clustered we can look it up. + js.mu.RLock() + if cc := js.cluster; cc != nil { + if as := cc.streams[acc.Name]; as != nil { + if sa := as[cfg.Mirror.Name]; sa != nil { + cfg.MirrorDirect = sa.Config.AllowDirect + } + } + } + js.mu.RUnlock() + } } if len(cfg.Sources) > 0 { for _, src := range cfg.Sources { @@ -1186,6 +1201,11 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } } + // Here we will auto set direct gets if MaxMsgsPerSubject is set. + if cfg.MaxMsgsPer != 0 { + cfg.AllowDirect = true + } + return cfg, nil }