mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Do proper ubsubscribe when shutting off restore endpoint
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1344,8 +1344,8 @@ func (a *Account) internalClient() *client {
|
||||
func (a *Account) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
|
||||
a.mu.Lock()
|
||||
c := a.internalClient()
|
||||
sid := strconv.FormatUint(a.isid+1, 10)
|
||||
a.isid++
|
||||
sid := strconv.FormatUint(a.isid, 10)
|
||||
a.mu.Unlock()
|
||||
|
||||
// This will happen in parsing when the account has not been properly setup.
|
||||
|
||||
@@ -2494,7 +2494,7 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
|
||||
if unsub {
|
||||
c.unsubscribe(acc, sub, false, true)
|
||||
if acc != nil && kind == CLIENT || kind == SYSTEM {
|
||||
if acc != nil && kind == CLIENT || kind == SYSTEM || kind == ACCOUNT {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
|
||||
@@ -1164,7 +1164,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
|
||||
if reply == _EMPTY_ {
|
||||
tfile.Close()
|
||||
os.Remove(tfile.Name())
|
||||
c.processUnsub(sub.sid)
|
||||
sub.client.processUnsub(sub.sid)
|
||||
s.Warnf("Restore for stream %q in account %q requires reply subject for each chunk", stream, c.acc.Name)
|
||||
return
|
||||
}
|
||||
@@ -1179,7 +1179,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
|
||||
mset, err := acc.RestoreStream(stream, tfile)
|
||||
tfile.Close()
|
||||
os.Remove(tfile.Name())
|
||||
c.processUnsub(sub.sid)
|
||||
sub.client.processUnsub(sub.sid)
|
||||
|
||||
end := time.Now()
|
||||
|
||||
@@ -1217,7 +1217,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, subject, r
|
||||
FriendlyBytes(int64(received)), stream, c.acc.Name, err)
|
||||
tfile.Close()
|
||||
os.Remove(tfile.Name())
|
||||
c.processUnsub(sub.sid)
|
||||
sub.client.processUnsub(sub.sid)
|
||||
if reply != _EMPTY_ {
|
||||
s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
|
||||
}
|
||||
|
||||
@@ -3136,6 +3136,28 @@ func TestJetStreamSnapshotsAPI(t *testing.T) {
|
||||
if rresp.Error != nil {
|
||||
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
|
||||
}
|
||||
|
||||
// Make sure when we send something without a reply subject the subscription is shutoff.
|
||||
r := bytes.NewReader(snapshot)
|
||||
n, _ := r.Read(chunk[:])
|
||||
nc2.Publish(rresp.DeliverSubject, chunk[:n])
|
||||
nc2.Flush()
|
||||
n, _ = r.Read(chunk[:])
|
||||
if _, err := nc2.Request(rresp.DeliverSubject, chunk[:n], 50*time.Millisecond); err == nil {
|
||||
t.Fatalf("Expected restore subscriptionm to be closed")
|
||||
}
|
||||
|
||||
rmsg, err = nc2.Request(fmt.Sprintf(server.JSApiStreamRestoreT, mname), nil, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error on snapshot request: %v", err)
|
||||
}
|
||||
// Make sure to clear.
|
||||
rresp.Error = nil
|
||||
json.Unmarshal(rmsg.Data, &rresp)
|
||||
if rresp.Error != nil {
|
||||
t.Fatalf("Got an unexpected error response: %+v", rresp.Error)
|
||||
}
|
||||
|
||||
for r := bytes.NewReader(snapshot); ; {
|
||||
n, err := r.Read(chunk[:])
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user