Make ephemeral consumers R=1 and provide optimistic migration on peer removal or server shutdown.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-02-28 16:50:25 -08:00
parent 03954eedc6
commit df77724aa4
4 changed files with 119 additions and 1 deletions

View File

@@ -456,6 +456,63 @@ func (s *Server) JetStreamEnabled() bool {
return enabled
}
// Will migrate off ephemerals if possible.
// This means parent stream needs to be replicated.
func (s *Server) migrateEphemerals() {
js, cc := s.getJetStreamCluster()
// Make sure JetStream is enabled and we are clustered.
if js == nil || cc == nil {
return
}
var consumers []*consumerAssignment
js.mu.Lock()
ourID := cc.meta.ID()
for _, asa := range cc.streams {
for _, sa := range asa {
if rg := sa.Group; rg != nil && len(rg.Peers) > 1 && rg.isMember(ourID) && len(sa.consumers) > 0 {
for _, ca := range sa.consumers {
if ca.Group != nil && len(ca.Group.Peers) == 1 && ca.Group.isMember(ourID) {
// Need to select possible new peer from parent stream.
for _, p := range rg.Peers {
if p != ourID {
ca.Group.Peers = []string{p}
ca.Group.Preferred = p
consumers = append(consumers, ca)
break
}
}
}
}
}
}
}
js.mu.Unlock()
// Process the consumers.
for _, ca := range consumers {
// Locate the consumer itself.
if acc, err := s.LookupAccount(ca.Client.Account); err == nil && acc != nil {
if mset, err := acc.lookupStream(ca.Stream); err == nil && mset != nil {
if o := mset.lookupConsumer(ca.Name); o != nil {
state := o.readStoreState()
o.deleteWithoutAdvisory()
js.mu.Lock()
// Delete old one.
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
// Encode state and new name.
ca.State = state
ca.Name = createConsumerName()
addEntry := encodeAddConsumerAssignmentCompressed(ca)
cc.meta.ForwardProposal(addEntry)
js.mu.Unlock()
}
}
}
}
}
// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.Lock()

View File

@@ -3630,6 +3630,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// We need to set the ephemeral here before replicating.
var oname string
if !isDurableConsumer(cfg) {
// We chose to have ephemerals be R=1.
rg.Peers = []string{rg.Preferred}
rg.Name = groupNameForConsumer(rg.Peers, rg.Storage)
// Make sure name is unique.
for {
oname = createConsumerName()
if sa.consumers != nil {

View File

@@ -2075,6 +2075,49 @@ func TestJetStreamClusterEphemeralConsumerCleanup(t *testing.T) {
})
}
func TestJetStreamClusterEphemeralConsumersNotReplicated(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 3})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ci, _ := sub.ConsumerInfo()
if ci == nil {
t.Fatalf("Unexpected error: no consumer info")
}
if _, err = js.Publish("foo", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkSubsPending(t, sub, 1)
if ci.Cluster == nil || len(ci.Cluster.Replicas) != 0 {
t.Fatalf("Expected ephemeral to be R=1, got %+v", ci.Cluster)
}
scl := c.serverByName(ci.Cluster.Leader)
if scl == nil {
t.Fatalf("Could not select server where ephemeral consumer is running")
}
// Test migrations.
scl.Shutdown()
if _, err = js.Publish("foo", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkSubsPending(t, sub, 2)
}
func TestJetStreamClusterUserSnapshotAndRestore(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
@@ -2972,7 +3015,8 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := js.SubscribeSync("NO-Q")
// Make durable to have R match Stream.
sub, err := js.SubscribeSync("NO-Q", nats.Durable("rr"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -4680,6 +4724,15 @@ func (c *cluster) waitOnAllCurrent() {
}
}
func (c *cluster) serverByName(sname string) *Server {
for _, s := range c.servers {
if s.Name() == sname {
return s
}
}
return nil
}
func (c *cluster) randomNonLeader() *Server {
// range should randomize.. but..
for _, s := range c.servers {

View File

@@ -1650,6 +1650,10 @@ func (s *Server) Shutdown() {
// Transfer off any raft nodes that we are a leader by shutting them all down.
s.shutdownRaftNodes()
// This is for clustered JetStream and ephemeral consumers.
// No-op if not clustered or not running JetStream.
s.migrateEphemerals()
// Shutdown the eventing system as needed.
// This is done first to send out any messages for
// account status. We will also clean up any