Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-01-23 17:30:08 -08:00
2 changed files with 61 additions and 9 deletions

View File

@@ -2411,3 +2411,54 @@ func TestJetStreamClusterScaleDownDuringServerOffline(t *testing.T) {
return nil
})
}
// Reported by a customer manually upgrading their streams to support direct gets.
// Worked if single replica but not in clustered mode.
func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "KV_TEST",
Subjects: []string{"$KV.TEST.>"},
Discard: nats.DiscardNew,
MaxMsgsPerSubject: 1,
DenyDelete: true,
Replicas: 3,
})
require_NoError(t, err)
kv, err := js.KeyValue("TEST")
require_NoError(t, err)
_, err = kv.PutString("name", "derek")
require_NoError(t, err)
entry, err := kv.Get("name")
require_NoError(t, err)
require_True(t, string(entry.Value()) == "derek")
// Now simulate a update to the stream to support direct gets.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "KV_TEST",
Subjects: []string{"$KV.TEST.>"},
Discard: nats.DiscardNew,
MaxMsgsPerSubject: 1,
DenyDelete: true,
AllowDirect: true,
Replicas: 3,
})
require_NoError(t, err)
// Rebind to KV to make sure we DIRECT version of Get().
kv, err = js.KeyValue("TEST")
require_NoError(t, err)
// Make sure direct get works.
entry, err = kv.Get("name")
require_NoError(t, err)
require_True(t, string(entry.Value()) == "derek")
}

View File

@@ -1542,16 +1542,16 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
delete(mset.sources, iname)
}
}
}
// If we are not clustered here and we want allow direct, go ahead and inline here.
// use cfg.Replicas vs isClustered() in case we are updating.
if cfg.Replicas == 1 {
// Check direct.
if cfg.AllowDirect {
mset.subscribeToDirect()
} else {
mset.unsubscribeToDirect()
}
// Check for a change in allow direct status.
// These will run on all members, so just update as appropriate here.
// We do make sure we are caught up under monitorStream() during initial startup.
if cfg.AllowDirect != ocfg.AllowDirect {
if cfg.AllowDirect {
mset.subscribeToDirect()
} else {
mset.unsubscribeToDirect()
}
}
@@ -3062,6 +3062,7 @@ func (mset *stream) subscribeToStream() error {
}
}
// Check for direct get access.
// We spin up followers for clustered streams in monitorStream().
if mset.cfg.AllowDirect {
if err := mset.subscribeToDirect(); err != nil {
return err