mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix race in interest propogation to new routes
This commit is contained in:
1
TODO.md
1
TODO.md
@@ -2,6 +2,7 @@
|
||||
# General
|
||||
|
||||
- [ ] Multiple listen endpoints
|
||||
- [ ] Websocket / HTTP2 strategy
|
||||
- [ ] Listen configure key vs addr and port
|
||||
- [ ] Multiple Authorization / Access
|
||||
- [ ] T series reservations
|
||||
|
||||
@@ -143,13 +143,6 @@ func (c *client) initClient(tlsConn bool) {
|
||||
c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid)
|
||||
}
|
||||
|
||||
// No clue why, but this stalls and kills performance on Mac (Mavericks).
|
||||
//
|
||||
// if ip, ok := c.nc.(*net.TCPConn); ok {
|
||||
// ip.SetReadBuffer(s.opts.BufSize)
|
||||
// ip.SetWriteBuffer(2 * s.opts.BufSize)
|
||||
// }
|
||||
|
||||
if !tlsConn {
|
||||
// Set the Ping timer
|
||||
c.setPingTimer()
|
||||
@@ -557,7 +550,6 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
// race conditions. We should make sure that we process only one.
|
||||
sid := string(sub.sid)
|
||||
if c.subs[sid] == nil {
|
||||
c.subs[sid] = sub
|
||||
if c.srv != nil {
|
||||
err = c.srv.sl.Insert(sub)
|
||||
if err != nil {
|
||||
@@ -570,12 +562,19 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
c.mu.Unlock()
|
||||
if err != nil {
|
||||
c.sendErr("Invalid Subject")
|
||||
return nil
|
||||
} else if c.opts.Verbose {
|
||||
c.sendOK()
|
||||
}
|
||||
if shouldForward {
|
||||
c.srv.broadcastSubscribe(sub)
|
||||
}
|
||||
|
||||
// We add it to local client map here to avoid race with new routers and sendLocalSubsToRoute().
|
||||
c.mu.Lock()
|
||||
c.subs[sid] = sub
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -798,6 +797,7 @@ func (c *client) processMsg(msg []byte) {
|
||||
c.cache.results = make(map[string]*SublistResult)
|
||||
c.cache.genid = genid
|
||||
}
|
||||
|
||||
if !ok {
|
||||
subject := string(c.pa.subject)
|
||||
r = srv.sl.Match(subject)
|
||||
|
||||
@@ -36,6 +36,7 @@ type route struct {
|
||||
url *url.URL
|
||||
authRequired bool
|
||||
tlsRequired bool
|
||||
didSubs bool
|
||||
}
|
||||
|
||||
type connectInfo struct {
|
||||
@@ -231,7 +232,6 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
|
||||
// and large subscription space. Plus buffering in place not a good idea.
|
||||
func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
b := bytes.Buffer{}
|
||||
|
||||
s.mu.Lock()
|
||||
for _, client := range s.clients {
|
||||
client.mu.Lock()
|
||||
@@ -249,6 +249,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
|
||||
s.mu.Unlock()
|
||||
|
||||
route.mu.Lock()
|
||||
route.route.didSubs = true
|
||||
defer route.mu.Unlock()
|
||||
route.bw.Write(b.Bytes())
|
||||
route.bw.Flush()
|
||||
@@ -480,8 +481,10 @@ func (s *Server) broadcastToRoutes(proto string) {
|
||||
for _, route := range s.routes {
|
||||
// FIXME(dlc) - Make same logic as deliverMsg
|
||||
route.mu.Lock()
|
||||
route.bw.WriteString(proto)
|
||||
route.bw.Flush()
|
||||
if route.route.didSubs {
|
||||
route.bw.WriteString(proto)
|
||||
route.bw.Flush()
|
||||
}
|
||||
route.mu.Unlock()
|
||||
route.traceOutOp("", arg)
|
||||
}
|
||||
|
||||
@@ -331,13 +331,9 @@ const (
|
||||
msgIndex = 6
|
||||
)
|
||||
|
||||
// Reuse expect buffer
|
||||
// TODO(dlc) - This may be too simplistic in the long run, may need
|
||||
// to consider holding onto data from previous reads matched by conn.
|
||||
var expBuf = make([]byte, 32768)
|
||||
|
||||
// Test result from server against regexp
|
||||
func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
|
||||
expBuf := make([]byte, 32768)
|
||||
// Wait for commands to be processed and results queued for read
|
||||
c.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
n, err := c.Read(expBuf)
|
||||
@@ -355,6 +351,7 @@ func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
|
||||
}
|
||||
|
||||
func expectNothing(t tLogger, c net.Conn) {
|
||||
expBuf := make([]byte, 32)
|
||||
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
n, err := c.Read(expBuf)
|
||||
c.SetReadDeadline(time.Time{})
|
||||
|
||||
Reference in New Issue
Block a user