Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-01-04 22:59:09 -08:00
2 changed files with 94 additions and 0 deletions

View File

@@ -1220,6 +1220,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
// Now walk the ones to check and process consumers.
var caAdd, caDel []*consumerAssignment
for _, sa := range saChk {
// Make sure to add in all the new ones from sa.
for _, ca := range sa.consumers {
caAdd = append(caAdd, ca)
}
if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil {
for _, ca := range osa.consumers {
if sa.consumers[ca.Name] == nil {

View File

@@ -2369,3 +2369,92 @@ func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
t.Fatalf("expected a current leader after old leader restarted")
}
}
// Customer reported R1 consumers that seemed to be ghosted after server restart.
func TestJetStreamClusterLostConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "GHOST", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
Replicas: 3,
})
require_NoError(t, err)
for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
_, err := js.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte("test"))
require_NoError(t, err)
}
}
s := c.randomServer()
s.Shutdown()
c.waitOnLeader()
c.waitOnStreamLeader(globalAccountName, "TEST")
nc, _ = jsClientConnect(t, c.randomServer())
defer nc.Close()
cc := CreateConsumerRequest{
Stream: "TEST",
Config: ConsumerConfig{
AckPolicy: AckExplicit,
},
}
req, err := json.Marshal(cc)
require_NoError(t, err)
reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST")
// Now create 50 consumers. We do not wait for the answer.
for i := 0; i < 50; i++ {
nc.Publish(reqSubj, req)
}
nc.Flush()
// Grab the meta leader.
ml := c.leader()
require_NoError(t, ml.JetStreamSnapshotMeta())
numConsumerAssignments := func(s *Server) int {
t.Helper()
js := s.getJetStream()
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
for _, asa := range cc.streams {
for _, sa := range asa {
return len(sa.consumers)
}
}
return 0
}
checkFor(t, time.Second, 100*time.Millisecond, func() error {
num := numConsumerAssignments(ml)
if num == 50 {
return nil
}
return fmt.Errorf("Consumers is only %d", num)
})
// Restart the server we shutdown. We snapshotted to the snapshot
// has to fill in the new consumers.
// The bug would fail to add them to the meta state since the stream
// existed.
s = c.restartServer(s)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
num := numConsumerAssignments(s)
if num == 50 {
return nil
}
return fmt.Errorf("Consumers is only %d", num)
})
}