Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-02-28 15:17:24 -08:00
15 changed files with 358 additions and 231 deletions

View File

@@ -506,8 +506,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
// Assign our transform for republishing.
mset.tr = tr
}
jsa.streams[cfg.Name] = mset
storeDir := filepath.Join(jsa.storeDir, streamsDir, cfg.Name)
jsa.mu.Unlock()
@@ -594,6 +592,11 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}
}
// Register with our account last.
jsa.mu.Lock()
jsa.streams[cfg.Name] = mset
jsa.mu.Unlock()
return mset, nil
}
@@ -3627,39 +3630,9 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
mset.mu.RLock()
isLeader, isClustered, isSealed := mset.isLeader(), mset.isClustered(), mset.cfg.Sealed
mset.mu.RUnlock()
// If we are not the leader just ignore.
if !isLeader {
return
}
if isSealed {
var resp = JSPubAckResponse{
PubAck: &PubAck{Stream: mset.name()},
Error: NewJSStreamSealedError(),
}
b, _ := json.Marshal(resp)
mset.outq.sendMsg(reply, b)
return
}
// Always move this to another Go routine.
hdr, msg := c.msgParts(rmsg)
// If we are not receiving directly from a client we should move this to another Go routine.
if c.kind != CLIENT {
mset.queueInboundMsg(subject, reply, hdr, msg)
return
}
// If we are clustered we need to propose this message to the underlying raft group.
if isClustered {
mset.processClusteredInboundMsg(subject, reply, hdr, msg)
} else {
mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
}
mset.queueInboundMsg(subject, reply, hdr, msg)
}
var (
@@ -3696,11 +3669,24 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
numConsumers := len(mset.consumers)
interestRetention := mset.cfg.Retention == InterestPolicy
// Snapshot if we are the leader and if we can respond.
isLeader := mset.isLeader()
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
canRespond := doAck && len(reply) > 0 && isLeader
var resp = &JSPubAckResponse{}
// Bail here if sealed.
if isSealed {
outq := mset.outq
mset.mu.Unlock()
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamSealedErr]
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return ApiErrors[JSStreamSealedErr]
}
var buf [256]byte
pubAck := append(buf[:0], mset.pubAck...)
@@ -4161,13 +4147,13 @@ func (mset *stream) signalConsumersLoop() {
// This will update and signal all consumers that match.
func (mset *stream) signalConsumers(subj string, seq uint64) {
mset.clsMu.RLock()
defer mset.clsMu.RUnlock()
if mset.csl == nil {
mset.clsMu.RUnlock()
return
}
r := mset.csl.Match(subj)
mset.clsMu.RUnlock()
if len(r.psubs) == 0 {
return
}
@@ -4759,19 +4745,20 @@ func (mset *stream) state() StreamState {
}
func (mset *stream) stateWithDetail(details bool) StreamState {
mset.mu.RLock()
c, store := mset.client, mset.store
mset.mu.RUnlock()
if c == nil || store == nil {
// mset.store does not change once set, so ok to reference here directly.
// We do this elsewhere as well.
store := mset.store
if store == nil {
return StreamState{}
}
// Currently rely on store.
// Currently rely on store for details.
if details {
return store.State()
}
// Here we do the fast version.
var state StreamState
mset.store.FastState(&state)
store.FastState(&state)
return state
}