From 407572165196bb22723fb68be7b3669159be89d6 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 2 Jul 2022 13:57:14 -0700 Subject: [PATCH] Allow direct msg get for stream to operate in queue group and allows mirrors to opt-in to the same group. Signed-off-by: Derek Collison --- server/jetstream_api.go | 12 +-- server/jetstream_super_cluster_test.go | 100 +++++++++++++++++++++++++ server/stream.go | 92 ++++++++++++++++++++--- 3 files changed, 188 insertions(+), 16 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 41ff9efe..4ba29dda 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1303,6 +1303,13 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } + // If we are told to do mirror direct but are not mirroring, error. + if cfg.MirrorDirect && cfg.Mirror == nil { + resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct")) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Hand off to cluster for processing. if s.JetStreamIsClustered() { s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg) @@ -1798,11 +1805,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s if clusterWideConsCount > 0 { resp.StreamInfo.State.Consumers = clusterWideConsCount } - if mset.isMirror() { - resp.StreamInfo.Mirror = mset.mirrorInfo() - } else if mset.hasSources() { - resp.StreamInfo.Sources = mset.sourcesInfo() - } // Check if they have asked for subject details. if subjects != _EMPTY_ { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 036fece7..4cc17c69 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2483,3 +2483,103 @@ func TestJetStreamSuperClusterStateOnRestartPreventsConsumerRecovery(t *testing. t.Fatalf("Consumer was not properly restarted") } } + +// We allow mirrors to opt-in to direct get in a distributed queue group. +func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) { + sc := createJetStreamTaggedSuperCluster(t) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.randomServer()) + defer nc.Close() + + // C1 + // Do by hand for now. + cfg := &StreamConfig{ + Name: "SOURCE", + Subjects: []string{"kv.>"}, + MaxMsgsPer: 1, + Placement: &Placement{Tags: []string{"cloud:aws", "country:us"}}, + AllowDirect: true, + Replicas: 3, + Storage: MemoryStorage, + } + addStream(t, nc, cfg) + + num := 100 + for i := 0; i < num; i++ { + js.PublishAsync(fmt.Sprintf("kv.%d", i), []byte("VAL")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // C2 + cfg = &StreamConfig{ + Name: "M1", + Mirror: &StreamSource{Name: "SOURCE"}, + Placement: &Placement{Tags: []string{"cloud:gcp", "country:uk"}}, + MirrorDirect: true, + Storage: MemoryStorage, + } + addStream(t, nc, cfg) + + // C3 (clustered) + cfg = &StreamConfig{ + Name: "M2", + Mirror: &StreamSource{Name: "SOURCE"}, + Replicas: 3, + Placement: &Placement{Tags: []string{"country:jp"}}, + MirrorDirect: true, + Storage: MemoryStorage, + } + addStream(t, nc, cfg) + + // Since last one was an R3, check and wait for subscription. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + sl := sc.clusterForName("C3").streamLeader("$G", "M2") + if mset, err := sl.GlobalAccount().lookupStream("M2"); err == nil { + mset.mu.RLock() + ok := mset.mirror.dsub != nil + mset.mu.RUnlock() + if ok { + return nil + } + } + return fmt.Errorf("No dsub yet") + }) + + // Always do a direct get to the source, but check that we are getting answers from the mirrors when connected to their cluster. + getSubj := fmt.Sprintf(JSDirectMsgGetT, "SOURCE") + req := []byte(`{"last_by_subj":"kv.22"}`) + getMsg := func(c *nats.Conn) *nats.Msg { + m, err := c.Request(getSubj, req, time.Second) + require_NoError(t, err) + require_True(t, string(m.Data) == "VAL") + require_True(t, m.Header.Get(JSSequence) == "23") + require_True(t, m.Header.Get(JSSubject) == "kv.22") + return m + } + + // C1 -> SOURCE + nc, _ = jsClientConnect(t, sc.clusterForName("C1").randomServer()) + defer nc.Close() + + m := getMsg(nc) + require_True(t, m.Header.Get(JSStream) == "SOURCE") + + // C2 -> M1 + nc, _ = jsClientConnect(t, sc.clusterForName("C2").randomServer()) + defer nc.Close() + + m = getMsg(nc) + require_True(t, m.Header.Get(JSStream) == "M1") + + // C3 -> M2 + nc, _ = jsClientConnect(t, sc.clusterForName("C3").randomServer()) + defer nc.Close() + + m = getMsg(nc) + require_True(t, m.Header.Get(JSStream) == "M2") +} diff --git a/server/stream.go b/server/stream.go index bde3566a..94110685 100644 --- a/server/stream.go +++ b/server/stream.go @@ -72,8 +72,11 @@ type StreamConfig struct { // AllowRollup allows messages to be placed into the system and purge // all older messages using a special msg header. AllowRollup bool `json:"allow_rollup_hdrs"` - // Allow higher peformance, direct access to get individual messages. + + // Allow higher performance, direct access to get individual messages. E.g. KeyValue AllowDirect bool `json:"allow_direct,omitempty"` + // Allow higher performance and unified direct access for mirrors as well. + MirrorDirect bool `json:"mirror_direct,omitempty"` } // RePublish is for republishing messages once committed to a stream. @@ -231,6 +234,9 @@ type stream struct { lqsent time.Time catchups map[string]uint64 uch chan struct{} + + // Direct get subscription. + directSub *subscription } type sourceInfo struct { @@ -238,6 +244,7 @@ type sourceInfo struct { iname string cname string sub *subscription + dsub *subscription msgs *ipQueue // of *inMsg sseq uint64 dseq uint64 @@ -251,6 +258,12 @@ type sourceInfo struct { wg sync.WaitGroup } +// For mirrors and direct get +const ( + dgetGroup = "_zz_" + dgetCaughtUpThresh = 10 +) + // Headers for published messages. const ( JSMsgId = "Nats-Msg-Id" @@ -1512,12 +1525,6 @@ func (mset *stream) isMirror() bool { return mset.cfg.Mirror != nil } -func (mset *stream) hasSources() bool { - mset.mu.RLock() - defer mset.mu.RUnlock() - return len(mset.sources) > 0 -} - func (mset *stream) sourcesInfo() (sis []*StreamSourceInfo) { mset.mu.RLock() defer mset.mu.RUnlock() @@ -1840,6 +1847,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { mset.mirror.lag = pending - 1 } + // Check if we allow mirror direct here. If so check they we have mostly caught up. + // The reason we do not require 0 is if the source is active we may always be slightly behind. + if mset.cfg.MirrorDirect && mset.mirror.dsub == nil && pending < dgetCaughtUpThresh { + if err := mset.subscribeToMirrorDirect(); err != nil { + // Disable since we had problems above. + mset.cfg.MirrorDirect = false + } + } + js, stype := mset.js, mset.cfg.Storage mset.mu.Unlock() @@ -2219,6 +2235,11 @@ func (mset *stream) cancelSourceInfo(si *sourceInfo) { mset.unsubscribe(si.sub) si.sub = nil } + // In case we had a mirror direct subscription. + if si.dsub != nil { + mset.unsubscribe(si.dsub) + si.dsub = nil + } mset.removeInternalConsumer(si) if si.qch != nil { close(si.qch) @@ -2843,9 +2864,9 @@ func (mset *stream) subscribeToStream() error { return err } } + // Check for direct get access. if mset.cfg.AllowDirect { - dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name) - if _, err := mset.subscribeInternal(dsubj, mset.processDirectGetRequest); err != nil { + if err := mset.subscribeToDirect(); err != nil { return err } } @@ -2854,6 +2875,36 @@ func (mset *stream) subscribeToStream() error { return nil } +// Lock should be held. +func (mset *stream) subscribeToDirect() error { + if mset.directSub != nil { + return nil + } + dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name) + // We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis. + if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err != nil { + return err + } else { + mset.directSub = sub + } + return nil +} + +// Lock should be held. +func (mset *stream) subscribeToMirrorDirect() error { + if mset.mirror == nil || mset.mirror.dsub != nil { + return nil + } + dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.mirror.name) + // We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis. + if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err != nil { + return err + } else { + mset.mirror.dsub = sub + } + return nil +} + // Stop our source consumers. // Lock should be held. func (mset *stream) stopSourceConsumers() { @@ -2881,12 +2932,15 @@ func (mset *stream) unsubscribeToStream() error { mset.mirror = nil } - if len(mset.cfg.Sources) > 0 { + if len(mset.sources) > 0 { mset.stopSourceConsumers() } // In case we had a direct get subscription. - mset.unsubscribeInternal(fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)) + if mset.directSub != nil { + mset.unsubscribe(mset.directSub) + mset.directSub = nil + } mset.active = false return nil @@ -2915,6 +2969,22 @@ func (mset *stream) subscribeInternalUnlocked(subject string, cb msgHandler) (*s return mset.subscribeInternal(subject, cb) } +// Lock should be held. +func (mset *stream) queueSubscribeInternal(subject, group string, cb msgHandler) (*subscription, error) { + c := mset.client + if c == nil { + return nil, fmt.Errorf("invalid stream") + } + if cb == nil { + return nil, fmt.Errorf("undefined message handler") + } + + mset.sid++ + + // Now create the subscription + return c.processSub([]byte(subject), []byte(group), []byte(strconv.Itoa(mset.sid)), cb, false) +} + // This will unsubscribe us from the exact subject given. // We do not currently track the subs so do not have the sid. // This should be called only on an update.