Allow stream and mirror direct get abilities to be updated.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-07-08 07:13:34 -07:00
parent 61fe6bcffb
commit 59a0a3ba5c
2 changed files with 96 additions and 0 deletions

View File

@@ -5001,6 +5001,83 @@ func TestJetStreamClusterStreamDirectGetMsg(t *testing.T) {
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)
}
func TestJetStreamClusterStreamAndMirrorDirectGetMsgUpdate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, _ := jsClientConnect(t, s)
defer nc.Close()
cfg := &StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: MemoryStorage,
Replicas: 3,
}
addStream(t, nc, cfg)
sendStreamMsg(t, nc, "foo", "bar")
getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")
req := []byte(`{"last_by_subj": "foo"}`)
_, err := nc.Request(getSubj, req, 50*time.Millisecond)
require_Error(t, err, nats.ErrTimeout)
cfg.AllowDirect = true
jreq, err := json.Marshal(cfg)
require_NoError(t, err)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "TEST"), jreq, time.Second)
require_NoError(t, err)
var scResp JSApiStreamCreateResponse
err = json.Unmarshal(resp.Data, &scResp)
require_NoError(t, err)
require_True(t, scResp.Error == nil)
_, err = nc.Request(getSubj, req, 50*time.Millisecond)
require_NoError(t, err)
// Make sure we can turn it off as well.
cfg.AllowDirect = false
jreq, err = json.Marshal(cfg)
require_NoError(t, err)
resp, err = nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "TEST"), jreq, time.Second)
require_NoError(t, err)
err = json.Unmarshal(resp.Data, &scResp)
require_NoError(t, err)
require_True(t, scResp.Error == nil)
_, err = nc.Request(getSubj, req, 50*time.Millisecond)
require_Error(t, err, nats.ErrTimeout)
// Now test mirrors as well.
cfg = &StreamConfig{
Name: "M",
Mirror: &StreamSource{Name: "TEST"},
Storage: MemoryStorage,
}
addStream(t, nc, cfg)
_, err = nc.Request(getSubj, req, 50*time.Millisecond)
require_Error(t, err, nats.ErrTimeout)
cfg.MirrorDirect = true
jreq, err = json.Marshal(cfg)
require_NoError(t, err)
resp, err = nc.Request(fmt.Sprintf(JSApiStreamUpdateT, "M"), jreq, time.Second)
require_NoError(t, err)
err = json.Unmarshal(resp.Data, &scResp)
require_NoError(t, err)
require_True(t, scResp.Error == nil)
_, err = nc.Request(getSubj, req, 50*time.Millisecond)
require_NoError(t, err)
}
func TestJetStreamClusterStreamPerf(t *testing.T) {
// Comment out to run, holding place for now.
skip(t)

View File

@@ -1419,6 +1419,25 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if mset.isLeader() && sendAdvisory {
mset.sendUpdateAdvisoryLocked()
}
// Check for AllowDirect
if mset.cfg.AllowDirect {
mset.subscribeToDirect()
} else if mset.directSub != nil {
mset.unsubscribe(mset.directSub)
mset.directSub = nil
}
// Check for mirror. If set but we are not a mirror just ignore for now.
if mset.cfg.MirrorDirect && mset.cfg.Mirror != nil && mset.mirror.dsub == nil {
if err := mset.subscribeToMirrorDirect(); err != nil {
// Disable since we had problems above.
mset.cfg.MirrorDirect = false
}
} else if !mset.cfg.MirrorDirect && mset.mirror != nil && mset.mirror.dsub != nil {
mset.unsubscribe(mset.mirror.dsub)
mset.mirror.dsub = nil
}
mset.mu.Unlock()
if js != nil {