mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -759,7 +759,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
|
||||
jsub := rr.psubs[0]
|
||||
|
||||
// If this is directly from a client connection ok to do in place.
|
||||
if c.kind != ROUTER && c.kind != GATEWAY {
|
||||
if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF {
|
||||
start := time.Now()
|
||||
jsub.icb(sub, c, acc, subject, reply, rmsg)
|
||||
if dur := time.Since(start); dur >= readLoopReportThreshold {
|
||||
@@ -768,7 +768,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
|
||||
return
|
||||
}
|
||||
|
||||
// If we are here we have received this request over a non client connection.
|
||||
// If we are here we have received this request over a non-client connection.
|
||||
// We need to make sure not to block. We will send the request to a long-lived
|
||||
// go routine.
|
||||
|
||||
|
||||
@@ -42,7 +42,10 @@ type jetStreamCluster struct {
|
||||
streams map[string]map[string]*streamAssignment
|
||||
// These are inflight proposals and used to apply limits when there are
|
||||
// concurrent requests that would otherwise be accepted.
|
||||
inflight map[string]map[string]struct{}
|
||||
// We also record the group for the stream. This is needed since if we have
|
||||
// concurrent requests for same account and stream we need to let it process to get
|
||||
// a response but they need to be same group, peers etc.
|
||||
inflight map[string]map[string]*raftGroup
|
||||
// Signals meta-leader should check the stream assignments.
|
||||
streamsCheck bool
|
||||
// Server.
|
||||
@@ -2532,7 +2535,7 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
|
||||
streamName := mset.name()
|
||||
|
||||
if isLeader {
|
||||
s.Noticef("JetStream cluster new stream leader for '%s > %s'", sa.Client.serviceAccount(), streamName)
|
||||
s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName)
|
||||
s.sendStreamLeaderElectAdvisory(mset)
|
||||
// Check for peer removal and process here if needed.
|
||||
js.checkPeers(sa.Group)
|
||||
@@ -2709,20 +2712,23 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
|
||||
// Update our state.
|
||||
accStreams[stream] = sa
|
||||
cc.streams[accName] = accStreams
|
||||
hasResponded := sa.responded
|
||||
js.mu.Unlock()
|
||||
|
||||
acc, err := s.LookupAccount(accName)
|
||||
if err != nil {
|
||||
ll := fmt.Sprintf("Account [%s] lookup for stream create failed: %v", accName, err)
|
||||
if isMember {
|
||||
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
|
||||
result := &streamAssignmentResult{
|
||||
Account: accName,
|
||||
Stream: stream,
|
||||
Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}},
|
||||
if !hasResponded {
|
||||
// If we can not lookup the account and we are a member, send this result back to the metacontroller leader.
|
||||
result := &streamAssignmentResult{
|
||||
Account: accName,
|
||||
Stream: stream,
|
||||
Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}},
|
||||
}
|
||||
result.Response.Error = NewJSNoAccountError()
|
||||
s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result)
|
||||
}
|
||||
result.Response.Error = NewJSNoAccountError()
|
||||
s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result)
|
||||
s.Warnf(ll)
|
||||
} else {
|
||||
s.Debugf(ll)
|
||||
@@ -2993,10 +2999,49 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
|
||||
mset, err = acc.lookupStream(sa.Config.Name)
|
||||
if err == nil && mset != nil {
|
||||
osa := mset.streamAssignment()
|
||||
// If we already have a stream assignment and they are the same exact config, short circuit here.
|
||||
if osa != nil {
|
||||
if reflect.DeepEqual(osa.Config, sa.Config) {
|
||||
if sa.Group.Name == osa.Group.Name && reflect.DeepEqual(sa.Group.Peers, osa.Group.Peers) {
|
||||
// Since this already exists we know it succeeded, just respond to this caller.
|
||||
js.mu.RLock()
|
||||
client, subject, reply := sa.Client, sa.Subject, sa.Reply
|
||||
js.mu.RUnlock()
|
||||
|
||||
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
|
||||
resp.StreamInfo = &StreamInfo{
|
||||
Created: mset.createdTime(),
|
||||
State: mset.state(),
|
||||
Config: mset.config(),
|
||||
Cluster: js.clusterInfo(mset.raftGroup()),
|
||||
Sources: mset.sourcesInfo(),
|
||||
Mirror: mset.mirrorInfo(),
|
||||
}
|
||||
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
|
||||
return
|
||||
} else {
|
||||
// We had a bug where we could have multiple assignments for the same
|
||||
// stream but with different group assignments, including multiple raft
|
||||
// groups. So check for that here. We can only bet on the last one being
|
||||
// consistent in the long run, so let it continue if we see this condition.
|
||||
s.Warnf("JetStream cluster detected duplicate assignment for stream %q for account %q", sa.Config.Name, acc.Name)
|
||||
if osa.Group.node != nil && osa.Group.node != sa.Group.node {
|
||||
osa.Group.node.Delete()
|
||||
osa.Group.node = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
mset.setStreamAssignment(sa)
|
||||
if err = mset.updateWithAdvisory(sa.Config, false); err != nil {
|
||||
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
|
||||
// Process the raft group and make sure it's running if needed.
|
||||
js.createRaftGroup(acc.GetName(), osa.Group, storage)
|
||||
mset.setStreamAssignment(osa)
|
||||
if rg.node != nil {
|
||||
rg.node.Delete()
|
||||
rg.node = nil
|
||||
}
|
||||
}
|
||||
} else if err == NewJSStreamNotFoundError() {
|
||||
// Add in the stream here.
|
||||
@@ -5049,26 +5094,40 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
|
||||
js.mu.Lock()
|
||||
defer js.mu.Unlock()
|
||||
|
||||
// Capture if we have existing assignment first.
|
||||
osa := js.streamAssignment(acc.Name, cfg.Name)
|
||||
var areEqual bool
|
||||
if osa != nil {
|
||||
areEqual = reflect.DeepEqual(osa.Config, cfg)
|
||||
}
|
||||
|
||||
// If this stream already exists, turn this into a stream info call.
|
||||
if sa := js.streamAssignment(acc.Name, cfg.Name); sa != nil {
|
||||
if osa != nil {
|
||||
// If they are the same then we will forward on as a stream info request.
|
||||
// This now matches single server behavior.
|
||||
if reflect.DeepEqual(sa.Config, cfg) {
|
||||
isubj := fmt.Sprintf(JSApiStreamInfoT, cfg.Name)
|
||||
// We want to make sure we send along the client info.
|
||||
cij, _ := json.Marshal(ci)
|
||||
hdr := map[string]string{
|
||||
ClientInfoHdr: string(cij),
|
||||
JSResponseType: jsCreateResponse,
|
||||
if areEqual {
|
||||
// This works when we have a stream leader. If we have no leader let the dupe
|
||||
// go through as normal. We will handle properly on the other end.
|
||||
// We must check interest at the $SYS account layer, not user account since import
|
||||
// will always show interest.
|
||||
sisubj := fmt.Sprintf(clusterStreamInfoT, acc.Name, cfg.Name)
|
||||
if s.SystemAccount().Interest(sisubj) > 0 {
|
||||
isubj := fmt.Sprintf(JSApiStreamInfoT, cfg.Name)
|
||||
// We want to make sure we send along the client info.
|
||||
cij, _ := json.Marshal(ci)
|
||||
hdr := map[string]string{
|
||||
ClientInfoHdr: string(cij),
|
||||
JSResponseType: jsCreateResponse,
|
||||
}
|
||||
// Send this as system account, but include client info header.
|
||||
s.sendInternalAccountMsgWithReply(nil, isubj, reply, hdr, nil, true)
|
||||
return
|
||||
}
|
||||
// Send this as system account, but include client info header.
|
||||
s.sendInternalAccountMsgWithReply(nil, isubj, reply, hdr, nil, true)
|
||||
} else {
|
||||
resp.Error = NewJSStreamNameExistError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
resp.Error = NewJSStreamNameExistError()
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
|
||||
if cfg.Sealed {
|
||||
@@ -5080,6 +5139,10 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
|
||||
// Check for subject collisions here.
|
||||
asa := cc.streams[acc.Name]
|
||||
for _, sa := range asa {
|
||||
// If we found an osa and are here we are letting this through
|
||||
if sa == osa && areEqual {
|
||||
continue
|
||||
}
|
||||
for _, subj := range sa.Config.Subjects {
|
||||
for _, tsubj := range cfg.Subjects {
|
||||
if SubjectsCollide(tsubj, subj) {
|
||||
@@ -5100,29 +5163,45 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
|
||||
}
|
||||
|
||||
// Raft group selection and placement.
|
||||
rg, err := js.createGroupForStream(ci, cfg)
|
||||
if err != nil {
|
||||
resp.Error = NewJSClusterNoPeersError(err)
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
var rg *raftGroup
|
||||
if osa != nil && areEqual {
|
||||
rg = osa.Group
|
||||
} else {
|
||||
// Check inflight before proposing in case we have an existing inflight proposal.
|
||||
if cc.inflight == nil {
|
||||
cc.inflight = make(map[string]map[string]*raftGroup)
|
||||
}
|
||||
streams, ok := cc.inflight[acc.Name]
|
||||
if !ok {
|
||||
streams = make(map[string]*raftGroup)
|
||||
cc.inflight[acc.Name] = streams
|
||||
} else if existing, ok := streams[cfg.Name]; ok {
|
||||
// We have existing for same stream. Re-use same group.
|
||||
rg = existing
|
||||
}
|
||||
}
|
||||
// Pick a preferred leader.
|
||||
rg.setPreferred()
|
||||
// Create a new one here.
|
||||
if rg == nil {
|
||||
nrg, err := js.createGroupForStream(ci, cfg)
|
||||
if err != nil {
|
||||
resp.Error = NewJSClusterNoPeersError(err)
|
||||
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
|
||||
return
|
||||
}
|
||||
rg = nrg
|
||||
// Pick a preferred leader.
|
||||
rg.setPreferred()
|
||||
}
|
||||
|
||||
// Sync subject for post snapshot sync.
|
||||
sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
|
||||
if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil {
|
||||
// On success, add this as an inflight proposal so we can apply limits
|
||||
// on concurrent create requests while this stream assignment has
|
||||
// possibly not been processed yet.
|
||||
if cc.inflight == nil {
|
||||
cc.inflight = make(map[string]map[string]struct{})
|
||||
if streams, ok := cc.inflight[acc.Name]; ok {
|
||||
streams[cfg.Name] = rg
|
||||
}
|
||||
streams, ok := cc.inflight[acc.Name]
|
||||
if !ok {
|
||||
streams = make(map[string]struct{})
|
||||
cc.inflight[acc.Name] = streams
|
||||
}
|
||||
streams[cfg.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1465,3 +1465,175 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T)
|
||||
_, err = js.ConsumerInfo("TEST", "d")
|
||||
require_Error(t, err, nats.ErrConsumerNotFound)
|
||||
}
|
||||
|
||||
// https://github.com/nats-io/nats-server/issues/3677
|
||||
func TestJetStreamParallelStreamCreation(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
np := 20
|
||||
|
||||
startCh := make(chan bool)
|
||||
errCh := make(chan error, np)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(np)
|
||||
for i := 0; i < np; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Individual connection
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Make them all fire at once.
|
||||
<-startCh
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"common.*.*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
close(startCh)
|
||||
wg.Wait()
|
||||
|
||||
if len(errCh) > 0 {
|
||||
t.Fatalf("Expected no errors, got %d", len(errCh))
|
||||
}
|
||||
}
|
||||
|
||||
// In addition to test above, if streams were attempted to be created in parallel
|
||||
// it could be that multiple raft groups would be created for the same asset.
|
||||
func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
np := 20
|
||||
|
||||
startCh := make(chan bool)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(np)
|
||||
for i := 0; i < np; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Individual connection
|
||||
nc, _ := jsClientConnect(t, c.randomServer())
|
||||
js, _ := nc.JetStream(nats.MaxWait(time.Second))
|
||||
defer nc.Close()
|
||||
|
||||
// Make them all fire at once.
|
||||
<-startCh
|
||||
|
||||
// Ignore errors in this test, care about raft group and metastate.
|
||||
js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"common.*.*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
close(startCh)
|
||||
wg.Wait()
|
||||
|
||||
// Restart a server too.
|
||||
s := c.randomServer()
|
||||
s.Shutdown()
|
||||
s = c.restartServer(s)
|
||||
c.waitOnLeader()
|
||||
c.waitOnStreamLeader(globalAccountName, "TEST")
|
||||
// Check that this server has only two active raft nodes after restart.
|
||||
if nrn := s.numRaftNodes(); nrn != 2 {
|
||||
t.Fatalf("Expected only two active raft nodes, got %d", nrn)
|
||||
}
|
||||
|
||||
// Make sure we only have 2 unique raft groups for all servers.
|
||||
// One for meta, one for stream.
|
||||
expected := 2
|
||||
rg := make(map[string]struct{})
|
||||
for _, s := range c.servers {
|
||||
s.mu.RLock()
|
||||
for _, ni := range s.raftNodes {
|
||||
n := ni.(*raft)
|
||||
rg[n.Group()] = struct{}{}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
if len(rg) != expected {
|
||||
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamParallelConsumerCreation(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"common.*.*"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
np := 20
|
||||
|
||||
startCh := make(chan bool)
|
||||
errCh := make(chan error, np)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(np)
|
||||
for i := 0; i < np; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Individual connection
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
// Make them all fire at once.
|
||||
<-startCh
|
||||
|
||||
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
|
||||
Durable: "dlc",
|
||||
Replicas: 3,
|
||||
})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
close(startCh)
|
||||
wg.Wait()
|
||||
|
||||
if len(errCh) > 0 {
|
||||
t.Fatalf("Expected no errors, got %d", len(errCh))
|
||||
}
|
||||
|
||||
// Make sure we only have 3 unique raft groups for all servers.
|
||||
// One for meta, one for stream, one for consumer.
|
||||
expected := 3
|
||||
rg := make(map[string]struct{})
|
||||
for _, s := range c.servers {
|
||||
s.mu.RLock()
|
||||
for _, ni := range s.raftNodes {
|
||||
n := ni.(*raft)
|
||||
rg[n.Group()] = struct{}{}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
if len(rg) != expected {
|
||||
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1436,6 +1436,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
|
||||
ocfg := mset.cfg
|
||||
s := mset.srv
|
||||
mset.mu.RUnlock()
|
||||
|
||||
cfg, err := mset.jsa.configUpdateCheck(&ocfg, config, s)
|
||||
if err != nil {
|
||||
return NewJSStreamInvalidConfigError(err, Unless(err))
|
||||
|
||||
Reference in New Issue
Block a user