diff --git a/server/consumer.go b/server/consumer.go index 4d81c974..8cef715f 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -228,13 +228,13 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { func (mset *Stream) addConsumer(config *ConsumerConfig, oname string, node RaftNode) (*Consumer, error) { mset.mu.RLock() - jsa := mset.jsa + s, jsa := mset.srv, mset.jsa mset.mu.RUnlock() // If we do not have the consumer assigned to us in cluster mode we can not proceed. // Running in single server mode this always returns true. if oname != _EMPTY_ && !jsa.consumerAssigned(mset.Name(), oname) { - return nil, ErrJetStreamNotAssigned + s.Debugf("Consumer %q > %q does not seem to be assigned to this server", mset.Name(), oname) } if config == nil { diff --git a/server/jetstream.go b/server/jetstream.go index 4c5fb634..cf8464ef 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -176,11 +176,14 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { // If we are in clustered mode go ahead and start the meta controller. if !s.standAloneMode() { if err := s.enableJetStreamClustering(); err != nil { - s.Errorf("Could not create JetStream cluster: %v", err) return err } } + return s.enableJetStreamAccounts() +} + +func (s *Server) enableJetStreamAccounts() error { // If we have no configured accounts setup then setup imports on global account. 
if s.globalAccountOnly() { if err := s.GlobalAccount().EnableJetStream(nil); err != nil { @@ -189,7 +192,6 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { } else if err := s.configAllJetStreamAccounts(); err != nil { return fmt.Errorf("Error enabling jetstream on configured accounts: %v", err) } - return nil } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 914a50e1..5886ae0d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -293,7 +293,7 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { isCurrent := rg.node.Current() if isCurrent { - // Check if we are processing a snapshot catchup. + // Check if we are processing a snapshot and are catching up. acc, err := cc.s.LookupAccount(account) if err != nil { return false @@ -505,8 +505,9 @@ func (jsa *jsAccount) streamAssigned(stream string) bool { return false } js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isStreamAssigned(acc, stream) + assigned := js.cluster.isStreamAssigned(acc, stream) + js.mu.RUnlock() + return assigned } // Read lock should be held. @@ -639,6 +640,12 @@ func (js *jetStream) monitorCluster() { case <-qch: return case ce := <-ach: + if ce == nil { + // Signals we have replayed all of our metadata. + // No-op for now. + s.Debugf("Recovered JetStream cluster metadata") + continue + } // FIXME(dlc) - Deal with errors. if hadSnapshot, err := js.applyMetaEntries(ce.Entries); err == nil { n.Applied(ce.Index) @@ -965,6 +972,10 @@ func (js *jetStream) monitorStreamRaftGroup(mset *Stream, sa *streamAssignment) case <-qch: return case ce := <-ach: + // No special processing needed for when we are caught up on restart. + if ce == nil { + continue + } // FIXME(dlc) - capture errors. 
if hadSnapshot, err := js.applyStreamEntries(mset, ce); err == nil { n.Applied(ce.Index) @@ -1001,10 +1012,10 @@ func (js *jetStream) applyStreamEntries(mset *Stream, ce *CommittedEntry) (bool, if err != nil { panic(err.Error()) } - if lseq == 0 && mset.lastSeq() != 0 { // Very first msg + // Skip by hand here since first msg special case. + if lseq == 0 && mset.lastSeq() != 0 { continue } - if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil { js.srv.Debugf("Got error processing JetStream msg: %v", err) } @@ -1211,7 +1222,6 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) { // This is an error condition. if err != nil { js.srv.Debugf("Stream create failed for %q - %q: %v\n", sa.Client.Account, sa.Config.Name, err) - js.mu.Lock() sa.err = err sa.responded = true @@ -1557,6 +1567,10 @@ func (js *jetStream) monitorConsumerRaftGroup(o *Consumer, ca *consumerAssignmen case <-qch: return case ce := <-ach: + // No special processing needed for when we are caught up on restart. + if ce == nil { + continue + } if _, err := js.applyConsumerEntries(o, ce); err == nil { n.Applied(ce.Index) last = ce.Index @@ -1706,7 +1720,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // Check if this failed. // TODO(dlc) - Could have mixed results, should track per peer. if result.Response.Error != nil { - // So while we are delting we will not respond to list/names requests. + // Set sa.err while we are deleting so we will not respond to list/names requests. sa.err = ErrJetStreamNotAssigned cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } diff --git a/server/raft.go b/server/raft.go index 21df98b3..9ca8ee9d 100644 --- a/server/raft.go +++ b/server/raft.go @@ -22,6 +22,7 @@ import ( "os" "path" "sync" + "sync/atomic" "time" ) @@ -118,6 +119,7 @@ type raft struct { hash string s *Server c *client + dflag bool // Subjects for votes, updates, replays. 
vsubj string @@ -126,9 +128,7 @@ areply string // For when we need to catch up as a follower. - catchup *subscription - cterm uint64 - cindex uint64 + catchup *catchupState // For leader or server catching up a follower. progress map[string]chan uint64 @@ -150,6 +150,17 @@ stepdown chan string } +// catchupState structure that holds our subscription, and catchup term and index +// as well as starting term and index and how many updates we have seen. +type catchupState struct { + sub *subscription + cterm uint64 + cindex uint64 + pterm uint64 + pindex uint64 + hbs int +} + // lps holds peer state of last time and last index replicated. type lps struct { ts int64 @@ -217,6 +228,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { return nil, ErrNoSysAccount } sendq := s.sys.sendq + sacc := s.sys.account hash := s.sys.shash s.mu.Unlock() @@ -250,6 +262,11 @@ leadc: make(chan bool, 4), stepdown: make(chan string), } + n.c.registerWithAccount(sacc) + + if atomic.LoadInt32(&s.logging.debug) > 0 { + n.dflag = true + } if term, vote, err := n.readTermVote(); err != nil && term > 0 { n.term = term @@ -266,18 +283,24 @@ } // Replay the log. // Since doing this in place we need to make sure we have enough room on the applyc. - if uint64(cap(n.applyc)) < state.Msgs { - n.applyc = make(chan *CommittedEntry, state.Msgs) + needed := state.Msgs + 1 // 1 is for nil to mark end of replay. + if uint64(cap(n.applyc)) < needed { + n.applyc = make(chan *CommittedEntry, needed) } + for index := state.FirstSeq; index <= state.LastSeq; index++ { ae, err := n.loadEntry(index) if err != nil { - panic("err loading index") + panic("err loading entry from WAL") } n.processAppendEntry(ae, nil) } } + // Send nil entry to signal the upper layers we are done doing replay/restore.
+ n.applyc <- nil + + // Setup our internal subscriptions. if err := n.createInternalSubs(); err != nil { n.shutdown(true) return nil, err @@ -535,6 +558,12 @@ func (n *raft) isCurrent() bool { if n.state == Leader { return true } + + // Check here on catchup status. + if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { + n.cancelCatchup() + } + // Check to see that we have heard from the current leader lately. if n.leader != noLeader && n.leader != n.id && n.catchup == nil { const okInterval = int64(hbInterval) * 2 @@ -721,13 +750,13 @@ func (n *raft) newInbox(cn string) string { b[i] = digits[l%base] l /= base } - return fmt.Sprintf(raftReplySubj, n.group, n.hash, b[:]) + return fmt.Sprintf(raftReplySubj, b[:]) } const ( - raftVoteSubj = "$SYS.NRG.%s.%s.V" - raftAppendSubj = "$SYS.NRG.%s.%s.A" - raftReplySubj = "$SYS.NRG.%s.%s.%s" + raftVoteSubj = "$NRG.V.%s.%s" + raftAppendSubj = "$NRG.E.%s.%s" + raftReplySubj = "$NRG.R.%s" ) func (n *raft) createInternalSubs() error { @@ -795,8 +824,10 @@ func (n *raft) run() { } func (n *raft) debug(format string, args ...interface{}) { - nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format) - n.s.Debugf(nf, args...) + if n.dflag { + nf := fmt.Sprintf("RAFT [%s - %s] %s", n.id, n.group, format) + n.s.Debugf(nf, args...) + } } func (n *raft) error(format string, args ...interface{}) { @@ -1211,8 +1242,6 @@ func (n *raft) applyCommit(index uint64) { } // Pass to the upper layers if we have normal entries. if len(committed) > 0 { - // We will block here placing the commit entry on purpose. - // FIXME(dlc) - We should not block here. select { case n.applyc <- &CommittedEntry{index, committed}: default: @@ -1359,6 +1388,49 @@ func (n *raft) handleAppendEntry(sub *subscription, c *client, subject, reply st n.processAppendEntry(ae, sub) } +// Lock should be held. 
+func (n *raft) cancelCatchup() { + n.debug("Canceling catchup subscription since we are now up to date") + n.s.sysUnsubscribe(n.catchup.sub) + n.catchup = nil +} + +// catchupStalled will try to determine if we are stalled. This is called +// on a new entry from our leader. +// Lock should be held. +func (n *raft) catchupStalled() bool { + if n.catchup == nil { + return false + } + const maxHBs = 3 + if n.catchup.pindex == n.pindex { + n.catchup.hbs++ + } else { + n.catchup.pindex = n.pindex + n.catchup.hbs = 0 + } + return n.catchup.hbs >= maxHBs +} + +// Lock should be held. +func (n *raft) createCatchup(ae *appendEntry) string { + // Cleanup any old ones. + if n.catchup != nil { + n.s.sysUnsubscribe(n.catchup.sub) + } + // Snapshot term and index. + n.catchup = &catchupState{ + cterm: ae.pterm, + cindex: ae.pindex, + pterm: n.pterm, + pindex: n.pindex, + } + inbox := n.newInbox(n.s.ClusterName()) + sub, _ := n.s.sysSubscribe(inbox, n.handleAppendEntry) + n.catchup.sub = sub + return inbox +} + // processAppendEntry will process an appendEntry. func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.Lock() @@ -1368,10 +1440,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } - // Is this a new entry or a replay on startup? - isNew := sub != nil && sub != n.catchup // Catching up state. catchingUp := n.catchup != nil + // Is this a new entry or a replay on startup? + isNew := sub != nil && (!catchingUp || sub != n.catchup.sub) if isNew { n.resetElectionTimeout() @@ -1394,18 +1466,24 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Check state if we are catching up. if catchingUp && isNew { - if n.pterm >= n.cterm && n.pindex >= n.cindex { - // If we are here we are good, so if we have a catchup we can shut that down. 
- n.debug("Canceling catchup subscription since we are now up to date") - n.s.sysUnsubscribe(n.catchup) - n.catchup = nil + if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { + // If we are here we are good, so if we have a catchup pending we can cancel. + n.cancelCatchup() catchingUp = false - n.cterm, n.cindex = 0, 0 } else { + var ar *appendEntryResponse + var inbox string + // Check to see if we are stalled. If so recreate our catchup state and resend response. + if n.catchupStalled() { + n.debug("Catchup may be stalled, will request again") + inbox = n.createCatchup(ae) + ar = &appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_} + } // Ignore new while catching up or replaying. - // This is ok since builtin catchup should be small. - // For larger items will do this outside. n.Unlock() + if ar != nil { + n.sendRPC(ae.reply, inbox, ar.encode()) + } return } } @@ -1443,23 +1521,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) // Reset our term. n.term = n.pterm - // Snapshot term and index - n.cterm, n.cindex = ae.pterm, ae.pindex - // Setup our subscription for catching up. - // Cleanup any old ones. - if n.catchup != nil { - n.s.sysUnsubscribe(n.catchup) - } - inbox := n.newInbox(n.s.ClusterName()) - var err error - if n.catchup, err = n.s.sysSubscribe(inbox, n.handleAppendEntry); err != nil { - n.Unlock() - n.debug("Error subscribing to our inbox for catchup: %v", err) - return - } + // Setup our state for catching up. 
+ inbox := n.createCatchup(ae) ar := appendEntryResponse{n.pterm, n.pindex, n.id, false, _EMPTY_} n.Unlock() - n.sendRPC(ae.reply, inbox, ar.encode()) return } diff --git a/server/stream.go b/server/stream.go index e9de141d..2eded332 100644 --- a/server/stream.go +++ b/server/stream.go @@ -153,7 +153,7 @@ func (a *Account) addStream(config *StreamConfig, fsConfig *FileStoreConfig, sa // If we do not have the stream assigned to us in cluster mode we can not proceed. // Running in single server mode this always returns true. if !jsa.streamAssigned(config.Name) { - return nil, ErrJetStreamNotAssigned + s.Debugf("Stream %q does not seem to be assigned to this server", config.Name) } // Sensible defaults. diff --git a/test/jetstream_cluster_test.go b/test/jetstream_cluster_test.go index 5473751a..52a0e876 100644 --- a/test/jetstream_cluster_test.go +++ b/test/jetstream_cluster_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) @@ -116,17 +115,8 @@ func TestJetStreamClusterSingleReplicaStreams(t *testing.T) { c := createJetStreamClusterExplicit(t, "R1S", 3) defer c.shutdown() - sc := &server.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo", "bar"}, - } - // Make sure non-leaders error if directly called. 
- s := c.randomNonLeader() - if _, err := s.GlobalAccount().AddStream(sc); err == nil { - t.Fatalf("Expected an error from a non-leader") - } - // Client based API + s := c.randomNonLeader() nc, js := jsClientConnect(t, s) defer nc.Close() @@ -298,7 +288,10 @@ func TestJetStreamClusterDelete(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - resp, _ := nc.Request(fmt.Sprintf(server.JSApiStreamCreateT, cfg.Name), req, time.Second) + resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamCreateT, cfg.Name), req, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } var scResp server.JSApiStreamCreateResponse if err := json.Unmarshal(resp.Data, &scResp); err != nil { t.Fatalf("Unexpected error: %v", err) @@ -619,6 +612,7 @@ func TestJetStreamClusterMetaSnapshotsMultiChange(t *testing.T) { // Shut it down. rs.Shutdown() + time.Sleep(250 * time.Millisecond) // We want to make changes here that test each delta scenario for the meta snapshots. // Add new stream and consumer. 
@@ -1065,10 +1059,6 @@ func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers sn := fmt.Sprintf("S-%d", cp-startClusterPort+1) conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, clusterName, cp, routeConfig) s, o := RunServerWithConfig(createConfFile(t, []byte(conf))) - if doLog { - pre := fmt.Sprintf("[S-%d] - ", cp-startClusterPort+1) - s.SetLogger(logger.NewTestLogger(pre, true), true, true) - } c.servers = append(c.servers, s) c.opts = append(c.opts, o) } @@ -1088,10 +1078,6 @@ func (c *cluster) addInNewServer() *server.Server { seedRoute := fmt.Sprintf("nats-route://127.0.0.1:%d", c.opts[0].Cluster.Port) conf := fmt.Sprintf(jsClusterTempl, sn, storeDir, c.name, -1, seedRoute) s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf))) - if doLog { - pre := fmt.Sprintf("[%s] - ", sn) - s.SetLogger(logger.NewTestLogger(pre, true), true, true) - } c.servers = append(c.servers, s) c.opts = append(c.opts, o) c.checkClusterFormed() @@ -1141,10 +1127,6 @@ func (c *cluster) restartServer(rs *server.Server) *server.Server { } opts = c.opts[index] s, o := RunServerWithConfig(opts.ConfigFile) - if doLog { - pre := fmt.Sprintf("[%s] - ", s.Name()) - s.SetLogger(logger.NewTestLogger(pre, true), true, true) - } c.servers[index] = s c.opts[index] = o return s