mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Merge pull request #3634 from nats-io/fix-concurrent-write
Fix concurrent write in consumer pending map at startup
This commit is contained in:
@@ -6233,6 +6233,9 @@ func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
|
||||
}
|
||||
|
||||
func (o *consumerFileStore) Update(state *ConsumerState) error {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
// Sanity checks.
|
||||
if state.AckFloor.Consumer > state.Delivered.Consumer {
|
||||
return fmt.Errorf("bad ack floor for consumer")
|
||||
@@ -6262,12 +6265,8 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Replace our state.
|
||||
o.mu.Lock()
|
||||
|
||||
// Check to see if this is an outdated update.
|
||||
if state.Delivered.Consumer < o.state.Delivered.Consumer {
|
||||
o.mu.Unlock()
|
||||
return fmt.Errorf("old update ignored")
|
||||
}
|
||||
|
||||
@@ -6276,7 +6275,6 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
|
||||
o.state.Pending = pending
|
||||
o.state.Redelivered = redelivered
|
||||
o.kickFlusher()
|
||||
o.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user