Fixes for data races and some locking issues.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-05 17:19:51 -08:00
parent ff98984358
commit 0b3c686430
6 changed files with 75 additions and 53 deletions

View File

@@ -2022,6 +2022,10 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
o.dseq++
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq, nil}
if o.maxpb > 0 {
o.pbytes += pmsg.size()
}
mset := o.mset
ap := o.cfg.AckPolicy
@@ -2041,11 +2045,8 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
}
// Flow control.
if o.maxpb > 0 {
o.pbytes += pmsg.size()
if o.needFlowControl() {
o.sendFlowControl()
}
if o.maxpb > 0 && o.needFlowControl() {
o.sendFlowControl()
}
// FIXME(dlc) - Capture errors?

View File

@@ -186,13 +186,13 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
s.Warnf("| || | _| | | \\__ \\ | | | / _| / _ \\| |\\/| |")
s.Warnf(" \\__/|___| |_| |___/ |_| |_|_\\___/_/ \\_\\_| |_|")
s.Warnf("")
s.Warnf(" https://github.com/nats-io/jetstream")
s.Warnf(" https://github.com/nats-io/jetstream")
s.Noticef("")
s.Noticef("----------- JETSTREAM -----------")
s.Noticef("---------------- JETSTREAM ----------------")
s.Noticef(" Max Memory: %s", friendlyBytes(cfg.MaxMemory))
s.Noticef(" Max Storage: %s", friendlyBytes(cfg.MaxStore))
s.Noticef(" Store Directory: %q", cfg.StoreDir)
s.Noticef("---------------------------------")
s.Noticef("-------------------------------------------")
// Setup our internal subscriptions.
if err := s.setJetStreamExportSubs(); err != nil {

View File

@@ -1254,8 +1254,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
const (
compactInterval = 2 * time.Minute
compactSizeMin = 32 * 1024 * 1024
compactNumMin = 4
compactSizeMin = 64 * 1024 * 1024
compactNumMin = 8192
)
t := time.NewTicker(compactInterval)
@@ -1315,8 +1315,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
n.Applied(ce.Index)
ne := ce.Index - lastApplied
// If over our compact min and we have at least min entries to compact, go ahead and snapshot/compact.
if _, b := n.Size(); b > compactSizeMin && ne >= compactNumMin {
// If we have at least min entries to compact, go ahead and snapshot/compact.
if ne >= compactNumMin {
doSnapshot()
}
} else {

View File

@@ -7084,7 +7084,9 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) {
} else if obs := mset.lookupConsumer("dlc"); obs == nil {
t.Fatalf("Error looking up stream: %v", err)
} else {
obs.mu.Lock()
obs.setMaxPendingBytes(16 * 1024)
obs.mu.Unlock()
}
msgSize := 1024

View File

@@ -341,9 +341,9 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
reqs: make(chan *voteRequest, 8),
votes: make(chan *voteResponse, 32),
propc: make(chan *Entry, 8192),
entryc: make(chan *appendEntry, 8192),
entryc: make(chan *appendEntry, 32768),
respc: make(chan *appendEntryResponse, 32768),
applyc: make(chan *CommittedEntry, 8192),
applyc: make(chan *CommittedEntry, 32768),
leadc: make(chan bool, 8),
stepdown: make(chan string, 8),
}
@@ -1975,6 +1975,7 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
sendHB = len(n.propc) == 0
}
}
n.Unlock()
if sendHB {

View File

@@ -987,12 +987,18 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo {
// processClusteredMirrorMsg will propose the inbound mirrored message to the underlying raft group.
func (mset *stream) processClusteredMirrorMsg(subject string, hdr, msg []byte, seq uint64, ts int64) error {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
node := mset.node
mset.mu.RUnlock()
// Do proposal.
return mset.node.Propose(encodeStreamMsg(subject, _EMPTY_, hdr, msg, seq, ts))
if node == nil {
return nil
}
return node.Propose(encodeStreamMsg(subject, _EMPTY_, hdr, msg, seq, ts))
}
const sourceHealthCheckInterval = 10 * time.Second
// Will run as a Go routine to process messages.
func (mset *stream) processMirrorMsgs() {
s := mset.srv
@@ -1005,6 +1011,9 @@ func (mset *stream) processMirrorMsgs() {
qch := mset.qch
mset.mu.RUnlock()
t := time.NewTicker(sourceHealthCheckInterval)
defer t.Stop()
for {
select {
case <-s.quitCh:
@@ -1012,12 +1021,15 @@ func (mset *stream) processMirrorMsgs() {
case <-qch:
return
case <-mch:
for im := mset.pending(msgs); im != nil; {
for im := mset.pending(msgs); im != nil; im = im.next {
mset.processInboundMirrorMsg(im)
// Do this here to nil out below vs up in for loop.
next := im.next
im.next, im.hdr, im.msg = nil, nil, nil
im = next
}
case <-t.C:
mset.mu.RLock()
stalled := mset.mirror != nil && time.Since(mset.mirror.last) > 3*sourceHealthCheckInterval
mset.mu.RUnlock()
if stalled {
mset.resetMirrorConsumer()
}
}
}
@@ -1025,8 +1037,6 @@ func (mset *stream) processMirrorMsgs() {
// processInboundMirrorMsg handles processing messages bound for a stream.
func (mset *stream) processInboundMirrorMsg(m *inMsg) {
sseq, _, _, ts, pending := replyInfo(m.rply)
mset.mu.Lock()
if mset.mirror == nil {
mset.mu.Unlock()
@@ -1044,6 +1054,8 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
return
}
sseq, _, _, ts, pending := replyInfo(m.rply)
// Mirror info tracking.
olag := mset.mirror.lag
mset.mirror.lag = pending
@@ -1119,8 +1131,10 @@ func (mset *stream) setupMirrorConsumer() error {
return errors.New("outq required")
}
isReset := mset.mirror != nil
// Reset
if mset.mirror != nil {
if isReset {
if mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
@@ -1139,7 +1153,9 @@ func (mset *stream) setupMirrorConsumer() error {
deliverSubject = syncSubject("$JS.M")
}
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}}
if !isReset {
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name, msgs: &inbound{mch: make(chan struct{}, 1)}}
}
// Process inbound mirror messages from the wire.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
@@ -1159,7 +1175,10 @@ func (mset *stream) setupMirrorConsumer() error {
}
mset.mirror.sub = sub
mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
if !isReset {
mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
}
// Now send off request to create/update our consumer. This will be all API based even in single server mode.
// We calculate durable names apriori so we do not need to save them off.
@@ -1179,6 +1198,7 @@ func (mset *stream) setupMirrorConsumer() error {
FlowControl: true,
},
}
// Only use start optionals on first time.
if state.Msgs == 0 {
if mset.cfg.Mirror.OptStartSeq > 0 {
@@ -1229,7 +1249,9 @@ func (mset *stream) setupMirrorConsumer() error {
} else {
// Capture consumer name.
mset.mu.Lock()
mset.mirror.cname = ccr.ConsumerInfo.Name
if mset.mirror != nil {
mset.mirror.cname = ccr.ConsumerInfo.Name
}
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
@@ -1244,10 +1266,6 @@ func (mset *stream) setupMirrorConsumer() error {
return nil
}
func (mset *stream) sourceDurable(streamName string) string {
return string(getHash(fmt.Sprintf("SOURCE:%s:%s", streamName, mset.cfg.Name)))
}
func (mset *stream) streamSource(sname string) *StreamSource {
for _, ssi := range mset.cfg.Sources {
if ssi.Name == sname {
@@ -1426,6 +1444,9 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
qch := mset.qch
mset.mu.RUnlock()
t := time.NewTicker(sourceHealthCheckInterval)
defer t.Stop()
for {
select {
case <-s.quitCh:
@@ -1433,12 +1454,16 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
case <-qch:
return
case <-mch:
for im := mset.pending(msgs); im != nil; {
for im := mset.pending(msgs); im != nil; im = im.next {
mset.processInboundSourceMsg(si, im)
// Do this here to nil out below vs up in for loop.
next := im.next
im.next, im.hdr, im.msg = nil, nil, nil
im = next
}
case <-t.C:
mset.mu.RLock()
stalled := time.Since(si.last) > 3*sourceHealthCheckInterval
sname := si.name
mset.mu.RUnlock()
if stalled {
mset.retrySourceConsumer(sname)
}
}
}
@@ -1446,11 +1471,6 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
// processInboundSourceMsg handles processing other stream messages bound for this stream.
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
sseq, dseq, dc, _, pending := replyInfo(m.rply)
if dc > 1 {
return
}
mset.mu.Lock()
@@ -1465,6 +1485,13 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
return
}
sseq, dseq, dc, _, pending := replyInfo(m.rply)
if dc > 1 {
mset.mu.Unlock()
return
}
// Tracking is done here.
if dseq == si.dseq+1 {
si.dseq++
@@ -2386,7 +2413,7 @@ func (mset *stream) internalLoop() {
for {
select {
case <-outq.mch:
for pm := outq.pending(); pm != nil; {
for pm := outq.pending(); pm != nil; pm = pm.next {
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg) + len(pm.hdr)
@@ -2413,25 +2440,16 @@ func (mset *stream) internalLoop() {
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
}
// Do this here to nil out below vs up in for loop.
next := pm.next
pm.next, pm.hdr, pm.msg = nil, nil, nil
pm = next
}
c.flushClients(10 * time.Millisecond)
case <-mch:
for im := mset.pending(mset.msgs); im != nil; {
for im := mset.pending(mset.msgs); im != nil; im = im.next {
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg)
} else {
mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
}
// Do this here to nil out below vs up in for loop.
next := im.next
im.next, im.hdr, im.msg = nil, nil, nil
im = next
}
case seq := <-rmch:
mset.store.RemoveMsg(seq)