mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[FIXED] JetStream: Mirrors would fail to be recovered
This is a continuation of PR #3060, but extends to clustering. Verified with a manual test that a mirror created with v2.7.4 has the duplicates window set, and that on restart with the main branch the server would still complain about the use of dedup in cluster mode. The mirror stream was recovered but showing as R1. With this fix, a restart of the cluster - with existing data - will properly recover the stream as an R3, and messages that were published while in a bad state are synchronized. Signed-off-by: Ivan Kozlovic <ivan@synadia.com> Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -1164,9 +1164,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
|
||||
}
|
||||
|
||||
// We had a bug that set a default de dupe window on mirror, despite that being not a valid config
|
||||
if cfg.Mirror != nil && cfg.StreamConfig.Duplicates != 0 {
|
||||
cfg.StreamConfig.Duplicates = 0
|
||||
}
|
||||
fixCfgMirrorWithDedupWindow(&cfg.StreamConfig)
|
||||
|
||||
// We had a bug that could allow subjects in that had prefix or suffix spaces. We check for that here
|
||||
// and will patch them on the fly for now. We will warn about them.
|
||||
@@ -2582,3 +2580,13 @@ func validateJetStreamOptions(o *Options) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// We had a bug that set a default de dupe window on mirror, despite that being not a valid config
|
||||
func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
|
||||
if cfg == nil || cfg.Mirror == nil {
|
||||
return
|
||||
}
|
||||
if cfg.Duplicates != 0 {
|
||||
cfg.Duplicates = 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1018,6 +1018,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, isRecovering bool) error {
|
||||
// Build our new version here outside of js.
|
||||
streams := make(map[string]map[string]*streamAssignment)
|
||||
for _, wsa := range wsas {
|
||||
fixCfgMirrorWithDedupWindow(wsa.Config)
|
||||
as := streams[wsa.Client.serviceAccount()]
|
||||
if as == nil {
|
||||
as = make(map[string]*streamAssignment)
|
||||
@@ -5226,6 +5227,10 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
|
||||
func decodeStreamAssignment(buf []byte) (*streamAssignment, error) {
|
||||
var sa streamAssignment
|
||||
err := json.Unmarshal(buf, &sa)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fixCfgMirrorWithDedupWindow(sa.Config)
|
||||
return &sa, err
|
||||
}
|
||||
|
||||
|
||||
@@ -14075,3 +14075,89 @@ func TestJetStreamMirrorSourceLoop(t *testing.T) {
|
||||
test(t, c.randomServer(), 2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterMirrorDeDupWindow(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
si, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "S",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.Cluster != nil)
|
||||
require_True(t, si.Config.Replicas == 3)
|
||||
require_True(t, len(si.Cluster.Replicas) == 2)
|
||||
|
||||
send := func(count int) {
|
||||
t.Helper()
|
||||
for i := 0; i < count; i++ {
|
||||
_, err := js.Publish("foo", []byte("msg"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Send 100 messages
|
||||
send(100)
|
||||
|
||||
// First check that we can't create with a duplicates window
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Replicas: 3,
|
||||
Mirror: &nats.StreamSource{Name: "S"},
|
||||
Duplicates: time.Hour,
|
||||
})
|
||||
require_Error(t, err)
|
||||
|
||||
// Now create a valid one.
|
||||
si, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "M",
|
||||
Replicas: 3,
|
||||
Mirror: &nats.StreamSource{Name: "S"},
|
||||
})
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.Cluster != nil)
|
||||
require_True(t, si.Config.Replicas == 3)
|
||||
require_True(t, len(si.Cluster.Replicas) == 2)
|
||||
|
||||
check := func(expected int) {
|
||||
t.Helper()
|
||||
// Wait for all messages to be in mirror
|
||||
checkFor(t, 15*time.Second, 50*time.Millisecond, func() error {
|
||||
si, err := js.StreamInfo("M")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n := si.State.Msgs; int(n) != expected {
|
||||
return fmt.Errorf("Expected %v msgs, got %v", expected, n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
check(100)
|
||||
|
||||
// Restart cluster
|
||||
nc.Close()
|
||||
c.stopAll()
|
||||
c.restartAll()
|
||||
c.waitOnLeader()
|
||||
c.waitOnStreamLeader(globalAccountName, "S")
|
||||
c.waitOnStreamLeader(globalAccountName, "M")
|
||||
|
||||
nc, js = jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
si, err = js.StreamInfo("M")
|
||||
require_NoError(t, err)
|
||||
require_True(t, si.Cluster != nil)
|
||||
require_True(t, si.Config.Replicas == 3)
|
||||
require_True(t, len(si.Cluster.Replicas) == 2)
|
||||
|
||||
// Send 100 messages
|
||||
send(100)
|
||||
check(200)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user