Merge pull request #2131 from nats-io/updates

General Updates and Stability Improvements
This commit is contained in:
Derek Collison
2021-04-20 13:52:39 -07:00
committed by GitHub
7 changed files with 523 additions and 158 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.2-beta.5"
VERSION = "2.2.2-beta.8"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -2735,10 +2735,14 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
o.active = false
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
o.unsubscribe(o.infoSub)
o.unsubscribe(o.fcSub)
o.ackSub = nil
o.reqSub = nil
o.infoSub = nil
o.fcSub = nil
if o.infoSub != nil {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
}
c := o.client
o.client = nil
sysc := o.sysc

View File

@@ -221,13 +221,16 @@ func (s *Server) JetStreamSnapshotMeta() error {
return ErrJetStreamNotEnabled
}
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
if !cc.isLeader() {
isLeader := cc.isLeader()
meta := cc.meta
js.mu.RUnlock()
if !isLeader {
return errNotLeader
}
return cc.meta.InstallSnapshot(js.metaSnapshot())
return meta.InstallSnapshot(js.metaSnapshot())
}
func (s *Server) JetStreamStepdownStream(account, stream string) error {
@@ -1370,7 +1373,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
doSnapshot()
}
} else if err == errLastSeqMismatch {
if mset.isMirror() {
mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()
if mset.isMirror() && isLeader {
mset.retryMirrorConsumer()
} else {
s.Warnf("Got stream sequence mismatch for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
@@ -1553,6 +1560,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if mset == nil {
continue
}
s := js.srv
subject, reply, hdr, msg, lseq, ts, err := decodeStreamMsg(buf[1:])
if err != nil {
@@ -1562,8 +1570,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// We can skip if we know this is less than what we already have.
last := mset.lastSeq()
if lseq < last {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
continue
}
// Skip by hand here since first msg special case.
// Reason is sequence is unsigned and for lseq being 0
// the lseq under stream would have to be -1.
@@ -1574,8 +1584,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Check for flowcontrol here.
mset.checkForFlowControl(lseq + 1)
s := js.srv
// Messages to be skipped have no subject or timestamp.
if subject == _EMPTY_ && ts == 0 {
// Skip and update our lseq.
@@ -4166,6 +4174,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
s, js, jsa, st, rf, outq := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
maxMsgSize := int(mset.cfg.MaxMsgSize)
msetName := mset.cfg.Name
lseq := mset.lseq
mset.mu.RUnlock()
// Check here pre-emptively if we have exceeded this server limits.
@@ -4229,7 +4238,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We only use mset.clseq for clustering and in case we run ahead of actual commits.
// Check if we need to set initial value here
mset.clMu.Lock()
if mset.clseq == 0 {
if mset.clseq == 0 || mset.clseq < lseq {
mset.mu.RLock()
mset.clseq = mset.lseq
mset.mu.RUnlock()
@@ -4241,25 +4250,22 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Do proposal.
err := mset.node.Propose(esm)
if err != nil {
mset.clseq--
}
mset.clMu.Unlock()
if err != nil {
seq = 0
mset.mu.Lock()
mset.clseq--
mset.mu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}}
resp.Error = &ApiError{Code: 503, Description: err.Error()}
response, _ = json.Marshal(resp)
// If we errored out respond here.
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
}
// If we errored out respond here.
if err != nil && canRespond {
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
if err != nil && isOutOfSpaceErr(err) {
s.handleOutOfSpace(msetName)
}

View File

@@ -358,7 +358,10 @@ func TestJetStreamClusterDelete(t *testing.T) {
}
// Now delete the stream.
resp, _ = nc.Request(fmt.Sprintf(JSApiStreamDeleteT, cfg.Name), nil, time.Second)
resp, err = nc.Request(fmt.Sprintf(JSApiStreamDeleteT, cfg.Name), nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var dResp JSApiStreamDeleteResponse
if err = json.Unmarshal(resp.Data, &dResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -1767,6 +1770,8 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
}
oldLeader = c.restartServer(oldLeader)
c.checkClusterFormed()
c.waitOnStreamLeader("$G", "TEST")
c.waitOnStreamCurrent(oldLeader, "$G", "TEST")
@@ -1943,12 +1948,12 @@ func TestJetStreamClusterInterestRetention(t *testing.T) {
}
m.Ack()
js, err = nc.JetStream(nats.MaxWait(50 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
waitForZero := func() {
js, err := nc.JetStream(nats.MaxWait(50 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("foo")
if err != nil {
@@ -4717,7 +4722,7 @@ func TestJetStreamFailMirrorsAndSources(t *testing.T) {
})
}
testPrefix("mirror-bad-delierprefix", `prefix "test" overlaps with stream subject "test.>"`, StreamConfig{
testPrefix("mirror-bad-deliverprefix", `prefix "test" overlaps with stream subject "test.>"`, StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: FileStorage,
Mirror: &StreamSource{
@@ -4740,7 +4745,7 @@ func TestJetStreamFailMirrorsAndSources(t *testing.T) {
},
},
})
testPrefix("source-bad-delierprefix", `prefix "test" overlaps with stream subject "test.>"`, StreamConfig{
testPrefix("source-bad-deliverprefix", `prefix "test" overlaps with stream subject "test.>"`, StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: FileStorage,
Sources: []*StreamSource{{
@@ -5666,89 +5671,6 @@ func TestJetStreamClusterStreamInfoDeletedDetails(t *testing.T) {
}
}
func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MMS", 9)
defer c.shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
sendBatch := func(n int) {
t.Helper()
// Send a batch to a given subject.
for i := 0; i < n; i++ {
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
checkStream := func(stream string, num uint64) {
t.Helper()
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo(stream)
if err != nil {
return err
}
if si.State.Msgs != num {
return fmt.Errorf("Expected %d msgs, got %d", num, si.State.Msgs)
}
return nil
})
}
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
MaxAge: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ts := c.streamLeader("$G", "TEST")
ml := c.leader()
// Create mirror now.
for ms := ts; ms == ts || ms == ml; {
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ms = c.streamLeader("$G", "M")
if ts == ms || ms == ml {
// Delete and retry.
js.DeleteStream("M")
}
}
sendBatch(10)
checkMirror(10)
// Now shutdown the server with the mirror.
ms := c.streamLeader("$G", "M")
ms.Shutdown()
// Send more messages but let them expire.
sendBatch(10)
checkTest(0)
c.restartServer(ms)
c.checkClusterFormed()
c.waitOnStreamLeader("$G", "M")
sendBatch(10)
checkMirror(20)
}
func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSE", 3)
defer c.shutdown()
@@ -5758,18 +5680,18 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
defer nc.Close()
// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
})
if err != nil {
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
bi := 1
sendBatch := func(n int) {
t.Helper()
// Send a batch to a given subject.
for i := 0; i < n; i++ {
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
msg := fmt.Sprintf("ID: %d", bi)
bi++
if _, err := js.PublishAsync("TEST", []byte(msg)); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
@@ -5793,34 +5715,131 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
var err error
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST"},
Replicas: 2,
MaxAge: 100 * time.Millisecond,
MaxAge: 500 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "TEST"}},
Replicas: 2,
MaxAge: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
// We want this to not be same as TEST leader for this test.
sl := c.streamLeader("$G", "TEST")
for ss := sl; ss == sl; {
_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{Name: "TEST"}},
Replicas: 2,
MaxAge: 500 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ss = c.streamLeader("$G", "S"); ss == sl {
// Delete and retry.
js.DeleteStream("S")
}
}
sendBatch(20)
checkTest(20)
checkMirror(20)
checkSource(20)
sendBatch(100)
checkTest(100)
checkMirror(100)
checkSource(100)
// Make sure they expire.
checkMirror(0)
checkSource(0)
// Now stop the server housing the leader of the source stream.
sl.Shutdown()
c.restartServer(sl)
checkClusterFormed(t, c.servers...)
c.waitOnStreamLeader("$G", "S")
c.waitOnStreamLeader("$G", "M")
// Make sure we can process correctly after we have expired all of the messages.
sendBatch(100)
// Need to check both in parallel.
scheck, mcheck := uint64(0), uint64(0)
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
if scheck != 100 {
if si, _ := js.StreamInfo("S"); si != nil {
scheck = si.State.Msgs
}
}
if mcheck != 100 {
if si, _ := js.StreamInfo("M"); si != nil {
mcheck = si.State.Msgs
}
}
if scheck == 100 && mcheck == 100 {
return nil
}
return fmt.Errorf("Both not at 100 yet, S=%d, M=%d", scheck, mcheck)
})
checkTest(200)
}
func TestJetStreamClusterMirrorAndSourceSubLeaks(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSL", 3)
defer c.shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
startSubs := c.stableTotalSubs()
var ss []*nats.StreamSource
// Create 10 origin streams
for i := 0; i < 10; i++ {
sn := fmt.Sprintf("ORDERS-%d", i+1)
ss = append(ss, &nats.StreamSource{Name: sn})
if _, err := js.AddStream(&nats.StreamConfig{Name: sn}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Create mux'd stream that sources all of the origin streams.
_, err := js.AddStream(&nats.StreamConfig{
Name: "MUX",
Replicas: 2,
Sources: ss,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now create a mirror of the mux stream.
_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRROR",
Replicas: 2,
Mirror: &nats.StreamSource{Name: "MUX"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Get stable subs count.
afterSubs := c.stableTotalSubs()
js.DeleteStream("MIRROR")
js.DeleteStream("MUX")
for _, si := range ss {
js.DeleteStream(si.Name)
}
// Some subs take longer to settle out so we give ourselves a small buffer.
if deleteSubs := c.stableTotalSubs(); deleteSubs > startSubs+10 {
t.Fatalf("Expected subs to return to %d from a high of %d, but got %d", startSubs, afterSubs, deleteSubs)
}
}
// Support functions
@@ -5904,10 +5923,10 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) *
t.Fatalf("Number of clusters must be > 1")
}
const (
startClusterPort = 33222
startGWPort = 11222
)
startClusterPorts := []int{5_022, 7_022, 10_022, 12_022, 16_332, 18_332, 40_332}
startGatewayPorts := []int{6_022, 8_022, 11_022, 17_332, 21_332, 42_332}
startClusterPort := startClusterPorts[rand.Intn(len(startClusterPorts))]
startGWPort := startGatewayPorts[rand.Intn(len(startGatewayPorts))]
// Make the GWs form faster for the tests.
SetGatewaysSolicitDelay(10 * time.Millisecond)
@@ -6173,8 +6192,9 @@ func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers
func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName string, numServers int) *cluster {
t.Helper()
const startClusterPort = 22332
return createJetStreamCluster(t, tmpl, clusterName, _EMPTY_, numServers, startClusterPort, true)
startPorts := []int{9_022, 11_022, 14_022, 16_022, 20_332, 22_332, 33_332, 44_332}
port := startPorts[rand.Intn(len(startPorts))]
return createJetStreamCluster(t, tmpl, clusterName, _EMPTY_, numServers, port, true)
}
func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, snPre string, numServers int, portStart int, waitOnReady bool) *cluster {
@@ -6589,3 +6609,24 @@ func (c *cluster) restartAll() {
}
c.waitOnClusterReady()
}
func (c *cluster) totalSubs() (total int) {
c.t.Helper()
for _, s := range c.servers {
total += int(s.NumSubscriptions())
}
return total
}
func (c *cluster) stableTotalSubs() (total int) {
nsubs := -1
checkFor(c.t, 2*time.Second, 250*time.Millisecond, func() error {
subs := c.totalSubs()
if subs == nsubs {
return nil
}
nsubs = subs
return fmt.Errorf("Still stabilizing")
})
return nsubs
}

View File

@@ -1507,7 +1507,7 @@ func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
Placement: &nats.Placement{Cluster: "C3"},
})
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js2.StreamInfo("M2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -1755,3 +1755,248 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) {
})
}
func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MMS", 9)
defer c.shutdown()
// Client for API requests.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
sendBatch := func(n int) {
t.Helper()
// Send a batch to a given subject.
for i := 0; i < n; i++ {
if _, err := js.Publish("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
checkStream := func(stream string, num uint64) {
t.Helper()
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
si, err := js.StreamInfo(stream)
if err != nil {
return err
}
if si.State.Msgs != num {
return fmt.Errorf("Expected %d msgs, got %d", num, si.State.Msgs)
}
return nil
})
}
checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }
// Origin
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
MaxAge: 100 * time.Millisecond,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ts := c.streamLeader("$G", "TEST")
ml := c.leader()
// Create mirror now.
for ms := ts; ms == ts || ms == ml; {
_, err = js.AddStream(&nats.StreamConfig{
Name: "M",
Mirror: &nats.StreamSource{Name: "TEST"},
Replicas: 2,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ms = c.streamLeader("$G", "M")
if ts == ms || ms == ml {
// Delete and retry.
js.DeleteStream("M")
}
}
sendBatch(10)
checkMirror(10)
// Now shutdown the server with the mirror.
ms := c.streamLeader("$G", "M")
ms.Shutdown()
// Send more messages but let them expire.
sendBatch(10)
checkTest(0)
c.restartServer(ms)
c.checkClusterFormed()
c.waitOnStreamLeader("$G", "M")
sendBatch(10)
checkMirror(20)
}
func TestNoRaceJetStreamClusterSuperClusterRIPStress(t *testing.T) {
// Uncomment to run. Needs to be on a big machine.
skip(t)
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()
// Client based API
s := sc.clusterForName("C2").randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
fmt.Printf("CONNECT is %v\n", s.ClientURL())
scm := make(map[string][]string)
// Create 50 streams per cluster.
for _, cn := range []string{"C1", "C2", "C3"} {
var streams []string
for i := 0; i < 50; i++ {
sn := fmt.Sprintf("%s-S%d", cn, i+1)
streams = append(streams, sn)
_, err := js.AddStream(&nats.StreamConfig{
Name: sn,
Replicas: 3,
Placement: &nats.Placement{Cluster: cn},
MaxAge: 2 * time.Minute,
MaxMsgs: 50_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
scm[cn] = streams
}
sourceForCluster := func(cn string) []*nats.StreamSource {
var sns []string
switch cn {
case "C1":
sns = scm["C2"]
case "C2":
sns = scm["C3"]
case "C3":
sns = scm["C1"]
default:
t.Fatalf("Unknown cluster %q", cn)
}
var ss []*nats.StreamSource
for _, sn := range sns {
ss = append(ss, &nats.StreamSource{Name: sn})
}
return ss
}
// Mux all 50 streams from one cluster to a single stream across a GW connection to another cluster.
_, err := js.AddStream(&nats.StreamConfig{
Name: "C1-S-MUX",
Replicas: 2,
Placement: &nats.Placement{Cluster: "C1"},
Sources: sourceForCluster("C2"),
MaxAge: time.Minute,
MaxMsgs: 20_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.AddStream(&nats.StreamConfig{
Name: "C2-S-MUX",
Replicas: 2,
Placement: &nats.Placement{Cluster: "C2"},
Sources: sourceForCluster("C3"),
MaxAge: time.Minute,
MaxMsgs: 20_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.AddStream(&nats.StreamConfig{
Name: "C3-S-MUX",
Replicas: 2,
Placement: &nats.Placement{Cluster: "C3"},
Sources: sourceForCluster("C1"),
MaxAge: time.Minute,
MaxMsgs: 20_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now create mirrors for our mux'd streams.
_, err = js.AddStream(&nats.StreamConfig{
Name: "C1-MIRROR",
Replicas: 3,
Placement: &nats.Placement{Cluster: "C1"},
Mirror: &nats.StreamSource{Name: "C3-S-MUX"},
MaxAge: 5 * time.Minute,
MaxMsgs: 10_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.AddStream(&nats.StreamConfig{
Name: "C2-MIRROR",
Replicas: 3,
Placement: &nats.Placement{Cluster: "C2"},
Mirror: &nats.StreamSource{Name: "C2-S-MUX"},
MaxAge: 5 * time.Minute,
MaxMsgs: 10_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.AddStream(&nats.StreamConfig{
Name: "C3-MIRROR",
Replicas: 3,
Placement: &nats.Placement{Cluster: "C3"},
Mirror: &nats.StreamSource{Name: "C1-S-MUX"},
MaxAge: 5 * time.Minute,
MaxMsgs: 10_000,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var jsc []nats.JetStream
// Create 64 clients.
for i := 0; i < 64; i++ {
s := sc.randomCluster().randomServer()
nc, _ := jsClientConnect(t, s)
defer nc.Close()
js, err := nc.JetStream(nats.PublishAsyncMaxPending(8 * 1024))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
jsc = append(jsc, js)
}
msg := make([]byte, 1024)
rand.Read(msg)
// 10 minutes
expires := time.Now().Add(480 * time.Second)
for time.Now().Before(expires) {
for _, sns := range scm {
rand.Shuffle(len(sns), func(i, j int) { sns[i], sns[j] = sns[j], sns[i] })
for _, sn := range sns {
js := jsc[rand.Intn(len(jsc))]
if _, err = js.PublishAsync(sn, msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
}
time.Sleep(10 * time.Millisecond)
}
}

View File

@@ -364,7 +364,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
propc: make(chan *Entry, 8192),
entryc: make(chan *appendEntry, 32768),
respc: make(chan *appendEntryResponse, 32768),
applyc: make(chan *CommittedEntry, 8192),
applyc: make(chan *CommittedEntry, 32768),
leadc: make(chan bool, 8),
stepdown: make(chan string, 8),
observer: cfg.Observer,
@@ -2565,7 +2565,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
select {
case n.applyc <- &CommittedEntry{n.commit, ae.entries[:1]}:
default:
n.debug("Failed to place snapshot entry onto our apply channel")
n.warn("Failed to place snapshot entry onto our apply channel")
n.commit--
}
n.Unlock()

View File

@@ -188,6 +188,7 @@ type sourceInfo struct {
lag uint64
err *ApiError
last time.Time
lreq time.Time
grr bool
}
@@ -1227,14 +1228,23 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
}
// Mirror info tracking.
olag, odseq, oclseq := mset.mirror.lag, mset.mirror.dseq, mset.mirror.clseq
if dseq == mset.mirror.dseq+1 {
mset.mirror.dseq++
mset.mirror.sseq = sseq
} else if dseq > mset.mirror.dseq {
if mset.mirror.cname == _EMPTY_ {
mset.mirror.cname = tokenAt(m.rply, 4)
mset.mirror.dseq, mset.mirror.sseq = dseq, sseq
olag, osseq, odseq, oclseq := mset.mirror.lag, mset.mirror.sseq, mset.mirror.dseq, mset.mirror.clseq
if sseq == mset.mirror.sseq+1 {
mset.mirror.dseq = dseq
mset.mirror.sseq++
} else if sseq <= mset.mirror.sseq {
// Ignore older messages.
mset.mu.Unlock()
return true
} else if mset.mirror.cname == _EMPTY_ {
mset.mirror.cname = tokenAt(m.rply, 4)
mset.mirror.dseq, mset.mirror.sseq = dseq, sseq
} else {
// If the deliver sequence matches then the upstream stream has expired or deleted messages.
if dseq == mset.mirror.dseq+1 {
mset.skipMsgs(mset.mirror.sseq+1, sseq-1)
mset.mirror.dseq++
mset.mirror.sseq = sseq
} else {
mset.mu.Unlock()
mset.retryMirrorConsumer()
@@ -1247,7 +1257,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
} else {
mset.mirror.lag = pending - 1
}
mset.mirror.dseq = dseq
mset.mirror.clseq = sseq - 1
js, stype := mset.js, mset.cfg.Storage
mset.mu.Unlock()
@@ -1270,6 +1280,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
if sseq <= mset.lastSeq() {
mset.mu.Lock()
mset.mirror.lag = olag
mset.mirror.sseq = osseq
mset.mirror.dseq = odseq
mset.mirror.clseq = oclseq
mset.mu.Unlock()
@@ -1277,6 +1288,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
} else {
mset.mu.Lock()
mset.mirror.dseq = odseq
mset.mirror.sseq = osseq
mset.mu.Unlock()
mset.retryMirrorConsumer()
}
@@ -1319,6 +1331,28 @@ func (mset *stream) retryMirrorConsumer() error {
return mset.setupMirrorConsumer()
}
// Lock should be held.
func (mset *stream) skipMsgs(start, end uint64) {
node, store := mset.node, mset.store
var entries []*Entry
for seq := start; seq <= end; seq++ {
if node != nil {
entries = append(entries, &Entry{EntryNormal, encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0)})
// So a single message does not get too big.
if len(entries) > 10_000 {
node.ProposeDirect(entries)
entries = entries[:0]
}
} else {
mset.lseq = store.SkipMsg()
}
}
// Send all at once.
if node != nil && len(entries) > 0 {
node.ProposeDirect(entries)
}
}
// Setup our mirror consumer.
// Lock should be held.
func (mset *stream) setupMirrorConsumer() error {
@@ -1364,6 +1398,12 @@ func (mset *stream) setupMirrorConsumer() error {
mset.srv.startGoRoutine(func() { mset.processMirrorMsgs() })
}
// We want to throttle here in terms of how fast we request new consumers.
if time.Since(mset.mirror.lreq) < 2*time.Second {
return nil
}
mset.mirror.lreq = time.Now()
// Now send off request to create/update our consumer. This will be all API based even in single server mode.
// We calculate durable names apriori so we do not need to save them off.
@@ -1444,14 +1484,9 @@ func (mset *stream) setupMirrorConsumer() error {
var state StreamState
mset.store.FastState(&state)
// Check if we need to skip messages.
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
for seq := state.LastSeq + 1; seq <= ccr.ConsumerInfo.Delivered.Stream; seq++ {
if mset.node != nil {
mset.node.Propose(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0))
} else {
mset.lseq = mset.store.SkipMsg()
}
}
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
}
// Capture consumer name.
@@ -1472,6 +1507,7 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mirror.sub = sub
mset.mirror.last = time.Now()
mset.mirror.dseq = 0
mset.mirror.sseq = ccr.ConsumerInfo.Delivered.Stream
}
mset.mu.Unlock()
}
@@ -1513,10 +1549,11 @@ func (mset *stream) retrySourceConsumerAtSeq(sname string, seq uint64) {
return
}
s := mset.srv
s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name)
si := mset.sources[sname]
// No longer configured.
if si == nil {
if si := mset.sources[sname]; si == nil {
return
}
mset.setSourceConsumer(sname, seq)
@@ -1545,7 +1582,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
// Need to delete the old one.
mset.removeInternalConsumer(si)
si.sseq, si.dseq = 0, 0
si.sseq, si.dseq = seq, 0
si.last = time.Now()
ssi := mset.streamSource(sname)
@@ -1564,6 +1601,12 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
mset.srv.startGoRoutine(func() { mset.processSourceMsgs(si) })
}
// We want to throttle here in terms of how fast we request new consumers.
if time.Since(si.lreq) < 2*time.Second {
return
}
si.lreq = time.Now()
req := &CreateConsumerRequest{
Stream: sname,
Config: ConsumerConfig{
@@ -1628,6 +1671,10 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
// We will retry every 10 seconds or so
mset.cancelSourceConsumer(sname)
} else {
if si.sseq != ccr.ConsumerInfo.Delivered.Stream {
si.sseq = ccr.ConsumerInfo.Delivered.Stream + 1
}
// Capture consumer name.
si.cname = ccr.ConsumerInfo.Name
// Now create sub to receive messages.
@@ -1777,6 +1824,9 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
mset.mu.Unlock()
return false
}
} else {
mset.mu.Unlock()
return false
}
if pending == 0 {
@@ -1851,8 +1901,10 @@ func (mset *stream) setStartingSequenceForSource(sname string) {
var state StreamState
mset.store.FastState(&state)
// Do not reset sseq here so we can remember when purge/expiration happens.
if state.Msgs == 0 {
si.sseq, si.dseq = 0, 0
si.dseq = 0
return
}
@@ -1901,6 +1953,10 @@ func (mset *stream) startingSequenceForSources() {
// Stamp our si seq records on the way out.
defer func() {
for sname, seq := range seqs {
// Ignore if not set.
if seq == 0 {
continue
}
if si := mset.sources[sname]; si != nil {
si.sseq = seq
si.dseq = 0
@@ -2370,7 +2426,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
isMisMatch = false
}
}
// Really is a mismatch.
if isMisMatch {
outq := mset.outq
mset.mu.Unlock()
@@ -2455,8 +2511,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
mset.mu.Unlock()
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}
@@ -2800,6 +2856,19 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
for _, o := range mset.consumers {
obs = append(obs, o)
}
// Check if we are a mirror.
if mset.mirror != nil && mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
mset.removeInternalConsumer(mset.mirror)
}
// Now check for sources.
if len(mset.sources) > 0 {
for _, si := range mset.sources {
mset.cancelSourceConsumer(si.name)
}
}
mset.mu.Unlock()
for _, o := range obs {