Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-04-03 15:33:12 -07:00
7 changed files with 38 additions and 43 deletions

View File

@@ -1,6 +1,11 @@
name: "NATS Server Nightly: DEV"
on:
workflow_dispatch: {}
workflow_dispatch:
inputs:
target:
description: "Override image branch (optional)"
type: string
required: false
schedule:
- cron: "40 4 * * *"
@@ -13,7 +18,7 @@ jobs:
uses: actions/checkout@v3
with:
path: src/github.com/nats-io/nats-server
ref: dev
ref: ${{ inputs.target || 'dev' }}
- uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release
with:

View File

@@ -1,11 +1,15 @@
name: "NATS Server Nightly: MAIN"
on:
workflow_dispatch: {}
workflow_dispatch:
inputs:
target:
description: "Override image branch (optional)"
type: string
required: false
schedule:
- cron: "40 4 * * *"
jobs:
nightly_main_release:
runs-on: ubuntu-latest
@@ -14,7 +18,7 @@ jobs:
uses: actions/checkout@v3
with:
path: src/github.com/nats-io/nats-server
ref: main
ref: ${{ inputs.target || 'main' }}
- uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release
with:
@@ -22,4 +26,4 @@ jobs:
workdir: src/github.com/nats-io/nats-server
label: nightly-main
hub_username: "${{ secrets.DOCKER_USERNAME }}"
hub_password: "${{ secrets.DOCKER_PASSWORD }}"
hub_password: "${{ secrets.DOCKER_PASSWORD }}"

View File

@@ -436,11 +436,9 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
// Read lock should be held.
func (js *jetStream) isStreamHealthy(account, stream string) bool {
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
if cc == nil {
// Non-clustered mode
return true
@@ -480,11 +478,9 @@ func (js *jetStream) isStreamHealthy(account, stream string) bool {
// isConsumerCurrent will determine if the consumer is up to date.
// For R1 it will make sure the consumer is present on this server.
// Read lock should be held.
func (js *jetStream) isConsumerCurrent(account, stream, consumer string) bool {
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
if cc == nil {
// Non-clustered mode
return true
@@ -1943,9 +1939,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
return
}
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
defer n.Stop()
// Make sure only one is running.
if mset != nil {
if mset.checkInMonitor() {
@@ -1954,6 +1947,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
defer mset.clearMonitorRunning()
}
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
// This should be below the checkInMonitor call though to avoid stopping it out
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()
qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -4187,15 +4185,17 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
return
}
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
defer n.Stop()
// Make sure only one is running.
if o.checkInMonitor() {
return
}
defer o.clearMonitorRunning()
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
// This should be below the checkInMonitor call though to avoid stopping it out
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()
qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()
s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())

View File

@@ -2242,14 +2242,15 @@ func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
// Make sure that we have a META leader (there can always be a re-election)
c.waitOnLeader()
c.waitOnStreamLeader(globalAccountName, "foo")
// Should still have quorum and a new leader
checkFor(t, time.Second, 200*time.Millisecond, func() error {
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
osi, err = jsc.StreamInfo("foo")
if err != nil {
return fmt.Errorf("expected healthy stream asset, got %s", err.Error())
}
if osi.Cluster.Leader == "" {
if osi.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("expected healthy stream asset with new leader")
}
if osi.State.Msgs != uint64(toSend) {

View File

@@ -3133,6 +3133,11 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
// Range across all accounts, the streams assigned to them, and the consumers.
// If they are assigned to this server check their status.
ourID := meta.ID()
// TODO(dlc) - Might be better here to not hold the lock the whole time.
js.mu.RLock()
defer js.mu.RUnlock()
for acc, asa := range cc.streams {
for stream, sa := range asa {
if sa.Group.isMember(ourID) {

View File

@@ -2511,13 +2511,6 @@ func (n *raft) applyCommit(index uint64) error {
// We pass these up as well.
committed = append(committed, e)
case EntryLeaderTransfer:
if n.state == Leader {
n.debug("Stepping down")
n.stepdown.push(noLeader)
}
// No-op
}
}
// Pass to the upper layers if we have normal entries.
@@ -3056,6 +3049,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
for _, e := range ae.entries {
switch e.Type {
case EntryLeaderTransfer:
// Only process these if they are new, so no replays or catchups.
if isNew {
maybeLeader := string(e.Data)
if maybeLeader == n.id && !n.observer && !n.paused {
@@ -3156,7 +3150,7 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
// Determine if we should store an entry.
func (ae *appendEntry) shouldStore() bool {
return ae != nil && len(ae.entries) > 0 && ae.entries[0].Type != EntryLeaderTransfer
return ae != nil && len(ae.entries) > 0
}
// Store our append entry to our WAL.

View File

@@ -3654,21 +3654,7 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(rmsg)
// If we are not receiving directly from a client we should move this to another Go routine.
// Make sure to grab no stream or js locks.
if c.kind != CLIENT {
mset.queueInboundMsg(subject, reply, hdr, msg)
return
}
// This is directly from a client so process inline.
// If we are clustered we need to propose this message to the underlying raft group.
if mset.IsClustered() {
mset.processClusteredInboundMsg(subject, reply, hdr, msg)
} else {
mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
}
mset.queueInboundMsg(subject, reply, hdr, msg)
}
var (