From 5148bbf89800e1023fe4b235cd35c6ee9fcaa235 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 25 Jan 2021 10:04:21 -0800 Subject: [PATCH] Fixes based on PR feedback, cleanup Signed-off-by: Derek Collison --- server/consumer.go | 6 +++--- server/raft.go | 7 ++++--- test/jetstream_cluster_test.go | 4 ---- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 5f313213..7533234e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -540,7 +540,7 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consum // Since we are here this means we have a potentially new durable so we should update here. // Check that configs are the same. if !configsEqualSansDelivery(o.config, eo.config) { - o.name = _EMPTY_ // Precent removal since same name. + o.name = _EMPTY_ // Prevent removal since same name. o.deleteWithoutAdvisory() return nil, fmt.Errorf("consumer replacement durable config not the same") } @@ -895,14 +895,14 @@ func (o *Consumer) deleteNotActive() { if ca := js.consumerAssignment(acc, stream, name); ca != nil { // We copy and clear the reply since this removal is internal. jsa.mu.Lock() - cca := *ca - cca.Reply = _EMPTY_ js := jsa.js jsa.mu.Unlock() if js != nil { js.mu.RLock() if cc := js.cluster; cc != nil { + cca := *ca + cca.Reply = _EMPTY_ meta, removeEntry := cc.meta, encodeDeleteConsumerAssignment(&cca) meta.ForwardProposal(removeEntry) diff --git a/server/raft.go b/server/raft.go index 2aa79a5c..f3bfaf13 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1099,11 +1099,12 @@ func (n *raft) runAsLeader() { n.Lock() // For forwarded proposals. fsub, err := n.subscribe(n.psubj, n.handleForwardedProposal) - if err != nil { - n.warn("Error subscribing to forwarded proposals: %v", err) - } n.Unlock() + if err != nil { + panic(fmt.Sprintf("Error subscribing to forwarded proposals: %v", err)) + } + // Cleanup our subscription when we leave. defer func() { n.RLock() diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index b8f20797..4ab8146a 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -761,12 +761,8 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) { } c.stopAll() - // doLog, doDebug = true, true - fmt.Printf("\n\n#########\n\n") c.restartAll() - //time.Sleep(time.Second) - s = c.randomServer() nc, js = jsClientConnect(t, s) defer nc.Close()