diff --git a/TODO.md b/TODO.md index 9d7700e3..5459f7bb 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,7 @@ # General - [ ] Multiple listen endpoints +- [ ] Websocket / HTTP2 strategy - [ ] Listen configure key vs addr and port - [ ] Multiple Authorization / Access - [ ] T series reservations diff --git a/server/client.go b/server/client.go index 1712a5d8..f5764174 100644 --- a/server/client.go +++ b/server/client.go @@ -143,13 +143,6 @@ func (c *client) initClient(tlsConn bool) { c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) } - // No clue why, but this stalls and kills performance on Mac (Mavericks). - // - // if ip, ok := c.nc.(*net.TCPConn); ok { - // ip.SetReadBuffer(s.opts.BufSize) - // ip.SetWriteBuffer(2 * s.opts.BufSize) - // } - if !tlsConn { // Set the Ping timer c.setPingTimer() @@ -557,7 +550,6 @@ func (c *client) processSub(argo []byte) (err error) { // race conditions. We should make sure that we process only one. sid := string(sub.sid) if c.subs[sid] == nil { - c.subs[sid] = sub if c.srv != nil { err = c.srv.sl.Insert(sub) if err != nil { @@ -570,12 +562,19 @@ func (c *client) processSub(argo []byte) (err error) { c.mu.Unlock() if err != nil { c.sendErr("Invalid Subject") + return nil } else if c.opts.Verbose { c.sendOK() } if shouldForward { c.srv.broadcastSubscribe(sub) } + + // We add it to local client map here to avoid race with new routers and sendLocalSubsToRoute(). + c.mu.Lock() + c.subs[sid] = sub + c.mu.Unlock() + return nil } @@ -798,6 +797,7 @@ func (c *client) processMsg(msg []byte) { c.cache.results = make(map[string]*SublistResult) c.cache.genid = genid } + if !ok { subject := string(c.pa.subject) r = srv.sl.Match(subject) diff --git a/server/route.go b/server/route.go index 6d0d03f5..52279f02 100644 --- a/server/route.go +++ b/server/route.go @@ -36,6 +36,7 @@ type route struct { url *url.URL authRequired bool tlsRequired bool + didSubs bool } type connectInfo struct { @@ -231,7 +232,6 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { // and large subscription space. Plus buffering in place not a good idea. func (s *Server) sendLocalSubsToRoute(route *client) { b := bytes.Buffer{} - s.mu.Lock() for _, client := range s.clients { client.mu.Lock() @@ -249,6 +249,7 @@ func (s *Server) sendLocalSubsToRoute(route *client) { s.mu.Unlock() route.mu.Lock() + route.route.didSubs = true defer route.mu.Unlock() route.bw.Write(b.Bytes()) route.bw.Flush() @@ -480,8 +481,10 @@ func (s *Server) broadcastToRoutes(proto string) { for _, route := range s.routes { // FIXME(dlc) - Make same logic as deliverMsg route.mu.Lock() - route.bw.WriteString(proto) - route.bw.Flush() + if route.route.didSubs { + route.bw.WriteString(proto) + route.bw.Flush() + } route.mu.Unlock() route.traceOutOp("", arg) } diff --git a/test/test.go b/test/test.go index cd011cc5..fac72f52 100644 --- a/test/test.go +++ b/test/test.go @@ -331,13 +331,9 @@ const ( msgIndex = 6 ) -// Reuse expect buffer -// TODO(dlc) - This may be too simplistic in the long run, may need -// to consider holding onto data from previous reads matched by conn. -var expBuf = make([]byte, 32768) - // Test result from server against regexp func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { + expBuf := make([]byte, 32768) // Wait for commands to be processed and results queued for read c.SetReadDeadline(time.Now().Add(2 * time.Second)) n, err := c.Read(expBuf) @@ -355,6 +351,7 @@ func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { } func expectNothing(t tLogger, c net.Conn) { + expBuf := make([]byte, 32) c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) n, err := c.Read(expBuf) c.SetReadDeadline(time.Time{})