diff --git a/.github/actions/nightly-release/action.yaml b/.github/actions/nightly-release/action.yaml new file mode 100644 index 00000000..7fab7dd6 --- /dev/null +++ b/.github/actions/nightly-release/action.yaml @@ -0,0 +1,57 @@ +name: Nightly Docker Releaser +description: Builds nightly docker images + +inputs: + go: + description: The version of go to build with + required: true + + label: + description: The label to use for built images + required: true + + hub_username: + description: Docker hub username + required: true + + hub_password: + description: Docker hub password + required: true + + workdir: + description: The working directory for actions requiring it + required: true + +runs: + using: composite + steps: + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: "${{ inputs.go }}" + + - name: goreleaser + uses: goreleaser/goreleaser-action@v3 + with: + workdir: "${{ inputs.workdir }}" + version: latest + args: release --snapshot --config .goreleaser-nightly.yml + + - name: images + shell: bash + run: docker images + + - name: docker_login + shell: bash + run: docker login -u "${{ inputs.hub_username }}" -p "${{ inputs.hub_password }}" + + - name: docker_push + shell: bash + run: | + NDATE=$(date +%Y%m%d) + + docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }}-${NDATE} + docker tag synadia/nats-server:nightly-${NDATE} synadia/nats-server:${{ inputs.label }} + + docker push synadia/nats-server:${{ inputs.label }}-${NDATE} + docker push synadia/nats-server:${{ inputs.label }} diff --git a/.github/workflows/rc_nightly.yaml b/.github/workflows/rc_nightly.yaml new file mode 100644 index 00000000..29e16bd8 --- /dev/null +++ b/.github/workflows/rc_nightly.yaml @@ -0,0 +1,25 @@ +name: NATS Server Nightly MAIN +on: + workflow_dispatch: {} + + schedule: + - cron: "40 4 * * *" + + +jobs: + nightly_main_release: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + path: src/github.com/nats-io/nats-server + ref: main + + - uses: ./src/github.com/nats-io/nats-server/.github/actions/nightly-release + with: + go: "1.19" + workdir: src/github.com/nats-io/nats-server + label: nightly-main + hub_username: "${{ secrets.DOCKER_USERNAME }}" + hub_password: "${{ secrets.DOCKER_PASSWORD }}" \ No newline at end of file diff --git a/server/consumer.go b/server/consumer.go index da18e05d..e1eea662 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1,4 +1,4 @@ -// Copyright 2019-2022 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/server/filestore.go b/server/filestore.go index 5a9b99b0..d2067141 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5631,11 +5631,9 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg) return } - // Mark dirty - mb.fssNeedsWrite = true - if ss.Msgs == 1 { delete(mb.fss, subj) + mb.fssNeedsWrite = true // Mark dirty return } @@ -5648,8 +5646,10 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg) if ss.Msgs == 1 { if seq != ss.First { ss.Last = ss.First + mb.fssNeedsWrite = true // Mark dirty } else { ss.First = ss.Last + mb.fssNeedsWrite = true // Mark dirty } return } @@ -5665,6 +5665,7 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg) if sm, _ := mb.cacheLookup(tseq, smp); sm != nil { if sm.subj == subj { ss.First = tseq + mb.fssNeedsWrite = true // Mark dirty return } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c8822016..357ca029 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4315,7 +4315,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea if s != nil && mset != nil { s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) } + } else if state, err := o.store.State(); err == nil { + // See if we need to process this update if our parent stream is not a limits policy stream. + o.mu.RLock() + mset := o.mset + shouldProcessAcks := mset != nil && o.retention != LimitsPolicy + o.mu.RUnlock() + // We should make sure to update the acks. + if shouldProcessAcks { + var ss StreamState + mset.store.FastState(&ss) + for seq := ss.FirstSeq; seq <= state.AckFloor.Stream; seq++ { + mset.ackMsg(o, seq) + } + } } + } else if e.Type == EntryRemovePeer { js.mu.RLock() var ourID string diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 05c1a45e..84ba1e96 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2958,6 +2958,7 @@ func TestJetStreamClusterWorkQueueConsumerReplicatedAfterScaleUp(t *testing.T) { require_True(t, ci.Config.Replicas == 0 || ci.Config.Replicas == 3) + c.waitOnConsumerLeader(globalAccountName, "TEST", ci.Name) s := c.consumerLeader(globalAccountName, "TEST", ci.Name) require_NotNil(t, s) @@ -3015,3 +3016,94 @@ func TestJetStreamClusterWorkQueueAfterScaleUp(t *testing.T) { require_NoError(t, err) require_True(t, si.State.Msgs == 0) } + +func TestJetStreamClusterInterestBasedStreamAndConsumerSnapshots(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Replicas: 3, + Subjects: []string{"foo"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + sub, err := js.SubscribeSync("foo", nats.Durable("d22")) + require_NoError(t, err) + + num := 200 + for i := 0; i < num; i++ { + js.PublishAsync("foo", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + checkSubsPending(t, sub, num) + + // Shutdown one server. + s := c.randomServer() + s.Shutdown() + + c.waitOnStreamLeader(globalAccountName, "TEST") + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Now ack all messages while the other server is down. + for i := 0; i < num; i++ { + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + m.AckSync() + } + + // Wait for all message acks to be processed and all messages to be removed. + checkFor(t, time.Second, 200*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + if si.State.Msgs == 0 { + return nil + } + return fmt.Errorf("Still have %d msgs left", si.State.Msgs) + }) + + // Force a snapshot on the consumer leader before restarting the downed server. + cl := c.consumerLeader(globalAccountName, "TEST", "d22") + require_NotNil(t, cl) + + mset, err := cl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + o := mset.lookupConsumer("d22") + require_NotNil(t, o) + + snap, err := o.store.EncodedState() + require_NoError(t, err) + + n := o.raftNode() + require_NotNil(t, n) + require_NoError(t, n.InstallSnapshot(snap)) + + // Now restart the downed server. + s = c.restartServer(s) + + // Make the restarted server the eventual leader. + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + c.waitOnStreamLeader(globalAccountName, "TEST") + if sl := c.streamLeader(globalAccountName, "TEST"); sl != s { + sl.JetStreamStepdownStream(globalAccountName, "TEST") + return fmt.Errorf("Server %s is not leader yet", s) + } + return nil + }) + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_True(t, si.State.Msgs == 0) +} diff --git a/server/ocsp.go b/server/ocsp.go index fa8b2113..9c5dcb93 100644 --- a/server/ocsp.go +++ b/server/ocsp.go @@ -433,18 +433,18 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni }, nil } - // Check whether need to verify staples from a client connection depending on the type. + // Check whether need to verify staples from a peer router or gateway connection. switch kind { - case kindStringMap[ROUTER], kindStringMap[GATEWAY], kindStringMap[LEAF]: + case kindStringMap[ROUTER], kindStringMap[GATEWAY]: tc.VerifyConnection = func(s tls.ConnectionState) error { oresp := s.OCSPResponse if oresp == nil { - return fmt.Errorf("%s client missing OCSP Staple", kind) + return fmt.Errorf("%s peer missing OCSP Staple", kind) } - // Client route connections will verify the response of the staple. + // Peer connections will verify the response of the staple. if len(s.VerifiedChains) == 0 { - return fmt.Errorf("%s client missing TLS verified chains", kind) + return fmt.Errorf("%s peer missing TLS verified chains", kind) } chain := s.VerifiedChains[0] @@ -453,7 +453,7 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni resp, err := ocsp.ParseResponseForCert(oresp, leaf, parent) if err != nil { - return fmt.Errorf("failed to parse OCSP response from %s client: %w", kind, err) + return fmt.Errorf("failed to parse OCSP response from %s peer: %w", kind, err) } if resp.Certificate == nil { if err := resp.CheckSignatureFrom(parent); err != nil { @@ -475,13 +475,13 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni } } if resp.Status != ocsp.Good { - return fmt.Errorf("bad status for OCSP Staple from %s client: %s", kind, ocspStatusString(resp.Status)) + return fmt.Errorf("bad status for OCSP Staple from %s peer: %s", kind, ocspStatusString(resp.Status)) } return nil } - // When server makes a client connection, need to also present an OCSP Staple. + // When server makes a peer connection, need to also present an OCSP Staple. tc.GetClientCertificate = func(info *tls.CertificateRequestInfo) (*tls.Certificate, error) { raw, _, err := mon.getStatus() if err != nil { @@ -558,12 +558,12 @@ func (s *Server) configureOCSP() []*tlsConfigKind { tlsConfig: config, tlsOpts: opts, apply: func(tc *tls.Config) { - // RequireAndVerifyClientCert is used to tell a client that it // should send the client cert to the server. - tc.ClientAuth = tls.RequireAndVerifyClientCert - // GetClientCertificate is used by a client to send the client cert - // to a server. We're a server, so we must not set this. + if opts.Verify { + tc.ClientAuth = tls.RequireAndVerifyClientCert + } + // We're a leaf hub server, so we must not set this. tc.GetClientCertificate = nil sopts.LeafNode.TLSConfig = tc }, @@ -580,8 +580,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind { tlsConfig: config, tlsOpts: opts, apply: func(tc *tls.Config) { - // GetCertificate is used by a server to send the server cert to a - // client. We're a client, so we must not set this. + // We're a leaf client, so we must not set this. tc.GetCertificate = nil r.TLSConfig = tc }, diff --git a/server/raft.go b/server/raft.go index 6a4ed96b..a38efe9e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2507,6 +2507,10 @@ func (n *raft) applyCommit(index uint64) error { // Used to track a success response and apply entries. func (n *raft) trackResponse(ar *appendEntryResponse) { n.Lock() + if n.state == Closed { + n.Unlock() + return + } // Update peer's last index. if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li { @@ -2532,8 +2536,8 @@ func (n *raft) trackResponse(ar *appendEntryResponse) { if nr := len(results); nr >= n.qn { // We have a quorum. for index := n.commit + 1; index <= ar.index; index++ { - if err := n.applyCommit(index); err != nil { - n.error("Got an error apply commit for %d: %v", index, err) + if err := n.applyCommit(index); err != nil && err != errNodeClosed { + n.error("Got an error applying commit for %d: %v", index, err) break } } diff --git a/test/ocsp_test.go b/test/ocsp_test.go index 7cf30e64..c1d2a542 100644 --- a/test/ocsp_test.go +++ b/test/ocsp_test.go @@ -1255,6 +1255,7 @@ func TestOCSPLeaf(t *testing.T) { setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-06-cert.pem", ocsp.Good) setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-07-cert.pem", ocsp.Good) setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-08-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/client-cert.pem", ocsp.Good) // Store Dirs storeDirA := t.TempDir() @@ -1275,6 +1276,7 @@ func TestOCSPLeaf(t *testing.T) { timeout: 5 } store_dir: '%s' + leafnodes { host: "127.0.0.1" port: -1 @@ -1285,6 +1287,8 @@ func TestOCSPLeaf(t *testing.T) { key_file: "configs/certs/ocsp/server-status-request-url-02-key.pem" ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 + # Leaf connection must present certs. + verify: true } } ` @@ -1293,7 +1297,8 @@ func TestOCSPLeaf(t *testing.T) { srvA, optsA := RunServerWithConfig(sconfA) defer srvA.Shutdown() - // LeafNode that has the original as a remote. + // LeafNode that has the original as a remote and running + // without OCSP Stapling for the leaf remote. srvConfB := ` host: "127.0.0.1" port: -1 @@ -1307,12 +1312,14 @@ func TestOCSPLeaf(t *testing.T) { timeout: 5 } store_dir: '%s' + leafnodes { remotes: [ { url: "tls://127.0.0.1:%d" tls { - cert_file: "configs/certs/ocsp/server-status-request-url-04-cert.pem" - key_file: "configs/certs/ocsp/server-status-request-url-04-key.pem" + # Cert without OCSP Stapling enabled is able to connect. + cert_file: "configs/certs/ocsp/client-cert.pem" + key_file: "configs/certs/ocsp/client-key.pem" ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 } @@ -1341,19 +1348,19 @@ func TestOCSPLeaf(t *testing.T) { t.Fatal(err) } defer cA.Close() - // checkLeafNodeConnected(t, srvA) + checkLeafNodeConnected(t, srvA) // Revoke the seed server cluster certificate, following servers will not be able to verify connection. setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-02-cert.pem", ocsp.Revoked) - // Original set of servers still can communicate to each other, even though the cert has been revoked. - // checkLeafNodeConnected(t, srvA) + // Original set of servers still can communicate to each other via leafnode, even though the staple + // for the leaf server has been revoked. + checkLeafNodeConnected(t, srvA) - // Wait for seed server to notice that its certificate has been revoked, - // so that new leafnodes can't connect to it. + // Wait for seed server to notice that its certificate has been revoked. time.Sleep(6 * time.Second) - // Start another server against the seed server that has an invalid OCSP Staple + // Start another server against the seed server that has an revoked OCSP Staple. srvConfC := ` host: "127.0.0.1" port: -1 @@ -1417,7 +1424,8 @@ func TestOCSPLeaf(t *testing.T) { } defer cC.Close() - // There should be no connectivity between the clients due to the revoked staple. + // There should be connectivity between the clients even if there is a revoked staple + // from a leafnode connection. _, err = cA.Subscribe("foo", func(m *nats.Msg) { m.Respond(nil) }) @@ -1432,13 +1440,13 @@ func TestOCSPLeaf(t *testing.T) { t.Fatal(err) } cB.Flush() - resp, err := cC.Request("foo", nil, 2*time.Second) - if err == nil { - t.Errorf("Unexpected success, response: %+v", resp) + _, err = cC.Request("foo", nil, 2*time.Second) + if err != nil { + t.Errorf("Expected success, got: %+v", err) } - resp, err = cC.Request("bar", nil, 2*time.Second) - if err == nil { - t.Errorf("Unexpected success, response: %+v", resp) + _, err = cC.Request("bar", nil, 2*time.Second) + if err != nil { + t.Errorf("Expected success, got: %+v", err) } // Switch the certs from the leafnode server to new ones that are not revoked, @@ -1466,6 +1474,7 @@ func TestOCSPLeaf(t *testing.T) { key_file: "configs/certs/ocsp/server-status-request-url-08-key.pem" ca_file: "configs/certs/ocsp/ca-cert.pem" timeout: 5 + verify: true } } ` @@ -1502,6 +1511,508 @@ func TestOCSPLeaf(t *testing.T) { } } +func TestOCSPLeafNoVerify(t *testing.T) { + const ( + caCert = "configs/certs/ocsp/ca-cert.pem" + caKey = "configs/certs/ocsp/ca-key.pem" + ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ocspr := newOCSPResponder(t, caCert, caKey) + defer ocspr.Shutdown(ctx) + addr := fmt.Sprintf("http://%s", ocspr.Addr) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-01-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-02-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-03-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-04-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-05-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-06-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-07-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-08-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/client-cert.pem", ocsp.Good) + + // Store Dirs + storeDirA := t.TempDir() + storeDirB := t.TempDir() + storeDirC := t.TempDir() + + // LeafNode server configuration + srvConfA := ` + host: "127.0.0.1" + port: -1 + + server_name: "AAA" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-01-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-01-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-02-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-02-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + # Leaf server does not require certs for clients. + verify: false + } + } + ` + srvConfA = fmt.Sprintf(srvConfA, storeDirA) + sconfA := createConfFile(t, []byte(srvConfA)) + srvA, optsA := RunServerWithConfig(sconfA) + defer srvA.Shutdown() + + // LeafNode remote that will connect to A and will not present certs. + srvConfB := ` + host: "127.0.0.1" + port: -1 + + server_name: "BBB" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-03-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-03-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + + leafnodes { + remotes: [ { + url: "tls://127.0.0.1:%d" + tls { + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + } ] + } + ` + srvConfB = fmt.Sprintf(srvConfB, storeDirB, optsA.LeafNode.Port) + conf := createConfFile(t, []byte(srvConfB)) + srvB, optsB := RunServerWithConfig(conf) + defer srvB.Shutdown() + + // Client connects to server A. + cA, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsA.Port), + nats.Secure(&tls.Config{ + VerifyConnection: func(s tls.ConnectionState) error { + if s.OCSPResponse == nil { + return fmt.Errorf("missing OCSP Staple from server") + } + return nil + }, + }), + nats.RootCAs(caCert), + nats.ErrorHandler(noOpErrHandler), + ) + if err != nil { + t.Fatal(err) + } + defer cA.Close() + checkLeafNodeConnected(t, srvA) + + // Revoke the seed server cluster certificate, following servers will not be able to verify connection. + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-02-cert.pem", ocsp.Revoked) + + // Original set of servers still can communicate to each other, even though the cert has been revoked. + checkLeafNodeConnected(t, srvA) + + // Wait for seed server to notice that its certificate has been revoked. + time.Sleep(6 * time.Second) + + // Start another server against the seed server that has an revoked OCSP Staple. + srvConfC := ` + host: "127.0.0.1" + port: -1 + + server_name: "CCC" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-05-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-05-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + leafnodes { + remotes: [ { + url: "tls://127.0.0.1:%d" + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-06-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-06-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + timeout: 5 + } + } ] + } + ` + srvConfC = fmt.Sprintf(srvConfC, storeDirC, optsA.LeafNode.Port) + conf = createConfFile(t, []byte(srvConfC)) + srvC, optsC := RunServerWithConfig(conf) + defer srvC.Shutdown() + + cB, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsB.Port), + nats.Secure(&tls.Config{ + VerifyConnection: func(s tls.ConnectionState) error { + if s.OCSPResponse == nil { + return fmt.Errorf("missing OCSP Staple from server") + } + return nil + }, + }), + nats.RootCAs(caCert), + nats.ErrorHandler(noOpErrHandler), + ) + if err != nil { + t.Fatal(err) + } + defer cB.Close() + cC, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsC.Port), + nats.Secure(&tls.Config{ + VerifyConnection: func(s tls.ConnectionState) error { + if s.OCSPResponse == nil { + return fmt.Errorf("missing OCSP Staple from server") + } + return nil + }, + }), + nats.RootCAs(caCert), + nats.ErrorHandler(noOpErrHandler), + ) + if err != nil { + t.Fatal(err) + } + defer cC.Close() + + // There should be connectivity between the clients even if there is a revoked staple + // from a leafnode connection. + _, err = cA.Subscribe("foo", func(m *nats.Msg) { + m.Respond(nil) + }) + if err != nil { + t.Errorf("%v", err) + } + cA.Flush() + _, err = cB.Subscribe("bar", func(m *nats.Msg) { + m.Respond(nil) + }) + if err != nil { + t.Fatal(err) + } + cB.Flush() + _, err = cC.Request("foo", nil, 2*time.Second) + if err != nil { + t.Errorf("Expected success, got: %+v", err) + } + _, err = cC.Request("bar", nil, 2*time.Second) + if err != nil { + t.Errorf("Expected success, got: %+v", err) + } + + // Switch the certs from the leafnode server to new ones that are not revoked, + // this should restart OCSP Stapling for the leafnode server. + srvConfA = ` + host: "127.0.0.1" + port: -1 + + server_name: "AAA" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-07-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-07-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-08-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-08-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + verify: true + } + } + ` + srvConfA = fmt.Sprintf(srvConfA, storeDirA) + if err := os.WriteFile(sconfA, []byte(srvConfA), 0666); err != nil { + t.Fatalf("Error writing config: %v", err) + } + if err := srvA.Reload(); err != nil { + t.Fatal(err) + } + time.Sleep(4 * time.Second) + + // A <-> A + _, err = cA.Request("foo", nil, 2*time.Second) + if err != nil { + t.Errorf("%v", err) + } + + // B <-> A + _, err = cB.Request("foo", nil, 2*time.Second) + if err != nil { + t.Errorf("%v", err) + } + + // C <-> A + _, err = cC.Request("foo", nil, 2*time.Second) + if err != nil { + t.Errorf("%v", err) + } + // C <-> B via leafnode A + _, err = cC.Request("bar", nil, 2*time.Second) + if err != nil { + t.Errorf("%v", err) + } +} + +func TestOCSPLeafVerifyLeafRemote(t *testing.T) { + const ( + caCert = "configs/certs/ocsp/ca-cert.pem" + caKey = "configs/certs/ocsp/ca-key.pem" + ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ocspr := newOCSPResponder(t, caCert, caKey) + defer ocspr.Shutdown(ctx) + addr := fmt.Sprintf("http://%s", ocspr.Addr) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-01-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-02-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-03-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-04-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-05-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-06-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-07-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-08-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/client-cert.pem", ocsp.Good) + + // Store Dirs + storeDirA := t.TempDir() + storeDirB := t.TempDir() + + // LeafNode server configuration + srvConfA := ` + host: "127.0.0.1" + port: -1 + + server_name: "AAA" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-01-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-01-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-02-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-02-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + verify: true + } + } + ` + srvConfA = fmt.Sprintf(srvConfA, storeDirA) + sconfA := createConfFile(t, []byte(srvConfA)) + srvA, optsA := RunServerWithConfig(sconfA) + defer srvA.Shutdown() + + // LeafNode remote that will connect to A and will not present certs. + srvConfB := ` + host: "127.0.0.1" + port: -1 + + server_name: "BBB" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-03-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-03-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + + leafnodes { + remotes: [ { + url: "tls://127.0.0.1:%d" + tls { + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + } ] + } + ` + srvConfB = fmt.Sprintf(srvConfB, storeDirB, optsA.LeafNode.Port) + conf := createConfFile(t, []byte(srvConfB)) + srvB, _ := RunServerWithConfig(conf) + defer srvB.Shutdown() + + // Client connects to server A. + cA, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsA.Port), + nats.Secure(&tls.Config{ + VerifyConnection: func(s tls.ConnectionState) error { + if s.OCSPResponse == nil { + return fmt.Errorf("missing OCSP Staple from server") + } + return nil + }, + }), + nats.RootCAs(caCert), + nats.ErrorHandler(noOpErrHandler), + ) + if err != nil { + t.Fatal(err) + } + defer cA.Close() + + // Should not have been able to connect. + checkLeafNodeConnections(t, srvA, 0) +} + +func TestOCSPLeafVerifyAndMapLeafRemote(t *testing.T) { + const ( + caCert = "configs/certs/ocsp/ca-cert.pem" + caKey = "configs/certs/ocsp/ca-key.pem" + ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ocspr := newOCSPResponder(t, caCert, caKey) + defer ocspr.Shutdown(ctx) + addr := fmt.Sprintf("http://%s", ocspr.Addr) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-01-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-02-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-03-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-04-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-05-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-06-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-07-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/server-status-request-url-08-cert.pem", ocsp.Good) + setOCSPStatus(t, addr, "configs/certs/ocsp/client-cert.pem", ocsp.Good) + + // Store Dirs + storeDirA := t.TempDir() + storeDirB := t.TempDir() + + // LeafNode server configuration + srvConfA := ` + host: "127.0.0.1" + port: -1 + + server_name: "AAA" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-01-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-01-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + verify_and_map: true + } + store_dir: '%s' + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-02-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-02-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + verify_and_map: true + } + } + + accounts: { + leaf: { + users: [ {user: "C=US, ST=CA, L=San Francisco, O=Synadia, OU=nats.io, CN=localhost server-status-request-url-04"} ] + } + client: { + users: [ {user: "C=US, ST=CA, L=San Francisco, O=Synadia, OU=nats.io, CN=localhost client"} ] + } + } + + ` + srvConfA = fmt.Sprintf(srvConfA, storeDirA) + sconfA := createConfFile(t, []byte(srvConfA)) + srvA, optsA := RunServerWithConfig(sconfA) + defer srvA.Shutdown() + + // LeafNode remote that will connect to A and will not present certs. + srvConfB := ` + host: "127.0.0.1" + port: -1 + + server_name: "BBB" + + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-03-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-03-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + store_dir: '%s' + + leafnodes { + remotes: [ { + url: "tls://127.0.0.1:%d" + tls { + cert_file: "configs/certs/ocsp/server-status-request-url-04-cert.pem" + key_file: "configs/certs/ocsp/server-status-request-url-04-key.pem" + ca_file: "configs/certs/ocsp/ca-cert.pem" + timeout: 5 + } + } ] + } + ` + srvConfB = fmt.Sprintf(srvConfB, storeDirB, optsA.LeafNode.Port) + conf := createConfFile(t, []byte(srvConfB)) + srvB, _ := RunServerWithConfig(conf) + defer srvB.Shutdown() + + // Client connects to server A. + cA, err := nats.Connect(fmt.Sprintf("tls://127.0.0.1:%d", optsA.Port), + nats.Secure(&tls.Config{ + VerifyConnection: func(s tls.ConnectionState) error { + if s.OCSPResponse == nil { + return fmt.Errorf("missing OCSP Staple from server") + } + return nil + }, + }), + nats.ClientCert("./configs/certs/ocsp/client-cert.pem", "./configs/certs/ocsp/client-key.pem"), + nats.RootCAs(caCert), + nats.ErrorHandler(noOpErrHandler), + ) + if err != nil { + t.Fatal(err) + } + defer cA.Close() + checkLeafNodeConnections(t, srvA, 1) +} + func TestOCSPGateway(t *testing.T) { const ( caCert = "configs/certs/ocsp/ca-cert.pem"