Merge branch 'main' into force-consumer-replicas

This commit is contained in:
Derek Collison
2023-01-26 08:35:49 -08:00
3 changed files with 79 additions and 13 deletions

View File

@@ -4357,7 +4357,9 @@ func (o *consumer) signalSub() *subscription {
if subject == _EMPTY_ {
subject = fwcs
}
return &subscription{subject: []byte(subject), icb: o.processStreamSignal}
sub := &subscription{subject: []byte(subject), icb: o.processStreamSignal}
o.sigSub = sub
return sub
}
// This is what will be called when our parent stream wants to kick us regarding a new message.

View File

@@ -1264,6 +1264,16 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
js.processConsumerAssignment(ca)
}
}
// Perform updates on those in saChk. These were existing so make
// sure to process any changes.
for _, sa := range saChk {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.processUpdateStreamAssignment(sa)
}
// Now do the deltas for existing stream's consumers.
for _, ca := range caDel {
if isRecovering {
@@ -6147,14 +6157,36 @@ func decodeStreamAssignment(buf []byte) (*streamAssignment, error) {
// createGroupForConsumer will create a new group from same peer set as the stream.
func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *streamAssignment) *raftGroup {
peers := copyStrings(sa.Group.Peers)
if len(peers) == 0 {
if len(sa.Group.Peers) == 0 || cfg.Replicas > len(sa.Group.Peers) {
return nil
}
if cfg.Replicas > 0 && cfg.Replicas != len(peers) {
// First shuffle the peers and then select to account for replica = 1.
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
peers = peers[:cfg.Replicas]
peers := copyStrings(sa.Group.Peers)
var _ss [5]string
active := _ss[:0]
// Calculate all active peers.
for _, peer := range peers {
if sir, ok := cc.s.nodeToInfo.Load(peer); ok && sir != nil {
if !sir.(nodeInfo).offline {
active = append(active, peer)
}
}
}
if quorum := cfg.Replicas/2 + 1; quorum > len(active) {
// Not enough active to satisfy the request.
return nil
}
// If we want less then our parent stream, select from active.
if cfg.Replicas > 0 && cfg.Replicas < len(peers) {
// Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc.
if len(active) < cfg.Replicas {
return nil
}
// First shuffle the active peers and then select to account for replica = 1.
rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] })
peers = active[:cfg.Replicas]
}
storage := sa.Config.Storage
if cfg.MemoryStorage {

View File

@@ -2468,12 +2468,6 @@ func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) {
// to create a consumer where the replication factor does not match. This could cause
// instability in the state between servers and cause problems on leader switches.
func TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
@@ -2490,3 +2484,41 @@ func TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor(t *testi
require_Error(t, err, NewJSConsumerReplicasShouldMatchStreamError())
}
// https://github.com/nats-io/nats-server/issues/3791
func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "TEST",
Replicas: 3,
})
require_NoError(t, err)
kv.PutString("foo", "bar")
kv.PutString("foo", "baz")
// Shutdown a follower.
s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
s.Shutdown()
c.waitOnLeader()
nc, _ = jsClientConnect(t, c.randomServer())
defer nc.Close()
js, err = nc.JetStream(nats.MaxWait(2 * time.Second))
require_NoError(t, err)
kv, err = js.KeyValue("TEST")
require_NoError(t, err)
for i := 0; i < 100; i++ {
w, err := kv.Watch("foo")
require_NoError(t, err)
w.Stop()
}
}