mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Ensure we add in new consumers from a meta snapshot from the leader.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1220,6 +1220,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
|
||||
// Now walk the ones to check and process consumers.
|
||||
var caAdd, caDel []*consumerAssignment
|
||||
for _, sa := range saChk {
|
||||
// Make sure to add in all the new ones from sa.
|
||||
for _, ca := range sa.consumers {
|
||||
caAdd = append(caAdd, ca)
|
||||
}
|
||||
|
||||
if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil {
|
||||
for _, ca := range osa.consumers {
|
||||
if sa.consumers[ca.Name] == nil {
|
||||
|
||||
@@ -2369,3 +2369,92 @@ func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
|
||||
t.Fatalf("expected a current leader after old leader restarted")
|
||||
}
|
||||
}
|
||||
|
||||
// Customer reported R1 consumers that seemed to be ghosted after server restart.
|
||||
func TestJetStreamClusterLostConsumers(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "GHOST", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"events.>"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
for j := 0; j < 10; j++ {
|
||||
_, err := js.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte("test"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
s := c.randomServer()
|
||||
s.Shutdown()
|
||||
|
||||
c.waitOnLeader()
|
||||
c.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
|
||||
nc, _ = jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
cc := CreateConsumerRequest{
|
||||
Stream: "TEST",
|
||||
Config: ConsumerConfig{
|
||||
AckPolicy: AckExplicit,
|
||||
},
|
||||
}
|
||||
req, err := json.Marshal(cc)
|
||||
require_NoError(t, err)
|
||||
|
||||
reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST")
|
||||
|
||||
// Now create 50 consumers. We do not wait for the answer.
|
||||
for i := 0; i < 50; i++ {
|
||||
nc.Publish(reqSubj, req)
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
// Grab the meta leader.
|
||||
ml := c.leader()
|
||||
require_NoError(t, ml.JetStreamSnapshotMeta())
|
||||
|
||||
numConsumerAssignments := func(s *Server) int {
|
||||
t.Helper()
|
||||
js := s.getJetStream()
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
cc := js.cluster
|
||||
for _, asa := range cc.streams {
|
||||
for _, sa := range asa {
|
||||
return len(sa.consumers)
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
checkFor(t, time.Second, 100*time.Millisecond, func() error {
|
||||
num := numConsumerAssignments(ml)
|
||||
if num == 50 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Consumers is only %d", num)
|
||||
})
|
||||
|
||||
// Restart the server we shutdown. We snapshotted to the snapshot
|
||||
// has to fill in the new consumers.
|
||||
// The bug would fail to add them to the meta state since the stream
|
||||
// existed.
|
||||
s = c.restartServer(s)
|
||||
|
||||
checkFor(t, time.Second, 100*time.Millisecond, func() error {
|
||||
num := numConsumerAssignments(s)
|
||||
if num == 50 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Consumers is only %d", num)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user