Merge pull request #3067 from nats-io/js_fix_mirror_dedup

[FIXED] JetStream: Mirrors would fail to be recovered
This commit is contained in:
Ivan Kozlovic
2022-04-21 11:27:04 -06:00
committed by GitHub
3 changed files with 102 additions and 3 deletions

View File

@@ -1164,9 +1164,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
}
// We had a bug that set a default de dupe window on mirror, despite that being not a valid config
if cfg.Mirror != nil && cfg.StreamConfig.Duplicates != 0 {
cfg.StreamConfig.Duplicates = 0
}
fixCfgMirrorWithDedupWindow(&cfg.StreamConfig)
// We had a bug that could allow subjects in that had prefix or suffix spaces. We check for that here
// and will patch them on the fly for now. We will warn about them.
@@ -2582,3 +2580,13 @@ func validateJetStreamOptions(o *Options) error {
}
return nil
}
// We had a bug that set a default de dupe window on mirror, despite that being not a valid config
func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
if cfg == nil || cfg.Mirror == nil {
return
}
if cfg.Duplicates != 0 {
cfg.Duplicates = 0
}
}

View File

@@ -1018,6 +1018,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error {
// Build our new version here outside of js.
streams := make(map[string]map[string]*streamAssignment)
for _, wsa := range wsas {
fixCfgMirrorWithDedupWindow(wsa.Config)
as := streams[wsa.Client.serviceAccount()]
if as == nil {
as = make(map[string]*streamAssignment)
@@ -5226,6 +5227,10 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
func decodeStreamAssignment(buf []byte) (*streamAssignment, error) {
var sa streamAssignment
err := json.Unmarshal(buf, &sa)
if err != nil {
return nil, err
}
fixCfgMirrorWithDedupWindow(sa.Config)
return &sa, err
}

View File

@@ -14075,3 +14075,89 @@ func TestJetStreamMirrorSourceLoop(t *testing.T) {
test(t, c.randomServer(), 2)
})
}
func TestJetStreamClusterMirrorDeDupWindow(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "S",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
require_True(t, si.Cluster != nil)
require_True(t, si.Config.Replicas == 3)
require_True(t, len(si.Cluster.Replicas) == 2)
send := func(count int) {
t.Helper()
for i := 0; i < count; i++ {
_, err := js.Publish("foo", []byte("msg"))
require_NoError(t, err)
}
}
// Send 100 messages
send(100)
// First check that we can't create with a duplicates window
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Replicas: 3,
Mirror: &nats.StreamSource{Name: "S"},
Duplicates: time.Hour,
})
require_Error(t, err)
// Now create a valid one.
si, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Replicas: 3,
Mirror: &nats.StreamSource{Name: "S"},
})
require_NoError(t, err)
require_True(t, si.Cluster != nil)
require_True(t, si.Config.Replicas == 3)
require_True(t, len(si.Cluster.Replicas) == 2)
check := func(expected int) {
t.Helper()
// Wait for all messages to be in mirror
checkFor(t, 15*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo("M")
if err != nil {
return err
}
if n := si.State.Msgs; int(n) != expected {
return fmt.Errorf("Expected %v msgs, got %v", expected, n)
}
return nil
})
}
check(100)
// Restart cluster
nc.Close()
c.stopAll()
c.restartAll()
c.waitOnLeader()
c.waitOnStreamLeader(globalAccountName, "S")
c.waitOnStreamLeader(globalAccountName, "M")
nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()
si, err = js.StreamInfo("M")
require_NoError(t, err)
require_True(t, si.Cluster != nil)
require_True(t, si.Config.Replicas == 3)
require_True(t, len(si.Cluster.Replicas) == 2)
// Send 100 messages
send(100)
check(200)
}