mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Fix raft issue where pindex of follower was off by 1 (#3277)
introduced by 57395bba02
Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -4008,6 +4008,51 @@ func TestJetStreamClusterDoubleStreamMove(t *testing.T) {
|
||||
moveAndCheck(srvMoveList[3], srvMoveList[0], srvMoveList[2], srvMoveList[1], srvMoveList[0])
|
||||
}
|
||||
|
||||
func TestJetStreamClusterConsumerScaleUp(t *testing.T) {
|
||||
c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, 3, 22020, true)
|
||||
defer c.shutdown()
|
||||
|
||||
// Client based API
|
||||
srv := c.randomNonLeader()
|
||||
nc, js := jsClientConnect(t, srv)
|
||||
defer nc.Close()
|
||||
|
||||
scfg := nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 1,
|
||||
}
|
||||
_, err := js.AddStream(&scfg)
|
||||
require_NoError(t, err)
|
||||
defer js.DeleteStream("TEST")
|
||||
|
||||
dcfg := nats.ConsumerConfig{
|
||||
Durable: "DUR",
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
Replicas: 0}
|
||||
_, err = js.AddConsumer("TEST", &dcfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err = js.Publish("foo", nil)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
scfg.Replicas = 2
|
||||
_, err = js.UpdateStream(&scfg)
|
||||
require_NoError(t, err)
|
||||
|
||||
// The scale up issue shows itself as permanent loss of consumer leadership
|
||||
// So give it some time for the change to propagate to new consumer peers and the quorum to disrupt
|
||||
// 2 seconds is a value arrived by experimentally, no sleep or a sleep of 1sec always had the test pass a lot.
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
c.waitOnStreamLeader("$G", "TEST")
|
||||
|
||||
// There is also a timing component to the issue triggering.
|
||||
c.waitOnConsumerLeader("$G", "TEST", "DUR")
|
||||
}
|
||||
|
||||
func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) {
|
||||
s := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,
|
||||
func(serverName, clusterName, storeDir, conf string) string {
|
||||
|
||||
@@ -2218,7 +2218,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
|
||||
var state StreamState
|
||||
n.wal.FastState(&state)
|
||||
if snap.lastIndex < state.FirstSeq && state.FirstSeq != 0 {
|
||||
snap.lastIndex = state.FirstSeq
|
||||
snap.lastIndex = state.FirstSeq - 1
|
||||
ae.pindex = snap.lastIndex
|
||||
}
|
||||
encoding, err := ae.encode(nil)
|
||||
|
||||
Reference in New Issue
Block a user