Merge to fix conflicts

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-09-21 08:16:51 -07:00
5 changed files with 69 additions and 2 deletions

View File

@@ -5178,6 +5178,9 @@ func (o *consumerFileStore) copyRedelivered() map[uint64]uint64 {
return redelivered
}
// Type returns the type of the underlying store.
func (o *consumerFileStore) Type() StorageType { return FileStorage }
// State retrieves the state from the state file.
// This is not expected to be called in high performance code, only on startup.
func (o *consumerFileStore) State() (*ConsumerState, error) {

View File

@@ -2843,11 +2843,17 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
t := time.NewTicker(compactInterval)
defer t.Stop()
st := o.store.Type()
var lastSnap []byte
// Should only to be called from leader.
doSnapshot := func() {
if state, err := o.store.State(); err == nil && state != nil {
// Memory store consumers do not keep state in the store itself.
// Just compact to our applied index.
if st == MemoryStorage {
_, _, applied := n.Progress()
n.Compact(applied)
} else if state, err := o.store.State(); err == nil && state != nil {
// FileStore version.
if snap := encodeConsumerState(state); !bytes.Equal(lastSnap, snap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap = snap

View File

@@ -840,6 +840,9 @@ func (os *consumerMemStore) StreamDelete() error {
func (os *consumerMemStore) State() (*ConsumerState, error) { return nil, nil }
// Type returns the type of the underlying store.
func (os *consumerMemStore) Type() StorageType { return MemoryStorage }
// Templates
type templateMemStore struct{}

View File

@@ -3244,6 +3244,60 @@ func TestNoRaceJetStreamLastSubjSeqAndFilestoreCompact(t *testing.T) {
}
}
// Issue #2548
func TestNoRaceJetStreamClusterMemoryStreamConsumerRaftGrowth(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "memory-leak",
Subjects: []string{"memory-leak"},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000,
Discard: nats.DiscardOld,
MaxAge: time.Minute,
Storage: nats.MemoryStorage,
Replicas: 3,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.QueueSubscribe("memory-leak", "q1", func(msg *nats.Msg) {
time.Sleep(1 * time.Second)
msg.Ack()
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send 10k (Must be > 8192 which is compactNumMin from monitorConsumer.
msg := []byte("NATS is a connective technology that powers modern distributed systems.")
for i := 0; i < 10_000; i++ {
if _, err := js.Publish("memory-leak", msg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// We will verify here that the underlying raft layer for the leader is not > 8192
cl := c.consumerLeader("$G", "memory-leak", "q1")
mset, err := cl.GlobalAccount().lookupStream("memory-leak")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
o := mset.lookupConsumer("q1")
if o == nil {
t.Fatalf("Error looking up consumer %q", "q1")
}
node := o.raftNode().(*raft)
if ms := node.wal.(*memStore); ms.State().Msgs > 8192 {
t.Fatalf("Did not compact the raft memory WAL")
}
}
func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

View File

@@ -156,6 +156,7 @@ type ConsumerStore interface {
UpdateAcks(dseq, sseq uint64) error
Update(*ConsumerState) error
State() (*ConsumerState, error)
Type() StorageType
Stop() error
Delete() error
StreamDelete() error