Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-01-26 08:28:27 -08:00
2 changed files with 76 additions and 6 deletions

View File

@@ -1264,6 +1264,16 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
js.processConsumerAssignment(ca)
}
}
// Perform updates on those in saChk. These were existing so make
// sure to process any changes.
for _, sa := range saChk {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.processUpdateStreamAssignment(sa)
}
// Now do the deltas for existing stream's consumers.
for _, ca := range caDel {
if isRecovering {
@@ -6147,14 +6157,36 @@ func decodeStreamAssignment(buf []byte) (*streamAssignment, error) {
// createGroupForConsumer will create a new group from same peer set as the stream.
func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *streamAssignment) *raftGroup {
peers := copyStrings(sa.Group.Peers)
if len(peers) == 0 {
if len(sa.Group.Peers) == 0 || cfg.Replicas > len(sa.Group.Peers) {
return nil
}
if cfg.Replicas > 0 && cfg.Replicas != len(peers) {
// First shuffle the peers and then select to account for replica = 1.
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
peers = peers[:cfg.Replicas]
peers := copyStrings(sa.Group.Peers)
var _ss [5]string
active := _ss[:0]
// Calculate all active peers.
for _, peer := range peers {
if sir, ok := cc.s.nodeToInfo.Load(peer); ok && sir != nil {
if !sir.(nodeInfo).offline {
active = append(active, peer)
}
}
}
if quorum := cfg.Replicas/2 + 1; quorum > len(active) {
// Not enough active to satisfy the request.
return nil
}
// If we want fewer replicas than our parent stream has, select from the active peers.
if cfg.Replicas > 0 && cfg.Replicas < len(peers) {
// Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc.
if len(active) < cfg.Replicas {
return nil
}
// First shuffle the active peers and then select to account for replica = 1.
rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] })
peers = active[:cfg.Replicas]
}
storage := sa.Config.Storage
if cfg.MemoryStorage {

View File

@@ -2462,3 +2462,41 @@ func TestJetStreamClusterDirectGetStreamUpgrade(t *testing.T) {
require_NoError(t, err)
require_True(t, string(entry.Value()) == "derek")
}
// TestJetStreamClusterKVWatchersWithServerDown verifies that KV watchers can
// still be created and stopped repeatedly on an R3 bucket while one follower
// server is down.
// https://github.com/nats-io/nats-server/issues/3791
func TestJetStreamClusterKVWatchersWithServerDown(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:   "TEST",
		Replicas: 3,
	})
	require_NoError(t, err)

	// Seed a couple of revisions so the watcher has history to deliver.
	// Check the errors here; ignoring them could mask a setup failure and
	// turn a watcher bug into a confusing downstream assertion.
	_, err = kv.PutString("foo", "bar")
	require_NoError(t, err)
	_, err = kv.PutString("foo", "baz")
	require_NoError(t, err)

	// Shutdown a follower (never the stream leader) so the bucket stays
	// available, then wait for the meta leader to settle.
	s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
	s.Shutdown()
	c.waitOnLeader()

	// Reconnect to a surviving server with a short max wait so a stuck
	// watcher creation fails fast instead of hanging the test.
	nc, _ = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	js, err = nc.JetStream(nats.MaxWait(2 * time.Second))
	require_NoError(t, err)

	kv, err = js.KeyValue("TEST")
	require_NoError(t, err)

	// Repeatedly create and tear down watchers; with a peer offline this
	// used to fail (see linked issue). Stop errors are checked too.
	for i := 0; i < 100; i++ {
		w, err := kv.Watch("foo")
		require_NoError(t, err)
		require_NoError(t, w.Stop())
	}
}