mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixes based on PR feedback, cleanup
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -540,7 +540,7 @@ func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, ca *consum
|
||||
// Since we are here this means we have a potentially new durable so we should update here.
|
||||
// Check that configs are the same.
|
||||
if !configsEqualSansDelivery(o.config, eo.config) {
|
||||
o.name = _EMPTY_ // Precent removal since same name.
|
||||
o.name = _EMPTY_ // Prevent removal since same name.
|
||||
o.deleteWithoutAdvisory()
|
||||
return nil, fmt.Errorf("consumer replacement durable config not the same")
|
||||
}
|
||||
@@ -895,14 +895,14 @@ func (o *Consumer) deleteNotActive() {
|
||||
if ca := js.consumerAssignment(acc, stream, name); ca != nil {
|
||||
// We copy and clear the reply since this removal is internal.
|
||||
jsa.mu.Lock()
|
||||
cca := *ca
|
||||
cca.Reply = _EMPTY_
|
||||
js := jsa.js
|
||||
jsa.mu.Unlock()
|
||||
|
||||
if js != nil {
|
||||
js.mu.RLock()
|
||||
if cc := js.cluster; cc != nil {
|
||||
cca := *ca
|
||||
cca.Reply = _EMPTY_
|
||||
meta, removeEntry := cc.meta, encodeDeleteConsumerAssignment(&cca)
|
||||
meta.ForwardProposal(removeEntry)
|
||||
|
||||
|
||||
@@ -1099,11 +1099,12 @@ func (n *raft) runAsLeader() {
|
||||
n.Lock()
|
||||
// For forwarded proposals.
|
||||
fsub, err := n.subscribe(n.psubj, n.handleForwardedProposal)
|
||||
if err != nil {
|
||||
n.warn("Error subscribing to forwarded proposals: %v", err)
|
||||
}
|
||||
n.Unlock()
|
||||
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error subscribing to forwarded proposals: %v", err))
|
||||
}
|
||||
|
||||
// Cleanup our subscription when we leave.
|
||||
defer func() {
|
||||
n.RLock()
|
||||
|
||||
@@ -761,12 +761,8 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) {
|
||||
}
|
||||
|
||||
c.stopAll()
|
||||
// doLog, doDebug = true, true
|
||||
fmt.Printf("\n\n#########\n\n")
|
||||
c.restartAll()
|
||||
|
||||
//time.Sleep(time.Second)
|
||||
|
||||
s = c.randomServer()
|
||||
nc, js = jsClientConnect(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
Reference in New Issue
Block a user