mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -3,8 +3,8 @@ dist: focal
|
||||
|
||||
language: go
|
||||
go:
|
||||
- 1.16.x
|
||||
- 1.15.x
|
||||
- 1.14.x
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
@@ -29,7 +29,7 @@ script:
|
||||
- set -e
|
||||
- go test -i ./...
|
||||
- go test -v -run=TestNoRace --failfast -p=1 ./...
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.14 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi
|
||||
- if [[ "$TRAVIS_GO_VERSION" =~ 1.15 ]]; then ./scripts/cov.sh TRAVIS; else go test -v -race -p=1 --failfast ./...; fi
|
||||
- set +e
|
||||
|
||||
deploy:
|
||||
@@ -38,4 +38,4 @@ deploy:
|
||||
script: curl -sL http://git.io/goreleaser | bash
|
||||
on:
|
||||
tags: true
|
||||
condition: $TRAVIS_GO_VERSION =~ 1.14
|
||||
condition: $TRAVIS_GO_VERSION =~ 1.16
|
||||
|
||||
@@ -2041,6 +2041,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
|
||||
}
|
||||
js.mu.RLock()
|
||||
s := js.srv
|
||||
hadLeader := sa.Group.node == nil || sa.Group.node.GroupLeader() != noLeader
|
||||
js.mu.RUnlock()
|
||||
|
||||
acc, err := s.LookupAccount(sa.Client.serviceAccount())
|
||||
@@ -2056,14 +2057,14 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
|
||||
if err != nil {
|
||||
resp.Error = jsNotFoundError(err)
|
||||
} else if mset != nil {
|
||||
err = mset.stop(true, wasLeader)
|
||||
err = mset.stop(true, false)
|
||||
}
|
||||
|
||||
if sa.Group.node != nil {
|
||||
sa.Group.node.Delete()
|
||||
}
|
||||
|
||||
if !isMember || !wasLeader && sa.Group.node != nil && sa.Group.node.GroupLeader() != noLeader {
|
||||
if !isMember || !wasLeader && hadLeader {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -821,6 +821,7 @@ func TestJetStreamClusterRestoreSingleConsumer(t *testing.T) {
|
||||
|
||||
c.stopAll()
|
||||
c.restartAll()
|
||||
c.waitOnLeader()
|
||||
|
||||
s = c.randomServer()
|
||||
nc, js = jsClientConnect(t, s)
|
||||
@@ -2048,6 +2049,7 @@ func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
|
||||
}
|
||||
}
|
||||
nc.Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Snapshot consumer info.
|
||||
ci, err := jsub.ConsumerInfo()
|
||||
@@ -2819,6 +2821,7 @@ func TestJetStreamClusterCreateResponseAdvisoriesHaveSubject(t *testing.T) {
|
||||
}
|
||||
|
||||
checkSubsPending(t, sub, 6)
|
||||
|
||||
for m, err := sub.NextMsg(0); err == nil; m, err = sub.NextMsg(0) {
|
||||
var audit JSAPIAudit
|
||||
if err := json.Unmarshal(m.Data, &audit); err != nil {
|
||||
@@ -4217,7 +4220,7 @@ func jsClientConnect(t *testing.T, s *Server) (*nats.Conn, nats.JetStreamContext
|
||||
|
||||
func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
|
||||
t.Helper()
|
||||
checkFor(t, 3*time.Second, 10*time.Millisecond, func() error {
|
||||
checkFor(t, 4*time.Second, 20*time.Millisecond, func() error {
|
||||
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected {
|
||||
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
|
||||
}
|
||||
|
||||
@@ -1449,6 +1449,10 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _, reply st
|
||||
|
||||
func (n *raft) runAsLeader() {
|
||||
n.RLock()
|
||||
if n.state == Closed {
|
||||
n.RUnlock()
|
||||
return
|
||||
}
|
||||
psubj, rpsubj := n.psubj, n.rpsubj
|
||||
n.RUnlock()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user