mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Properly recover ephemeral consumers after restart
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1644,6 +1644,22 @@ func (o *Consumer) SetInActiveDeleteThreshold(dthresh time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// switchToEphemeral is called on startup when recovering ephemerals.
|
||||
func (o *Consumer) switchToEphemeral() {
|
||||
o.mu.Lock()
|
||||
o.config.Durable = _EMPTY_
|
||||
store, ok := o.store.(*consumerFileStore)
|
||||
rr := o.acc.sl.Match(o.config.DeliverSubject)
|
||||
o.mu.Unlock()
|
||||
|
||||
// Update interest
|
||||
o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0)
|
||||
// Write out new config
|
||||
if ok {
|
||||
store.updateConfig(o.config)
|
||||
}
|
||||
}
|
||||
|
||||
// RequestNextMsgSubject returns the subject to request the next message when in pull or worker mode.
|
||||
// Returns empty otherwise.
|
||||
func (o *Consumer) RequestNextMsgSubject() string {
|
||||
|
||||
@@ -58,6 +58,7 @@ type FileStreamInfo struct {
|
||||
// File ConsumerInfo is used for creating consumer stores.
|
||||
type FileConsumerInfo struct {
|
||||
Created time.Time
|
||||
Name string
|
||||
ConsumerConfig
|
||||
}
|
||||
|
||||
@@ -2246,6 +2247,14 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Will upodate the config. Only used when recovering ephemerals.
|
||||
func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
o.cfg = &FileConsumerInfo{ConsumerConfig: cfg}
|
||||
return o.writeConsumerMeta()
|
||||
}
|
||||
|
||||
// Write out the consumer meta data, i.e. state.
|
||||
// Lock should be held.
|
||||
func (cfs *consumerFileStore) writeConsumerMeta() error {
|
||||
|
||||
@@ -551,11 +551,20 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
s.Warnf(" Error unmarshalling Consumer metafile: %v", err)
|
||||
continue
|
||||
}
|
||||
isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
|
||||
if isEphemeral {
|
||||
// This is an ephermal consumer and this could fail on restart until
|
||||
// the consumer can reconnect. We will create it as a durable and switch it.
|
||||
cfg.ConsumerConfig.Durable = ofi.Name()
|
||||
}
|
||||
obs, err := mset.AddConsumer(&cfg.ConsumerConfig)
|
||||
if err != nil {
|
||||
s.Warnf(" Error adding Consumer: %v", err)
|
||||
continue
|
||||
}
|
||||
if isEphemeral {
|
||||
obs.switchToEphemeral()
|
||||
}
|
||||
if !cfg.Created.IsZero() {
|
||||
obs.setCreated(cfg.Created)
|
||||
}
|
||||
|
||||
@@ -2208,7 +2208,7 @@ func TestJetStreamPullConsumerRemoveInterest(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
|
||||
func TestJetStreamEphemeralConsumerRecoveryAfterServerRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -2216,6 +2216,104 @@ func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
}
|
||||
|
||||
mname := "MYS"
|
||||
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.FileStorage})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error adding stream: %v", err)
|
||||
}
|
||||
defer mset.Delete()
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
sub, _ := nc.SubscribeSync(nats.NewInbox())
|
||||
defer sub.Unsubscribe()
|
||||
nc.Flush()
|
||||
|
||||
o, err := mset.AddConsumer(&server.ConsumerConfig{
|
||||
DeliverSubject: sub.Subject,
|
||||
AckPolicy: server.AckExplicit,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating consumer: %v", err)
|
||||
}
|
||||
defer o.Delete()
|
||||
|
||||
// Snapshot our name.
|
||||
oname := o.Name()
|
||||
|
||||
// Send 100 messages
|
||||
for i := 0; i < 100; i++ {
|
||||
sendStreamMsg(t, nc, mname, "Hello World!")
|
||||
}
|
||||
if state := mset.State(); state.Msgs != 100 {
|
||||
t.Fatalf("Expected %d messages, got %d", 100, state.Msgs)
|
||||
}
|
||||
|
||||
// Read 6 messages
|
||||
for i := 0; i <= 6; i++ {
|
||||
if m, err := sub.NextMsg(time.Second); err == nil {
|
||||
m.Respond(nil)
|
||||
} else {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Capture port since it was dynamic.
|
||||
u, _ := url.Parse(s.ClientURL())
|
||||
port, _ := strconv.Atoi(u.Port())
|
||||
|
||||
restartServer := func() {
|
||||
t.Helper()
|
||||
// Stop current server.
|
||||
s.Shutdown()
|
||||
// Restart.
|
||||
s = RunJetStreamServerOnPort(port)
|
||||
}
|
||||
|
||||
// Do twice
|
||||
for i := 0; i < 2; i++ {
|
||||
// Restart.
|
||||
restartServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
mset, err = s.GlobalAccount().LookupStream(mname)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to find a stream for %q", mname)
|
||||
}
|
||||
o = mset.LookupConsumer(oname)
|
||||
if o == nil {
|
||||
t.Fatalf("Error looking up consumer %q", oname)
|
||||
}
|
||||
// Make sure config does not have durable.
|
||||
if cfg := o.Config(); cfg.Durable != "" {
|
||||
t.Fatalf("Expected no durable to be set")
|
||||
}
|
||||
// Wait for it to become active
|
||||
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if !o.Active() {
|
||||
return fmt.Errorf("Consumer not active")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Now close the connection. Make sure this acts like an ephemeral and goes away.
|
||||
o.SetInActiveDeleteThreshold(10 * time.Millisecond)
|
||||
nc.Close()
|
||||
|
||||
checkFor(t, 200*time.Millisecond, 10*time.Millisecond, func() error {
|
||||
if o := mset.LookupConsumer(oname); o != nil {
|
||||
return fmt.Errorf("Consumer still active")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamConsumerMaxDeliveryAndServerRestart(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
if config := s.JetStreamConfig(); config != nil {
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user