mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #51 from zrob/stale_subs_rebase
Retry: Fix subscription sharing/route reconnect race that causes some clients not to receive messages
This commit is contained in:
@@ -453,6 +453,10 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
if c.nc == nil {
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
c.subs.Set(sub.sid, sub)
|
||||
if c.srv != nil {
|
||||
err = c.srv.sl.Insert(sub.subject, sub)
|
||||
@@ -726,7 +730,7 @@ func (c *client) processMsg(msg []byte) {
|
||||
if rmap == nil {
|
||||
rmap = make(map[string]struct{}, srv.numRoutes())
|
||||
}
|
||||
if sub.client == nil || sub.client.route == nil ||
|
||||
if sub.client == nil || sub.client.nc == nil || sub.client.route == nil ||
|
||||
sub.client.route.remoteID == "" {
|
||||
Debug("Bad or Missing ROUTER Identity, not processing msg",
|
||||
clientConnStr(c.nc), c.cid)
|
||||
|
||||
@@ -477,6 +477,23 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) {
|
||||
s, c, _ := setupClient()
|
||||
c.closeConnection()
|
||||
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
|
||||
|
||||
ch := make(chan bool)
|
||||
go func() {
|
||||
c.parse(subs)
|
||||
ch <- true
|
||||
}()
|
||||
<-ch
|
||||
|
||||
if s.sl.Count() != 0 {
|
||||
t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count())
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientMapRemoval(t *testing.T) {
|
||||
s, c, _ := setupClient()
|
||||
c.nc.Close()
|
||||
|
||||
@@ -4,6 +4,7 @@ package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/apcera/gnatsd/hashmap"
|
||||
@@ -11,8 +12,12 @@ import (
|
||||
)
|
||||
|
||||
func TestSplitBufferSubOp(t *testing.T) {
|
||||
cli, trash := net.Pipe()
|
||||
defer cli.Close()
|
||||
defer trash.Close()
|
||||
|
||||
s := &Server{sl: sublist.New()}
|
||||
c := &client{srv: s, subs: hashmap.New()}
|
||||
c := &client{srv: s, subs: hashmap.New(), nc: cli}
|
||||
|
||||
subop := []byte("SUB foo 1\r\n")
|
||||
subop1 := subop[:6]
|
||||
|
||||
Reference in New Issue
Block a user