diff --git a/README.md b/README.md index 5fa5e1dd..1d284c1d 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,8 @@ If you are interested in contributing to NATS, read about our... [Fossa-Image]: https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fnats-server.svg?type=shield [Build-Status-Url]: https://travis-ci.com/github/nats-io/nats-server [Build-Status-Image]: https://travis-ci.com/nats-io/nats-server.svg?branch=main -[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.9.3 -[Release-image]: https://img.shields.io/badge/release-v2.9.3-1eb0fc.svg +[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.9.4 +[Release-image]: https://img.shields.io/badge/release-v2.9.4-1eb0fc.svg [Coverage-Url]: https://coveralls.io/r/nats-io/nats-server?branch=main [Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-server/badge.svg?branch=main [ReportCard-Url]: https://goreportcard.com/report/nats-io/nats-server diff --git a/go.mod b/go.mod index 86e6c71f..0bb92903 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.15.11 github.com/minio/highwayhash v1.0.2 github.com/nats-io/jwt/v2 v2.3.0 - github.com/nats-io/nats.go v1.18.1-0.20221020050032-ffbe2f99dcc9 + github.com/nats-io/nats.go v1.19.0 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.5.1 diff --git a/go.sum b/go.sum index 2c4138f6..0198d625 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats.go v1.18.1-0.20221020050032-ffbe2f99dcc9 h1:c8WDoal9g/px+mfTREFbXa7kNT9LehusELc3YjsjQpo= -github.com/nats-io/nats.go v1.18.1-0.20221020050032-ffbe2f99dcc9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.19.0 h1:H6j8aBnTQFoVrTGB6Xjd903UMdE7jz6DS4YkmAqgZ9Q= +github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/consumer.go b/server/consumer.go index a38ecf9c..84e384cd 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2611,24 +2611,20 @@ func (wq *waitQueue) compact() { wq.rp, wq.wp, wq.n, wq.reqs = 0, i, i, nreqs } -// Return the replies for our pending requests. +// Return the map of pending requests keyed by the reply subject. // No-op if push consumer or invalid etc. -func (o *consumer) pendingRequestReplies() []string { +func (o *consumer) pendingRequests() map[string]*waitingRequest { if o.waiting == nil { return nil } - wq, m := o.waiting, make(map[string]struct{}) + wq, m := o.waiting, make(map[string]*waitingRequest) for i, rp := 0, o.waiting.rp; i < wq.n; i++ { if wr := wq.reqs[rp]; wr != nil { - m[wr.reply] = struct{}{} + m[wr.reply] = wr } rp = (rp + 1) % cap(wq.reqs) } - var replies []string - for reply := range m { - replies = append(replies, reply) - } - return replies + return m } // Return next waiting request. This will check for expirations but not noWait or interest. diff --git a/server/jetstream.go b/server/jetstream.go index 6e26c904..e6a46b45 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -822,7 +822,7 @@ func (s *Server) signalPullConsumers() { defer js.mu.RUnlock() // In case we have stale pending requests. - hdr := []byte("NATS/1.0 409 Server Shutdown\r\n\r\n") + const hdr = "NATS/1.0 409 Server Shutdown\r\n" + JSPullRequestPendingMsgs + ": %d\r\n" + JSPullRequestPendingBytes + ": %d\r\n\r\n" var didSend bool for _, jsa := range js.accounts { @@ -833,8 +833,9 @@ func (s *Server) signalPullConsumers() { o.mu.RLock() // Only signal on R1. if o.cfg.Replicas <= 1 { - for _, reply := range o.pendingRequestReplies() { - o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + for reply, wr := range o.pendingRequests() { + shdr := fmt.Sprintf(hdr, wr.n, wr.b) + o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, []byte(shdr), nil, nil, 0)) didSend = true } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 171c62bd..8a92d702 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19629,18 +19629,29 @@ func TestJetStreamPullConsumersTimeoutHeaders(t *testing.T) { bytesReceived := 0 messagesReceived := 0 - for { + + checkHeaders := func(expectedStatus, expectedDesc string, m *nats.Msg) { + t.Helper() + if value := m.Header.Get("Status"); value != expectedStatus { + t.Fatalf("Expected status %q, got %q", expectedStatus, value) + } + if value := m.Header.Get("Description"); value != expectedDesc { + t.Fatalf("Expected description %q, got %q", expectedDesc, value) + } + if value := m.Header.Get(JSPullRequestPendingMsgs); value != fmt.Sprint(batch-messagesReceived) { + t.Fatalf("Expected %d messages, got %s", batch-messagesReceived, value) + } + if value := m.Header.Get(JSPullRequestPendingBytes); value != fmt.Sprint(maxBytes-bytesReceived) { + t.Fatalf("Expected %d bytes, got %s", maxBytes-bytesReceived, value) + } + } + + for done := false; !done; { select { case m := <-msgs: if len(m.Data) == 0 && m.Header != nil { - - if value := m.Header.Get(JSPullRequestPendingMsgs); value != fmt.Sprint(batch-messagesReceived) { - t.Fatalf("Expected %d messages, got %s", batch-messagesReceived, value) - } - if value := m.Header.Get(JSPullRequestPendingBytes); value != fmt.Sprint(maxBytes-bytesReceived) { - t.Fatalf("Expected %d bytes, got %s", maxBytes-bytesReceived, value) - } - return + checkHeaders("408", "Request Timeout", m) + done = true } else { messagesReceived += 1 bytesReceived += (len(m.Data) + len(m.Header) + len(m.Reply) + len(m.Subject)) @@ -19650,4 +19661,23 @@ func TestJetStreamPullConsumersTimeoutHeaders(t *testing.T) { } } + // Now resend the request but then shutdown the server and + // make sure we have the same info. + err = nc.PublishRequest(rsubj, reply, jreq) + require_NoError(t, err) + natsFlush(t, nc) + + s.Shutdown() + + // It is possible that the client did not receive, so let's not fail + // on that. But if the 409 indicating the the server is shutdown + // is received, then it should have the new headers. + messagesReceived, bytesReceived = 0, 0 + select { + case m := <-msgs: + checkHeaders("409", "Server Shutdown", m) + case <-time.After(500 * time.Millisecond): + // we can't fail for that. + t.Logf("Subscription did not receive the pull request response on server shutdown") + } } diff --git a/server/leafnode.go b/server/leafnode.go index a7caf53f..00e899cd 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1670,7 +1670,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { c.leaf.smap = make(map[string]int32) for _, sub := range subs { subj := string(sub.subject) - if c.isSpokeLeafNode() && !c.canSubscribe(subj) { + // Check perms regardless of role. + if !c.canSubscribe(subj) { c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", subj, accName, accNTag) continue } @@ -1970,6 +1971,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { if checkPerms && subjectIsLiteral(string(sub.subject)) && !c.pubAllowedFullCheck(string(sub.subject), true, true) { c.mu.Unlock() c.leafSubPermViolation(sub.subject) + c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) return nil } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index bc2e7c7f..903396cb 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4564,3 +4564,101 @@ func TestLeafNodeSignatureCB(t *testing.T) { defer sl.Shutdown() checkLeafNodeConnected(t, sl) } + +type testLeafTraceLogger struct { + DummyLogger + ch chan string +} + +func (l *testLeafTraceLogger) Tracef(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + // We will sub to 'baz' and to 'bar', so filter on 'ba' prefix. + if strings.Contains(msg, "[LS+ ba") { + select { + case l.ch <- msg: + default: + } + } +} + +// Make sure permissioned denied subs do not make it to the leafnode even if existing. +func TestLeafNodePermsSuppressSubs(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + authorization { + PERMS = { + publish = "foo" + subscribe = ["_INBOX.>"] + } + users = [ + {user: "user", password: "pass"} + {user: "ln", password: "pass" , permissions: $PERMS } + ] + } + no_auth_user: user + + leafnodes { + listen: 127.0.0.1:7422 + } + `)) + defer removeFile(t, conf) + + lconf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + leafnodes { + remotes = [ { url: "nats://ln:pass@127.0.0.1" } ] + } + trace = true + `)) + defer removeFile(t, lconf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + // Connect client to the hub. + nc, err := nats.Connect(s.ClientURL()) + require_NoError(t, err) + + // This should not be seen on leafnode side since we only allow pub to "foo" + _, err = nc.SubscribeSync("baz") + require_NoError(t, err) + + ln, _ := RunServerWithConfig(lconf) + defer ln.Shutdown() + + // Setup logger to capture trace events. + l := &testLeafTraceLogger{ch: make(chan string, 10)} + ln.SetLogger(l, true, true) + + checkLeafNodeConnected(t, ln) + + // Need to have ot reconnect to trigger since logger attaches too late. + ln.mu.Lock() + for _, c := range ln.leafs { + c.mu.Lock() + c.nc.Close() + c.mu.Unlock() + } + ln.mu.Unlock() + checkLeafNodeConnectedCount(t, ln, 0) + checkLeafNodeConnectedCount(t, ln, 1) + + select { + case msg := <-l.ch: + t.Fatalf("Unexpected LS+ seen on leafnode: %s", msg) + case <-time.After(50 * time.Millisecond): + // OK + } + + // Now double check that new subs also do not propagate. + // This behavior was working already. + _, err = nc.SubscribeSync("bar") + require_NoError(t, err) + + select { + case msg := <-l.ch: + t.Fatalf("Unexpected LS+ seen on leafnode: %s", msg) + case <-time.After(50 * time.Millisecond): + // OK + } +}