Auto cleanup dangling messages from interest policy streams on server start.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-11-14 13:04:49 -08:00
parent a847570030
commit b6ef2c8910
3 changed files with 122 additions and 0 deletions

View File

@@ -1142,6 +1142,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
}
var consumers []*ce
// Collect any interest policy streams to check for
// https://github.com/nats-io/nats-server/issues/3612
var ipstreams []*stream
// Remember if we should be encrypted and what cipher we think we should use.
encrypted := s.getOpts().JetStreamKey != _EMPTY_
plaintext := true
@@ -1284,6 +1288,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
state := mset.state()
s.Noticef(" Restored %s messages for stream '%s > %s'", comma(int64(state.Msgs)), mset.accName(), mset.name())
// Collect to check for dangling messages.
// TODO(dlc) - Can be removed eventually.
if cfg.StreamConfig.Retention == InterestPolicy {
ipstreams = append(ipstreams, mset)
}
// Now do the consumers.
odir := filepath.Join(sdir, fi.Name(), consumerDir)
consumers = append(consumers, &ce{mset, odir})
@@ -1368,6 +1378,11 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
// Make sure to cleanup any old remaining snapshots.
os.RemoveAll(filepath.Join(jsa.storeDir, snapsDir))
// Check interest policy streams for auto cleanup.
for _, mset := range ipstreams {
mset.checkForOrphanMsgs()
}
s.Debugf("JetStream state for account %q recovered", a.Name)
return nil

View File

@@ -19681,3 +19681,76 @@ func TestJetStreamPullConsumersTimeoutHeaders(t *testing.T) {
t.Logf("Subscription did not receive the pull request response on server shutdown")
}
}
// For issue https://github.com/nats-io/nats-server/issues/3612
// Do auto cleanup.
func TestJetStreamDanglingMessageAutoCleanup(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: nats.InterestPolicy,
})
require_NoError(t, err)
sub, err := js.PullSubscribe("foo", "dlc", nats.MaxAckPending(10))
require_NoError(t, err)
// Send 100 msgs
n := 100
for i := 0; i < n; i++ {
sendStreamMsg(t, nc, "foo", "msg")
}
// Grab and ack 10 messages.
for _, m := range fetchMsgs(t, sub, 10, time.Second) {
m.AckSync()
}
ci, err := sub.ConsumerInfo()
require_NoError(t, err)
require_True(t, ci.AckFloor.Stream == 10)
// Stop current
sd := s.JetStreamConfig().StoreDir
s.Shutdown()
// We will hand move the ackfloor to simulate dangling message condition.
cstore := filepath.Join(sd, "$G", "streams", "TEST", "obs", "dlc", "o.dat")
buf, err := os.ReadFile(cstore)
require_NoError(t, err)
state, err := decodeConsumerState(buf)
require_NoError(t, err)
// Update from 10 for delivered and ack to 90.
state.Delivered.Stream, state.Delivered.Consumer = 90, 90
state.AckFloor.Stream, state.AckFloor.Consumer = 90, 90
err = os.WriteFile(cstore, encodeConsumerState(state), defaultFilePerms)
require_NoError(t, err)
// Restart.
s = RunJetStreamServerOnPort(-1, sd)
defer s.Shutdown()
nc, js = jsClientConnect(t, s)
defer nc.Close()
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs != 10 {
t.Fatalf("Expected auto-cleanup to have worked but got %d msgs vs 10", si.State.Msgs)
}
}

View File

@@ -4859,3 +4859,37 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
}
return mset, nil
}
// This is to check for dangling messages.
// Issue https://github.com/nats-io/nats-server/issues/3612
func (mset *stream) checkForOrphanMsgs() {
// We need to grab the low water mark for all consumers.
var ackFloor uint64
mset.mu.RLock()
for _, o := range mset.consumers {
o.mu.RLock()
if o.store != nil {
if state, err := o.store.BorrowState(); err == nil {
if ackFloor == 0 || state.AckFloor.Stream < ackFloor {
ackFloor = state.AckFloor.Stream
}
}
}
o.mu.RUnlock()
}
// Grabs stream state.
var state StreamState
mset.store.FastState(&state)
s, acc := mset.srv, mset.acc
mset.mu.RUnlock()
if ackFloor > state.FirstSeq {
req := &JSApiStreamPurgeRequest{Sequence: ackFloor + 1}
purged, err := mset.purge(req)
if err != nil {
s.Warnf("stream '%s > %s' could not auto purge orphaned messages: %v", acc, mset.name(), err)
} else {
s.Debugf("stream '%s > %s' auto purged %d messages", acc, mset.name(), purged)
}
}
}