Reduce sliding window for direct consumers and catchup stream windows.

Remove another possible wire blocking operation in raft.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-21 09:24:27 -07:00
parent a4119e13c1
commit 0f548edcc6
5 changed files with 30 additions and 7 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.1.RC5"
VERSION = "2.2.1.RC6"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -237,7 +237,7 @@ const (
JsDeleteWaitTimeDefault = 5 * time.Second
// JsFlowControlMaxPending specifies default pending bytes during flow control that can be
// outstanding.
JsFlowControlMaxPending = 4 * 1024 * 1024
JsFlowControlMaxPending = 1 * 1024 * 1024
)
func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {

View File

@@ -4416,7 +4416,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
s := mset.srv
defer s.grWG.Done()
const maxOutBytes = int64(4 * 1024 * 1024) // 4MB for now.
const maxOutBytes = int64(2 * 1024 * 1024) // 2MB for now.
const maxOutMsgs = int32(16384)
outb := int64(0)
outm := int32(0)

View File

@@ -5157,6 +5157,7 @@ func (sc *supercluster) leader() *Server {
}
func (sc *supercluster) waitOnLeader() {
sc.t.Helper()
expires := time.Now().Add(5 * time.Second)
for time.Now().Before(expires) {
for _, c := range sc.clusters {
@@ -5167,7 +5168,6 @@ func (sc *supercluster) waitOnLeader() {
}
time.Sleep(25 * time.Millisecond)
}
sc.t.Fatalf("Expected a cluster leader, got none")
}

View File

@@ -617,8 +617,14 @@ func (n *raft) ProposeRemovePeer(peer string) error {
n.RLock()
propc, subj := n.propc, n.rpsubj
isUs, isLeader := peer == n.id, n.state == Leader
werr := n.werr
n.RUnlock()
// Error if we had a previous write error.
if werr != nil {
return werr
}
if isLeader {
if isUs {
n.StepDown()
@@ -1540,9 +1546,14 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _
peer := string(append(msg[:0:0], msg...))
n.RLock()
propc := n.propc
propc, werr := n.propc, n.werr
n.RUnlock()
// Ignore if we have had a write error previous.
if werr != nil {
return
}
select {
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
default:
@@ -1558,8 +1569,20 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st
}
// Need to copy since this is underlying client/route buffer.
msg = append(msg[:0:0], msg...)
if err := n.Propose(msg); err != nil {
n.warn("Got error processing forwarded proposal: %v", err)
n.RLock()
propc, werr := n.propc, n.werr
n.RUnlock()
// Ignore if we have had a write error previous.
if werr != nil {
return
}
select {
case propc <- &Entry{EntryNormal, msg}:
default:
n.warn("Failed to place forwarded proposal onto propose channel")
}
}