Merge pull request #2334 from nats-io/lastupdate

Fix for #2329.
This commit is contained in:
Derek Collison
2021-06-30 21:26:59 -07:00
committed by GitHub
3 changed files with 85 additions and 26 deletions

View File

@@ -334,13 +334,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return nil, fmt.Errorf("could not create hash: %v", err)
}
// Determine if we should be tracking multiple subjects etc.
fs.tms = !(len(cfg.Subjects) == 1 && subjectIsLiteral(cfg.Subjects[0]))
// Raft usage of filestore has no subjects or mirrors or sources. No need to track.
if fs.tms && len(cfg.Subjects) == 0 && cfg.Mirror == nil && len(cfg.Sources) == 0 {
fs.tms = false
}
// Always track per subject information.
fs.tms = true
// Recover our message state.
if err := fs.recoverMsgs(); err != nil {
@@ -374,7 +369,6 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
if fs.isClosed() {
return ErrStoreClosed
}
if cfg.Name == _EMPTY_ {
return fmt.Errorf("name required")
}
@@ -836,12 +830,13 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
// Do per subject info.
if mb.fss != nil {
subj := string(data[:slen])
if ss := mb.fss[subj]; ss != nil {
ss.Msgs++
ss.Last = seq
} else {
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
if subj := string(data[:slen]); len(subj) > 0 {
if ss := mb.fss[subj]; ss != nil {
ss.Msgs++
ss.Last = seq
} else {
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
}
}
}
}
@@ -3860,11 +3855,10 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
// generatePerSubjectInfo will generate the per subject info via the raw msg block.
func (mb *msgBlock) generatePerSubjectInfo() error {
var shouldExpire bool
mb.mu.Lock()
defer mb.mu.Unlock()
var shouldExpire bool
if !mb.cacheAlreadyLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
@@ -3874,7 +3868,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
}
fseq, lseq := mb.first.seq, mb.last.seq
for seq := fseq; seq <= lseq; seq++ {
if sm, _ := mb.cacheLookupWithLock(seq); sm != nil {
if sm, _ := mb.cacheLookupWithLock(seq); sm != nil && len(sm.subj) > 0 {
if ss := mb.fss[sm.subj]; ss != nil {
ss.Msgs++
ss.Last = seq
@@ -3903,11 +3897,8 @@ func (mb *msgBlock) readPerSubjectInfo() error {
)
buf, err := ioutil.ReadFile(mb.sfn)
if err != nil || len(buf) < minFileSize {
return mb.generatePerSubjectInfo()
}
if err := checkHeader(buf); err != nil {
if err != nil || len(buf) < minFileSize || checkHeader(buf) != nil {
return mb.generatePerSubjectInfo()
}
@@ -3955,6 +3946,10 @@ func (mb *msgBlock) readPerSubjectInfo() error {
// writePerSubjectInfo will write out per subject information if we are tracking per subject.
// Lock should be held.
func (mb *msgBlock) writePerSubjectInfo() error {
// Raft groups do not have any subjects.
if len(mb.fss) == 0 {
return nil
}
var scratch [4 * binary.MaxVarintLen64]byte
var b bytes.Buffer
b.WriteByte(magic)

View File

@@ -3419,11 +3419,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, subject, re
resp.Error = ApiErrors[JSClusterNotAvailErr]
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group)
} else if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(cc.meta.ID()) {
// Check here if we are a member and this is just a new consumer that does not have a leader yet.
if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() {
resp.Error = ApiErrors[JSConsumerNotFoundErr]
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
} else if ca != nil {
if rg := ca.Group; rg != nil && rg.node != nil && rg.isMember(cc.meta.ID()) {
// Check here if we are a member and this is just a new consumer that does not have a leader yet.
if rg.node.GroupLeader() == _EMPTY_ && !rg.node.HadPreviousLeader() {
resp.Error = ApiErrors[JSConsumerNotFoundErr]
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
}
}
}
return

View File

@@ -10971,6 +10971,68 @@ func TestJetStreamGetLastMsgBySubject(t *testing.T) {
}
}
// https://github.com/nats-io/nats-server/issues/2329
func TestJetStreamGetLastMsgBySubjectAfterUpdate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
sc := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 2,
}
if _, err := js.AddStream(sc); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now Update and add in other subjects.
sc.Subjects = append(sc.Subjects, "bar", "baz")
if _, err := js.UpdateStream(sc); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js.Publish("foo", []byte("OK1")) // 1
js.Publish("bar", []byte("OK1")) // 2
js.Publish("foo", []byte("OK2")) // 3
js.Publish("bar", []byte("OK2")) // 4
// Need to do stream GetMsg by hand for now until Go client support lands.
getLast := func(subject string) *StoredMsg {
t.Helper()
req := &JSApiMsgGetRequest{LastFor: subject}
b, _ := json.Marshal(req)
rmsg, err := nc.Request(fmt.Sprintf(JSApiMsgGetT, "TEST"), b, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var resp JSApiMsgGetResponse
err = json.Unmarshal(rmsg.Data, &resp)
if err != nil {
t.Fatalf("Could not parse stream message: %v", err)
}
if resp.Message == nil || resp.Error != nil {
t.Fatalf("Did not receive correct response: %+v", resp.Error)
}
return resp.Message
}
// Do basic checks.
basicCheck := func(subject string, expectedSeq uint64) {
sm := getLast(subject)
if sm == nil {
t.Fatalf("Expected a message but got none")
} else if sm.Sequence != expectedSeq {
t.Fatalf("Wrong message sequence, wanted %d but got %d", expectedSeq, sm.Sequence)
} else if !subjectIsSubsetMatch(sm.Subject, subject) {
t.Fatalf("Wrong subject, wanted %q but got %q", subject, sm.Subject)
}
}
basicCheck("foo", 3)
basicCheck("bar", 4)
}
func TestJetStreamLastSequenceBySubject(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {