Merge pull request #1842 from nats-io/flappers

LDM Support, Raft updates, fixes
This commit is contained in:
Derek Collison
2021-01-25 19:24:46 -07:00
committed by GitHub
7 changed files with 129 additions and 24 deletions

View File

@@ -5,7 +5,7 @@ jobs:
test:
strategy:
matrix:
go: [1.14, 1.15]
go: [1.15]
env:
GOPATH: /home/runner/work/nats-server
@@ -26,9 +26,6 @@ jobs:
- name: Install deps
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
go get github.com/nats-io/nats.go/
go get github.com/nats-io/nkeys
go get github.com/nats-io/jwt
go get -u honnef.co/go/tools/cmd/staticcheck
go get -u github.com/client9/misspell/cmd/misspell

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-beta.50"
VERSION = "2.2.0-beta.52"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -468,13 +468,14 @@ type JSApiStreamTemplateNamesResponse struct {
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
var (
jsNotEnabledErr = &ApiError{Code: 503, Description: "JetStream not enabled for account"}
jsBadRequestErr = &ApiError{Code: 400, Description: "bad request"}
jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"}
jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON request"}
jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient Resources"}
jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"}
jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"}
jsNotEnabledErr = &ApiError{Code: 503, Description: "JetStream not enabled for account"}
jsBadRequestErr = &ApiError{Code: 400, Description: "bad request"}
jsNotEmptyRequestErr = &ApiError{Code: 400, Description: "expected an empty request payload"}
jsInvalidJSONErr = &ApiError{Code: 400, Description: "invalid JSON request"}
jsInsufficientErr = &ApiError{Code: 503, Description: "insufficient Resources"}
jsNoConsumerErr = &ApiError{Code: 404, Description: "consumer not found"}
jsStreamMismatchErr = &ApiError{Code: 400, Description: "stream name in subject does not match request"}
jsNoClusterSupportErr = &ApiError{Code: 503, Description: "not currently supported in clustered mode"}
)
// For easier handling of exports and imports.
@@ -624,6 +625,14 @@ func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, subject,
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Not supported for now.
if s.JetStreamIsClustered() {
resp.Error = jsNoClusterSupportErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var cfg StreamTemplateConfig
if err := json.Unmarshal(msg, &cfg); err != nil {
resp.Error = jsInvalidJSONErr
@@ -671,6 +680,14 @@ func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, subject, r
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Not supported for now.
if s.JetStreamIsClustered() {
resp.Error = jsNoClusterSupportErr
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var offset int
if !isEmptyRequest(msg) {
var req JSApiStreamTemplatesRequest

View File

@@ -369,6 +369,31 @@ func (s *Server) lookupRaftNode(group string) RaftNode {
return n
}
func (s *Server) transferRaftLeaders() bool {
if s == nil {
return false
}
var nodes []RaftNode
s.rnMu.RLock()
if len(s.raftNodes) > 0 {
s.Noticef("Transferring any raft leaders")
}
for _, n := range s.raftNodes {
nodes = append(nodes, n)
}
s.rnMu.RUnlock()
var didTransfer bool
for _, node := range nodes {
if node.Leader() {
node.StepDown()
didTransfer = true
}
}
return didTransfer
}
func (s *Server) shutdownRaftNodes() {
if s == nil {
return
@@ -380,6 +405,7 @@ func (s *Server) shutdownRaftNodes() {
nodes = append(nodes, n)
}
s.rnMu.RUnlock()
for _, node := range nodes {
if node.Leader() {
node.StepDown()
@@ -692,8 +718,6 @@ func (n *raft) campaign() error {
if n.state == Leader {
return errAlreadyLeader
}
// Pre-place our vote for ourselves.
n.vote = n.id
n.resetElect(randCampaignTimeout())
return nil
}
@@ -1534,9 +1558,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.state == Candidate {
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
n.term = ae.term
n.Unlock()
n.vote = noVote
n.writeTermVote()
n.stepdown <- ae.leader
return
}
// Catching up state.
@@ -1594,9 +1618,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.writeTermVote()
if n.state != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.state, ae.leader)
n.Unlock()
n.stepdown <- ae.leader
return
}
}

View File

@@ -3060,6 +3060,16 @@ func (s *Server) lameDuckMode() {
}
s.mu.Unlock()
// If we are running any raftNodes transfer leaders.
if hadTransfers := s.transferRaftLeaders(); hadTransfers {
// They will tranfer leadership quickly, but wait here for a second.
select {
case <-time.After(time.Second):
case <-s.quitCh:
return
}
}
// Wait for accept loops to be done to make sure that no new
// client can connect
for i := 0; i < expected; i++ {

View File

@@ -184,7 +184,7 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
}
// Now durable consumer.
c.waitOnNewConsumerLeader("$G", "TEST", "dlc")
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
if _, err = js.ConsumerInfo("TEST", "dlc"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -469,6 +469,9 @@ func TestJetStreamClusterConsumerState(t *testing.T) {
m.Ack()
}
// Let state propagate for exact comparison below.
time.Sleep(200 * time.Millisecond)
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error getting consumer info: %v", err)
@@ -1378,8 +1381,7 @@ func TestJetStreamClusterStreamSnapshotCatchupWithPurge(t *testing.T) {
if err := nsl.JetStreamSnapshotStream("$G", "TEST"); err != nil {
t.Fatalf("Error snapshotting stream: %v", err)
}
time.Sleep(200 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
sl = c.restartServer(sl)
c.checkClusterFormed()
@@ -2151,6 +2153,63 @@ func TestJetStreamClusterStreamInterestOnlyPolicy(t *testing.T) {
})
}
// These are disabled for now.
func TestJetStreamClusterStreamTemplates(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, _ := jsClientConnect(t, s)
defer nc.Close()
// List API
var tListResp server.JSApiStreamTemplateNamesResponse
resp, err := nc.Request(server.JSApiTemplates, nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err := json.Unmarshal(resp.Data, &tListResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if tListResp.Error == nil {
t.Fatalf("Expected an unsupported error, got none")
}
if !strings.Contains(tListResp.Error.Description, "not currently supported in clustered mode") {
t.Fatalf("Did not get correct error response: %+v", tListResp.Error)
}
// Create
// Now do templates.
mcfg := &server.StreamConfig{
Subjects: []string{"kv.*"},
Storage: server.MemoryStorage,
}
template := &server.StreamTemplateConfig{
Name: "kv",
Config: mcfg,
MaxStreams: 4,
}
req, err := json.Marshal(template)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var stResp server.JSApiStreamTemplateCreateResponse
resp, err = nc.Request(fmt.Sprintf(server.JSApiTemplateCreateT, template.Name), req, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err = json.Unmarshal(resp.Data, &stResp); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if stResp.Error == nil {
t.Fatalf("Expected an unsupported error, got none")
}
if !strings.Contains(stResp.Error.Description, "not currently supported in clustered mode") {
t.Fatalf("Did not get correct error response: %+v", stResp.Error)
}
}
func TestJetStreamClusterStreamPerf(t *testing.T) {
// Comment out to run, holding place for now.
skip(t)
@@ -2203,7 +2262,7 @@ func TestJetStreamClusterStreamPerf(t *testing.T) {
}
// Wait for Go routines.
time.Sleep(200 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
start := time.Now()
close(startCh)

View File

@@ -6244,7 +6244,7 @@ func TestJetStreamConsumerReplayRate(t *testing.T) {
gap := time.Since(start)
// 15ms is high but on macs time.Sleep(delay) does not sleep only delay.
// Also on travis if things get bogged down this could be delayed.
gl, gh := gaps[i]-5*time.Millisecond, gaps[i]+15*time.Millisecond
gl, gh := gaps[i]-10*time.Millisecond, gaps[i]+15*time.Millisecond
if gap < gl || gap > gh {
t.Fatalf("Gap is off for %d, expected %v got %v", i, gaps[i], gap)
}
@@ -6869,7 +6869,7 @@ func TestJetStreamSimpleFileRecovery(t *testing.T) {
}
mset, err := acc.AddStream(&msetConfig)
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
t.Fatalf("Unexpected error adding stream %q: %v", msetName, err)
}
defer mset.Delete()