When consumers were R1 and the same name was reused, server restarts could try to clean up the old ones and affect the new ones.

These changes allow consumer names to be reused safely across server restarts.
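For context, here is a minimal sketch of the guard this change introduces (not the server code; all names are hypothetical). The idea is that a cleanup path captures the exact assignment it was scheduled for, and re-checks the registry by identity before deleting, so a recreated consumer that reuses the same name is left alone:

package main

import (
	"fmt"
	"sync"
)

type assignment struct{ name, group string }

type registry struct {
	mu        sync.Mutex
	consumers map[string]*assignment
}

// cleanup removes the consumer only if the registered assignment is still
// the exact one this cleanup was scheduled for (pointer identity), so a
// new consumer that reused the name is not torn down by mistake.
func (r *registry) cleanup(old *assignment) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur := r.consumers[old.name]; cur != nil && cur == old {
		delete(r.consumers, old.name)
		fmt.Println("removed stale consumer", old.name)
		return
	}
	fmt.Println("skipped cleanup: name", old.name, "now belongs to a new consumer")
}

func main() {
	r := &registry{consumers: map[string]*assignment{}}

	old := &assignment{name: "dlc", group: "g1"}
	r.consumers["dlc"] = old

	// Before the cleanup fires, the consumer is recreated under the same name.
	r.consumers["dlc"] = &assignment{name: "dlc", group: "g2"}

	r.cleanup(old) // skipped: the registered assignment is the new one
}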

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2023-06-05 12:48:18 -07:00
parent 64e3bf82ed
commit 4ac45ff6f3
5 changed files with 109 additions and 7 deletions

go.mod (2 changed lines)

@@ -6,7 +6,7 @@ require (
 	github.com/klauspost/compress v1.16.5
 	github.com/minio/highwayhash v1.0.2
 	github.com/nats-io/jwt/v2 v2.4.1
-	github.com/nats-io/nats.go v1.24.0
+	github.com/nats-io/nats.go v1.26.0
 	github.com/nats-io/nkeys v0.4.4
 	github.com/nats-io/nuid v1.0.1
 	go.uber.org/automaxprocs v1.5.1

go.sum (4 changed lines)

@@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
 github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
 github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
 github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
-github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ=
-github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA=
+github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE=
+github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
 github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
 github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
 github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=


@@ -1417,9 +1417,10 @@ func (o *consumer) deleteNotActive() {
 		defer ticker.Stop()
 		for range ticker.C {
 			js.mu.RLock()
-			ca := js.consumerAssignment(acc, stream, name)
+			nca := js.consumerAssignment(acc, stream, name)
 			js.mu.RUnlock()
-			if ca != nil {
+			// Make sure this is not a new consumer with the same name.
+			if nca != nil && nca == ca {
 				s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 				meta.ForwardProposal(removeEntry)
 			} else {


@@ -3902,8 +3902,12 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
 	var needDelete bool
 	if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil {
 		if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil {
-			needDelete = true
-			delete(sa.consumers, ca.Name)
+			oca := sa.consumers[ca.Name]
+			// Make sure this removal is for what we have, otherwise ignore.
+			if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
+				needDelete = true
+				delete(sa.consumers, ca.Name)
+			}
 		}
 	}
 	js.mu.Unlock()


@@ -27,6 +27,7 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -4398,3 +4399,99 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
 			si.State.FirstSeq, si.State.LastSeq)
 	}
 }
+
+func TestJetStreamClusterConsumerCleanupWithSameName(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3F", 3)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "UPDATES",
+		Subjects: []string{"DEVICE.*"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// Create a consumer that will be an R1 that we will auto-recreate but using the same name.
+	// We want to make sure that the system does not continually try to cleanup the new one from the old one.
+
+	// Track the sequence for restart etc.
+	var seq atomic.Uint64
+
+	msgCB := func(msg *nats.Msg) {
+		msg.AckSync()
+		meta, err := msg.Metadata()
+		require_NoError(t, err)
+		seq.Store(meta.Sequence.Stream)
+	}
+
+	waitOnSeqDelivered := func(expected uint64) {
+		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+			received := seq.Load()
+			if received == expected {
+				return nil
+			}
+			return fmt.Errorf("Seq is %d, want %d", received, expected)
+		})
+	}
+
+	doSub := func() {
+		_, err = js.Subscribe(
+			"DEVICE.22",
+			msgCB,
+			nats.ConsumerName("dlc"),
+			nats.SkipConsumerLookup(),
+			nats.StartSequence(seq.Load()+1),
+			nats.MaxAckPending(1), // One at a time.
+			nats.ManualAck(),
+			nats.ConsumerReplicas(1),
+			nats.ConsumerMemoryStorage(),
+			nats.MaxDeliver(1),
+			nats.InactiveThreshold(time.Second),
+			nats.IdleHeartbeat(250*time.Millisecond),
+		)
+		require_NoError(t, err)
+	}
+
+	// Track any errors for consumer not active so we can recreate the consumer.
+	errCh := make(chan error, 10)
+	nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
+		if errors.Is(err, nats.ErrConsumerNotActive) {
+			s.Unsubscribe()
+			errCh <- err
+			doSub()
+		}
+	})
+
+	doSub()
+
+	sendStreamMsg(t, nc, "DEVICE.22", "update-1")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-2")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-3")
+	waitOnSeqDelivered(3)
+
+	// Shutdown the consumer's leader.
+	s := c.consumerLeader(globalAccountName, "UPDATES", "dlc")
+	s.Shutdown()
+	c.waitOnStreamLeader(globalAccountName, "UPDATES")
+
+	// In case our client connection was to the same server.
+	nc, _ = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	sendStreamMsg(t, nc, "DEVICE.22", "update-4")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-5")
+	sendStreamMsg(t, nc, "DEVICE.22", "update-6")
+
+	// Wait for the consumer not active error.
+	<-errCh
+
+	// Now restart server with the old consumer.
+	c.restartServer(s)
+
+	// Wait on all messages delivered.
+	waitOnSeqDelivered(6)
+
+	// Make sure no other errors showed up
+	require_True(t, len(errCh) == 0)
+}