Allow direct msg get for a stream to operate in a queue group, and allow mirrors to opt in to the same group.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-07-02 13:57:14 -07:00
parent 7cf814de8e
commit 4075721651
3 changed files with 188 additions and 16 deletions

View File

@@ -1303,6 +1303,13 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}
// If we are told to do mirror direct but are not mirroring, error.
if cfg.MirrorDirect && cfg.Mirror == nil {
resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Hand off to cluster for processing.
if s.JetStreamIsClustered() {
s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
@@ -1798,11 +1805,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
if clusterWideConsCount > 0 {
resp.StreamInfo.State.Consumers = clusterWideConsCount
}
if mset.isMirror() {
resp.StreamInfo.Mirror = mset.mirrorInfo()
} else if mset.hasSources() {
resp.StreamInfo.Sources = mset.sourcesInfo()
}
// Check if they have asked for subject details.
if subjects != _EMPTY_ {

View File

@@ -2483,3 +2483,103 @@ func TestJetStreamSuperClusterStateOnRestartPreventsConsumerRecovery(t *testing.
t.Fatalf("Consumer was not properly restarted")
}
}
// We allow mirrors to opt-in to direct get in a distributed queue group.
// This test verifies that a source stream and its opted-in mirrors all answer
// direct get requests on the SOURCE's direct get subject via a shared queue
// group, so clients are served by the member in their own cluster; the
// Nats-Stream response header reveals which stream actually answered.
func TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup(t *testing.T) {
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()
nc, js := jsClientConnect(t, sc.randomServer())
defer nc.Close()
// C1
// Do by hand for now.
cfg := &StreamConfig{
Name: "SOURCE",
Subjects: []string{"kv.>"},
MaxMsgsPer: 1,
Placement: &Placement{Tags: []string{"cloud:aws", "country:us"}},
AllowDirect: true,
Replicas: 3,
Storage: MemoryStorage,
}
addStream(t, nc, cfg)
// Publish one message per subject kv.0 .. kv.99.
num := 100
for i := 0; i < num; i++ {
js.PublishAsync(fmt.Sprintf("kv.%d", i), []byte("VAL"))
}
// Wait for all async publishes to be acked before creating mirrors.
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// C2
// R1 mirror with direct get opted in.
cfg = &StreamConfig{
Name: "M1",
Mirror: &StreamSource{Name: "SOURCE"},
Placement: &Placement{Tags: []string{"cloud:gcp", "country:uk"}},
MirrorDirect: true,
Storage: MemoryStorage,
}
addStream(t, nc, cfg)
// C3 (clustered)
cfg = &StreamConfig{
Name: "M2",
Mirror: &StreamSource{Name: "SOURCE"},
Replicas: 3,
Placement: &Placement{Tags: []string{"country:jp"}},
MirrorDirect: true,
Storage: MemoryStorage,
}
addStream(t, nc, cfg)
// Since last one was an R3, check and wait for subscription.
// The mirror only joins the direct get queue group (dsub) once it has
// mostly caught up with the source, so poll until that happens.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
sl := sc.clusterForName("C3").streamLeader("$G", "M2")
if mset, err := sl.GlobalAccount().lookupStream("M2"); err == nil {
mset.mu.RLock()
ok := mset.mirror.dsub != nil
mset.mu.RUnlock()
if ok {
return nil
}
}
return fmt.Errorf("No dsub yet")
})
// Always do a direct get to the source, but check that we are getting answers from the mirrors when connected to their cluster.
getSubj := fmt.Sprintf(JSDirectMsgGetT, "SOURCE")
// kv.22 was the 23rd message published, so we expect sequence 23.
req := []byte(`{"last_by_subj":"kv.22"}`)
getMsg := func(c *nats.Conn) *nats.Msg {
m, err := c.Request(getSubj, req, time.Second)
require_NoError(t, err)
require_True(t, string(m.Data) == "VAL")
require_True(t, m.Header.Get(JSSequence) == "23")
require_True(t, m.Header.Get(JSSubject) == "kv.22")
return m
}
// C1 -> SOURCE
nc, _ = jsClientConnect(t, sc.clusterForName("C1").randomServer())
defer nc.Close()
m := getMsg(nc)
require_True(t, m.Header.Get(JSStream) == "SOURCE")
// C2 -> M1
nc, _ = jsClientConnect(t, sc.clusterForName("C2").randomServer())
defer nc.Close()
m = getMsg(nc)
require_True(t, m.Header.Get(JSStream) == "M1")
// C3 -> M2
nc, _ = jsClientConnect(t, sc.clusterForName("C3").randomServer())
defer nc.Close()
m = getMsg(nc)
require_True(t, m.Header.Get(JSStream) == "M2")
}

View File

@@ -72,8 +72,11 @@ type StreamConfig struct {
// AllowRollup allows messages to be placed into the system and purge
// all older messages using a special msg header.
AllowRollup bool `json:"allow_rollup_hdrs"`
// Allow higher peformance, direct access to get individual messages.
// Allow higher performance, direct access to get individual messages. E.g. KeyValue
AllowDirect bool `json:"allow_direct,omitempty"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct,omitempty"`
}
// RePublish is for republishing messages once committed to a stream.
@@ -231,6 +234,9 @@ type stream struct {
lqsent time.Time
catchups map[string]uint64
uch chan struct{}
// Direct get subscription.
directSub *subscription
}
type sourceInfo struct {
@@ -238,6 +244,7 @@ type sourceInfo struct {
iname string
cname string
sub *subscription
dsub *subscription
msgs *ipQueue // of *inMsg
sseq uint64
dseq uint64
@@ -251,6 +258,12 @@ type sourceInfo struct {
wg sync.WaitGroup
}
// For mirrors and direct get
const (
// dgetGroup is the queue group shared by a source stream and any
// opted-in mirrors for direct get requests, so only one member replies
// to each request.
dgetGroup = "_zz_"
// dgetCaughtUpThresh is the max number of pending mirror messages we
// tolerate before letting a mirror join the direct get queue group. We
// do not require 0 since an active source may keep mirrors slightly
// behind.
dgetCaughtUpThresh = 10
)
// Headers for published messages.
const (
JSMsgId = "Nats-Msg-Id"
@@ -1512,12 +1525,6 @@ func (mset *stream) isMirror() bool {
return mset.cfg.Mirror != nil
}
// hasSources reports whether this stream is sourcing from other streams.
func (mset *stream) hasSources() bool {
	mset.mu.RLock()
	sourced := len(mset.sources) > 0
	mset.mu.RUnlock()
	return sourced
}
func (mset *stream) sourcesInfo() (sis []*StreamSourceInfo) {
mset.mu.RLock()
defer mset.mu.RUnlock()
@@ -1840,6 +1847,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
mset.mirror.lag = pending - 1
}
// Check if we allow mirror direct here. If so check that we have mostly caught up.
// The reason we do not require 0 is if the source is active we may always be slightly behind.
if mset.cfg.MirrorDirect && mset.mirror.dsub == nil && pending < dgetCaughtUpThresh {
if err := mset.subscribeToMirrorDirect(); err != nil {
// Disable since we had problems above.
mset.cfg.MirrorDirect = false
}
}
js, stype := mset.js, mset.cfg.Storage
mset.mu.Unlock()
@@ -2219,6 +2235,11 @@ func (mset *stream) cancelSourceInfo(si *sourceInfo) {
mset.unsubscribe(si.sub)
si.sub = nil
}
// In case we had a mirror direct subscription.
if si.dsub != nil {
mset.unsubscribe(si.dsub)
si.dsub = nil
}
mset.removeInternalConsumer(si)
if si.qch != nil {
close(si.qch)
@@ -2843,9 +2864,9 @@ func (mset *stream) subscribeToStream() error {
return err
}
}
// Check for direct get access.
if mset.cfg.AllowDirect {
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
if _, err := mset.subscribeInternal(dsubj, mset.processDirectGetRequest); err != nil {
if err := mset.subscribeToDirect(); err != nil {
return err
}
}
@@ -2854,6 +2875,36 @@ func (mset *stream) subscribeToStream() error {
return nil
}
// Lock should be held.
// subscribeToDirect creates the direct get subscription for this stream if
// it does not already exist. We listen on a queue group (dgetGroup) by
// default, which allows mirrors to participate on an opt-in basis and share
// the request load.
func (mset *stream) subscribeToDirect() error {
	// Nothing to do if already subscribed.
	if mset.directSub != nil {
		return nil
	}
	dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
	sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest)
	if err != nil {
		return err
	}
	mset.directSub = sub
	return nil
}
// Lock should be held.
// subscribeToMirrorDirect lets a mirror opt in to answering direct get
// requests on behalf of its origin stream. Note we subscribe on the ORIGIN
// stream's direct get subject (mset.mirror.name), in the same queue group
// the origin uses, so requests are load balanced across the group.
func (mset *stream) subscribeToMirrorDirect() error {
	// Nothing to do if we are not a mirror or already subscribed.
	if mset.mirror == nil || mset.mirror.dsub != nil {
		return nil
	}
	dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.mirror.name)
	sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest)
	if err != nil {
		return err
	}
	mset.mirror.dsub = sub
	return nil
}
// Stop our source consumers.
// Lock should be held.
func (mset *stream) stopSourceConsumers() {
@@ -2881,12 +2932,15 @@ func (mset *stream) unsubscribeToStream() error {
mset.mirror = nil
}
if len(mset.cfg.Sources) > 0 {
if len(mset.sources) > 0 {
mset.stopSourceConsumers()
}
// In case we had a direct get subscription.
mset.unsubscribeInternal(fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name))
if mset.directSub != nil {
mset.unsubscribe(mset.directSub)
mset.directSub = nil
}
mset.active = false
return nil
@@ -2915,6 +2969,22 @@ func (mset *stream) subscribeInternalUnlocked(subject string, cb msgHandler) (*s
return mset.subscribeInternal(subject, cb)
}
// Lock should be held.
// queueSubscribeInternal registers an internal queue subscription on the
// given subject and queue group, dispatching matching messages to cb.
func (mset *stream) queueSubscribeInternal(subject, group string, cb msgHandler) (*subscription, error) {
	c := mset.client
	switch {
	case c == nil:
		return nil, fmt.Errorf("invalid stream")
	case cb == nil:
		return nil, fmt.Errorf("undefined message handler")
	}
	// Bump our subscription id and create the subscription.
	mset.sid++
	sid := strconv.Itoa(mset.sid)
	return c.processSub([]byte(subject), []byte(group), []byte(sid), cb, false)
}
// This will unsubscribe us from the exact subject given.
// We do not currently track the subs so do not have the sid.
// This should be called only on an update.