mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Updates based on PR feedback
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -337,14 +337,6 @@ func (a *Account) removeRemoteServer(sid string) {
|
||||
a.mu.Unlock()
|
||||
}
|
||||
|
||||
// Return our server.
|
||||
func (a *Account) server() *Server {
|
||||
a.mu.RLock()
|
||||
s := a.srv
|
||||
a.mu.RUnlock()
|
||||
return s
|
||||
}
|
||||
|
||||
// When querying for subject interest this is the number of
|
||||
// expected responses. We need to actually check that the entry
|
||||
// has active connections.
|
||||
|
||||
@@ -687,9 +687,11 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, erro
|
||||
return seq, ts, nil
|
||||
}
|
||||
|
||||
// skipMsg will update message block for a skipped message. If first
|
||||
// just meta data, but if interior an empty message record with erase bit.
|
||||
// fs lock will be held.
|
||||
// skipMsg will update this message block for a skipped message.
|
||||
// If we do not have any messages, just update the metadata, otherwise
|
||||
// we will place and empty record marking the sequence as used. The
|
||||
// sequence will be marked erased.
|
||||
// fs lock should be held.
|
||||
func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
|
||||
if mb == nil {
|
||||
return
|
||||
@@ -1111,7 +1113,8 @@ func (mb *msgBlock) expireCache() {
|
||||
mb.mu.Lock()
|
||||
defer mb.mu.Unlock()
|
||||
|
||||
if mb.cache == nil {
|
||||
if mb.cache == nil && mb.ctmr != nil {
|
||||
mb.ctmr.Stop()
|
||||
mb.ctmr = nil
|
||||
return
|
||||
}
|
||||
|
||||
@@ -376,7 +376,7 @@ func TestFileStoreWriteExpireWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
// Wait for write cache portion to go to zero.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
checkFor(t, time.Second, 20*time.Millisecond, func() error {
|
||||
if csz := fs.cacheSize(); csz != 0 {
|
||||
return fmt.Errorf("cache size not 0, got %s", FriendlyBytes(int64(csz)))
|
||||
}
|
||||
|
||||
@@ -210,13 +210,12 @@ func (a *Account) AddStreamWithStore(config *StreamConfig, fsConfig *FileStoreCo
|
||||
func (mset *Stream) maxMsgSize() uint64 {
|
||||
maxMsgSize := mset.config.MaxMsgSize
|
||||
if maxMsgSize <= 0 {
|
||||
// Pull from server.
|
||||
// Pull from the account.
|
||||
if mset.jsa != nil {
|
||||
if acc := mset.jsa.acc(); acc != nil {
|
||||
if s := acc.server(); s != nil {
|
||||
opts := s.getOpts()
|
||||
maxMsgSize = opts.MaxPayload
|
||||
}
|
||||
acc.mu.RLock()
|
||||
maxMsgSize = acc.mpay
|
||||
acc.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
// If all else fails use default.
|
||||
|
||||
Reference in New Issue
Block a user