From dd116fcfd4ecada200bd4dc8c7b72d213d549ece Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 3 Oct 2019 10:27:24 -0700 Subject: [PATCH] JetStream first pass basics. This is the first checkin for JetStream. Has some rudimentary basics working. TODO 1. Push vs pull mode for observables. (work queues) 2. Disk/File store, memory only for now. 3. clustering code - design shaping up well. 4. Finalize account import semantics. 5. Lots of other little things. Signed-off-by: Derek Collison --- main.go | 6 +- server/accounts.go | 2 +- server/client.go | 52 +++-- server/const.go | 6 + server/events.go | 41 ++-- server/jetstream.go | 356 +++++++++++++++++++++++++++++ server/memstore.go | 187 +++++++++++++++ server/memstore_test.go | 179 +++++++++++++++ server/msgset.go | 355 +++++++++++++++++++++++++++++ server/observable.go | 290 ++++++++++++++++++++++++ server/opts.go | 17 +- server/parser.go | 1 + server/server.go | 105 +++++++-- server/store.go | 48 ++++ server/sysmem/mem_bsd.go | 20 ++ server/sysmem/mem_darwin.go | 20 ++ server/sysmem/mem_linux.go | 27 +++ server/sysmem/mem_windows.go | 44 ++++ server/sysmem/sysctl.go | 31 +++ test/bench_test.go | 114 ++++++++++ test/jetstream_test.go | 426 +++++++++++++++++++++++++++++++++++ test/test.go | 6 + 22 files changed, 2275 insertions(+), 58 deletions(-) create mode 100644 server/jetstream.go create mode 100644 server/memstore.go create mode 100644 server/memstore_test.go create mode 100644 server/msgset.go create mode 100644 server/observable.go create mode 100644 server/store.go create mode 100644 server/sysmem/mem_bsd.go create mode 100644 server/sysmem/mem_darwin.go create mode 100644 server/sysmem/mem_linux.go create mode 100644 server/sysmem/mem_windows.go create mode 100644 server/sysmem/sysctl.go create mode 100644 test/jetstream_test.go diff --git a/main.go b/main.go index e93834b6..f49d76b0 100644 --- a/main.go +++ b/main.go @@ -31,10 +31,10 @@ Server Options: -m, --http_port Use port for http monitoring -ms,--https_port Use port for https monitoring -c, --config Configuration file + -t Test configuration and exit -sl,--signal [=] Send signal to nats-server process (stop, quit, reopen, reload) can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid) --client_advertise Client URL to advertise to other servers - -t Test configuration and exit Logging Options: -l, --log File to redirect log output @@ -47,6 +47,9 @@ Logging Options: -DV Debug and trace -DVV Debug and verbose trace (traces system account as well) +JetStream Options: + -js, --jetstream Enable JetStream functionality. + Authorization Options: --user User required for connections --pass Password required for connections @@ -66,7 +69,6 @@ Cluster Options: --cluster_advertise Cluster URL to advertise to other servers --connect_retries For implicit routes, number of connect retries - Common Options: -h, --help Show this message -v, --version Show version diff --git a/server/accounts.go b/server/accounts.go index e1a718f4..e3477b59 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -30,7 +30,7 @@ import ( // For backwards compatibility with NATS < 2.0, users who are not explicitly defined into an // account will be grouped in the default global account. -const globalAccountName = "$G" +const globalAccountName = DEFAULT_GLOBAL_ACCOUNT // Account are subject namespace definitions. By default no messages are shared between accounts. // You can share via Exports and Imports of Streams and Services. diff --git a/server/client.go b/server/client.go index 7d7a1ad3..5bc074ea 100644 --- a/server/client.go +++ b/server/client.go @@ -44,6 +44,8 @@ const ( SYSTEM // LEAF is for leaf node connections. LEAF + // JETSTREAM is an internal jetstream client. + JETSTREAM ) const ( @@ -382,6 +384,7 @@ type subscription struct { client *client im *streamImport // This is for import stream support. shadow []*subscription // This is to track shadowed accounts. + icb msgHandler subject []byte queue []byte sid []byte @@ -489,6 +492,8 @@ func (c *client) initClient() { c.ncs = fmt.Sprintf("%s - lid:%d", conn, c.cid) case SYSTEM: c.ncs = "SYSTEM" + case JETSTREAM: + c.ncs = "JETSTREAM" } } @@ -813,6 +818,7 @@ func (c *client) writeLoop() { // message that now is being delivered. func (c *client) flushClients(budget time.Duration) time.Time { last := time.Now() + // Check pending clients for flush. for cp := range c.pcd { // TODO(dlc) - Wonder if it makes more sense to create a new map? @@ -976,7 +982,7 @@ func (c *client) readLoop() { return } - if cpacc && start.Sub(lpacc) >= closedSubsCheckInterval { + if cpacc && (start.Sub(lpacc)) >= closedSubsCheckInterval { c.pruneClosedSubFromPerAccountCache() lpacc = time.Now() } @@ -1053,15 +1059,17 @@ func (c *client) flushOutbound() bool { c.mu.Unlock() // flush here - now := time.Now() + start := time.Now() + // FIXME(dlc) - writev will do multiple IOs past 1024 on // most platforms, need to account for that with deadline? - nc.SetWriteDeadline(now.Add(wdl)) + nc.SetWriteDeadline(start.Add(wdl)) // Actual write to the socket. n, err := nb.WriteTo(nc) nc.SetWriteDeadline(time.Time{}) - lft := time.Since(now) + + lft := time.Since(start) // Re-acquire client lock. c.mu.Lock() @@ -1561,7 +1569,7 @@ func (c *client) maxPayloadViolation(sz int, max int32) { } // queueOutbound queues data for a clientconnection. -// Return if the data is referenced or not. If referenced, the caller +// Returns if the data is referenced or not. If referenced, the caller // should not reuse the `data` array. // Lock should be held. func (c *client) queueOutbound(data []byte) bool { @@ -1933,8 +1941,8 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) sid := string(sub.sid) - // This check does not apply to SYSTEM clients (because they don't have a `nc`...) - if kind != SYSTEM && c.isClosed() { + // This check does not apply to SYSTEM or JETSTREAM clients (because they don't have a `nc`...) + if c.isClosed() && (kind != SYSTEM && kind != JETSTREAM) { c.mu.Unlock() return sub, nil } @@ -2490,11 +2498,15 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b atomic.AddInt64(&srv.outMsgs, 1) atomic.AddInt64(&srv.outBytes, msgSize) - // Check for internal subscription. - if client.kind == SYSTEM { + // Check for internal subscriptions. + if client.kind == SYSTEM || client.kind == JETSTREAM { s := client.srv client.mu.Unlock() - s.deliverInternalMsg(sub, c, subject, c.pa.reply, msg[:msgSize]) + if sub.icb == nil { + s.Debugf("Received internal callback with no registered handler") + return false + } + sub.icb(sub, c, string(subject), string(c.pa.reply), msg[:msgSize]) return true } @@ -2865,7 +2877,13 @@ func (c *client) processInboundClientMsg(msg []byte) { atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { flag |= pmrCollectQueueNames } - qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, flag) + // With JetStream we now have times where we want to match a subsctiption + // on one subject, but deliver it with abother. e.g. JetStream deliverables. + subj := c.pa.subject + if len(c.pa.deliver) > 0 { + subj = c.pa.deliver + } + qnames = c.processMsgResults(c.acc, r, msg, subj, c.pa.reply, flag) } // Now deal with gateways @@ -3526,6 +3544,14 @@ func (c *client) closeConnection(reason ClosedState) { func (c *client) teardownConn() { c.mu.Lock() + // Be consistent with the creation: for routes and gateways, + // we use Noticef on create, so use that too for delete. + if c.kind == ROUTER || c.kind == GATEWAY { + c.Noticef("%s connection closed", c.typeString()) + } else { // Client, System, Jetstream and Leafnode connections. + c.Debugf("%s connection closed", c.typeString()) + } + c.clearAuthTimer() c.clearPingTimer() // Unblock anyone who is potentially stalled waiting on us. @@ -3576,8 +3602,8 @@ func (c *client) teardownConn() { c.mu.Unlock() - // Remove client's or leaf node subscriptions. - if (kind == CLIENT || kind == LEAF) && acc != nil { + // Remove client's or leaf node or jetstream subscriptions. + if acc != nil && (kind == CLIENT || kind == LEAF || kind == JETSTREAM) { acc.sl.RemoveBatch(subs) } else if kind == ROUTER { go c.removeRemoteSubs() diff --git a/server/const.go b/server/const.go index df88ec79..9e35f382 100644 --- a/server/const.go +++ b/server/const.go @@ -182,4 +182,10 @@ const ( // DEFAULT_SERVICE_LATENCY_SAMPLING is the default sampling rate for service // latency metrics DEFAULT_SERVICE_LATENCY_SAMPLING = 100 + + // DEFAULT_SYSTEM_ACCOUNT + DEFAULT_SYSTEM_ACCOUNT = "$SYS" + + // DEFAULT GLOBAL_ACCOUNT + DEFAULT_GLOBAL_ACCOUNT = "$G" ) diff --git a/server/events.go b/server/events.go index 857f6e46..018e5c68 100644 --- a/server/events.go +++ b/server/events.go @@ -67,11 +67,10 @@ type internal struct { account *Account client *client seq uint64 - sid uint64 + sid int servers map[string]*serverUpdate sweeper *time.Timer stmr *time.Timer - subs map[string]msgHandler replies map[string]msgHandler sendq chan *pubMsg resetCh chan struct{} @@ -255,7 +254,14 @@ RESET: } var b []byte if pm.msg != nil { - b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ") + switch v := pm.msg.(type) { + case string: + b = []byte(v) + case []byte: + b = v + default: + b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ") + } } c.mu.Lock() // We can have an override for account here. @@ -312,7 +318,6 @@ func (s *Server) sendShutdownEvent() { // Stop any more messages from queueing up. s.sys.sendq = nil // Unhook all msgHandlers. Normal client cleanup will deal with subs, etc. - s.sys.subs = nil s.sys.replies = nil s.mu.Unlock() // Send to the internal queue and mark as last. @@ -1096,19 +1101,6 @@ func (s *Server) sendAuthErrorEvent(c *client) { // required to be copied. type msgHandler func(sub *subscription, client *client, subject, reply string, msg []byte) -func (s *Server) deliverInternalMsg(sub *subscription, c *client, subject, reply, msg []byte) { - s.mu.Lock() - if !s.eventsEnabled() || s.sys.subs == nil { - s.mu.Unlock() - return - } - cb := s.sys.subs[string(sub.sid)] - s.mu.Unlock() - if cb != nil { - cb(sub, c, string(subject), string(reply), msg) - } -} - // Create an internal subscription. No support for queue groups atm. func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) { return s.systemSubscribe(subject, false, cb) @@ -1127,11 +1119,10 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle return nil, fmt.Errorf("undefined message handler") } s.mu.Lock() - sid := strconv.FormatInt(int64(s.sys.sid), 10) - s.sys.subs[sid] = cb - s.sys.sid++ c := s.sys.client trace := c.trace + s.sys.sid++ + sid := s.sys.sid s.mu.Unlock() arg := []byte(subject + " " + sid) @@ -1140,7 +1131,14 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle } // Now create the subscription - return c.processSub(arg, internalOnly) + sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(sid)), internalOnly) + if err != nil { + return nil, err + } + c.mu.Lock() + sub.icb = cb + c.mu.Unlock() + return sub, nil } func (s *Server) sysUnsubscribe(sub *subscription) { @@ -1150,7 +1148,6 @@ func (s *Server) sysUnsubscribe(sub *subscription) { s.mu.Lock() acc := s.sys.account c := s.sys.client - delete(s.sys.subs, string(sub.sid)) s.mu.Unlock() c.unsubscribe(acc, sub, true, true) } diff --git a/server/jetstream.go b/server/jetstream.go new file mode 100644 index 00000000..5b27d1b4 --- /dev/null +++ b/server/jetstream.go @@ -0,0 +1,356 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "io/ioutil" + "math" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/nats-io/nats-server/v2/server/sysmem" +) + +// JetStreamConfig determines this server's configuration. +// MaxMemory and MaxStore are in bytes. +type JetStreamConfig struct { + MaxMemory int64 + MaxStore int64 + StoreDir string +} + +// This is for internal accounting for JetStream for this server. +type jetStream struct { + mu sync.RWMutex + srv *Server + config JetStreamConfig + accounts map[*Account]*jsAccount + // TODO(dlc) - need to track + // memUsage int64 + // diskUsage int64 +} + +// TODO(dlc) - decide on what these look like for single vs cluster vs supercluster. +// TODO(dlc) - need to track and rollup against server limits, etc. +type JetStreamAccountLimits struct { + MaxMemory int64 + MaxStore int64 + MaxMsgSets int +} + +const ( + // OK response + JsOK = "+OK" + + // JsEnabled allows a user to dynamically check if JetStream is enabled + // with a simple request. + // Will return +OK on success and will timeout if not imported. + JsEnabledExport = "$JS.*.ENABLED" + JsEnabled = "$JS.ENABLED" + + // jsInfo is for obtaining general information about JetStream for this account. + // Will return JSON response. + JsInfoExport = "$JS.*.INFO" + JsInfo = "$JS.INFO" + + // JsCreatObservable is the endpoint to create observers for a message set. + // Will return +OK on success and -ERR on failure. + JsCreateObservableExport = "$JS.*.OBSERVABLE.CREATE" + JsCreateObservable = "$JS.OBSERVABLE.CREATE" + + // JsCreatMsgSet is the endpoint to create message sets. + // Will return +OK on success and -ERR on failure. + JsCreateMsgSetExport = "$JS.*.MSGSET.CREATE" + JsCreateMsgSet = "$JS.MSGSET.CREATE" + + // JsMsgSetInfo is for obtaining general information about a named message set. + // Will return JSON response. + JsMsgSetInfoExport = "$JS.*.MSGSET.INFO" + JsMsgSetInfo = "$JS.MSGSET.INFO" + + // JsAckPre is the prefix for the ack stream coming back to observable. + JsAckPre = "$JS.A" +) + +// For easier handling of exports and imports. +var allJsExports = []string{JsEnabledExport, JsInfoExport, JsCreateObservableExport, JsCreateMsgSetExport, JsMsgSetInfoExport} + +// This represents a jetstream enabled account. +// Worth noting that we include the js ptr, this is because +// in general we want to be very efficient when receiving messages on +// and internal sub for a msgSet, so we will direct link to the msgSet +// and walk backwards as needed vs multiple hash lookups and locks, etc. +type jsAccount struct { + mu sync.Mutex + js *jetStream + account *Account + limits JetStreamAccountLimits + msgSets map[string]*MsgSet +} + +// EnableJetStream will enable JetStream support on this server with +// the given configuration. A nil configuration will dynamically choose +// the limits and temporary file storage directory. +// If this server is part of a cluster, a system account will need to be defined. +// This allows the server to send and receive messages. +func (s *Server) EnableJetStream(config *JetStreamConfig) error { + s.mu.Lock() + if !s.standAloneMode() { + s.mu.Unlock() + return fmt.Errorf("jetstream restricted to single server mode") + } + if s.js != nil { + s.mu.Unlock() + return fmt.Errorf("jetstream already enabled") + } + s.Noticef("Starting jetstream") + if config == nil { + s.Debugf("Jetstream creating dynamic configuration - 75%% system memory, %s disk", friendlyBytes(JetStreamMaxStoreDefault)) + config = s.dynJetStreamConfig() + } + s.js = &jetStream{srv: s, config: *config, accounts: make(map[*Account]*jsAccount)} + s.mu.Unlock() + + if stat, err := os.Stat(config.StoreDir); os.IsNotExist(err) { + if err := os.MkdirAll(config.StoreDir, 0755); err != nil { + return fmt.Errorf("could not create storage directory - %v", err) + } + } else { + // Make sure its a directory and that we can write to it. + if stat == nil || !stat.IsDir() { + return fmt.Errorf("storage directory is not a directory") + } + tmpfile, err := ioutil.TempFile(config.StoreDir, "_test_") + if err != nil { + return fmt.Errorf("storage directory is not writable") + } + os.Remove(tmpfile.Name()) + // TODO(dlc) - Recover state + } + + // JetStream is an internal service so we need to make sure we have a system account. + // This system account will export the JetStream service endpoints. + if sacc := s.SystemAccount(); sacc == nil { + s.SetDefaultSystemAccount() + } + + // Setup our internal subscriptions. + if _, err := s.sysSubscribe(JsEnabledExport, s.isJsEnabledRequest); err != nil { + return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err) + } + + s.Noticef("----------- jetstream config -----------") + s.Noticef(" MaxMemory: %s", friendlyBytes(config.MaxMemory)) + s.Noticef(" MaxStore: %s", friendlyBytes(config.MaxStore)) + s.Noticef(" StoreDir: %q", config.StoreDir) + + // If not in operator mode, setup our internal system exports if needed. + // If we are in operator mode, the system account will need to have them + // added externally. + if !s.inOperatorMode() { + sacc := s.SystemAccount() + // FIXME(dlc) - Should we lock these down? + s.Debugf(" Exports:") + for _, export := range allJsExports { + s.Debugf(" %s", export) + if err := sacc.AddServiceExport(export, nil); err != nil { + return fmt.Errorf("Error setting up jetstream service exports: %v", err) + } + } + } + s.Noticef("----------------------------------------") + + // If we have no configured accounts setup then setup imports on global account. + if s.globalAccountOnly() && !s.inOperatorMode() { + if err := s.JetStreamEnableAccount(s.GlobalAccount(), JetStreamAccountLimitsNoLimits); err != nil { + return fmt.Errorf("Error enabling jetstream on the global account") + } + } + + return nil +} + +// JetStreamEnabled reports if jetstream is enabled. +func (s *Server) JetStreamEnabled() bool { + s.mu.Lock() + enabled := s.js != nil + s.mu.Unlock() + return enabled +} + +// JetStreamConfig will return the current config. Useful if the system +// created a dynamic configuration. A copy is returned. +func (s *Server) JetStreamConfig() *JetStreamConfig { + var c *JetStreamConfig + s.mu.Lock() + if s.js != nil { + copy := s.js.config + c = &(copy) + } + s.mu.Unlock() + return c +} + +// JetStreamAccountLimitsNoLimits defines no limits for the given account. +var JetStreamAccountLimitsNoLimits = &JetStreamAccountLimits{-1, -1, -1} + +// JetStreamEnableAccount enables jetstream capabilties for the given account with the defined limits. +func (s *Server) JetStreamEnableAccount(a *Account, limits *JetStreamAccountLimits) error { + js := s.getJetStream() + if js == nil { + return fmt.Errorf("jetstream not enabled") + } + if a, err := s.LookupAccount(a.Name); err != nil || a == nil { + return fmt.Errorf("jetstream unknown account") + } + + // TODO(dlc) - Should we have accounts marked to allow? + // TODO(dlc) - Should we check and warn if these limits are above the server? Maybe only if single server? + + js.mu.Lock() + if _, ok := js.accounts[a]; ok { + js.mu.Unlock() + return fmt.Errorf("jetstream already enabled for account") + } + jsa := &jsAccount{js: js, account: a, limits: *limits, msgSets: make(map[string]*MsgSet)} + js.accounts[a] = jsa + js.mu.Unlock() + + // If we are not in operator mode we should create the proper imports here. + if !s.inOperatorMode() { + sys := s.SystemAccount() + for _, export := range allJsExports { + importTo := strings.Replace(export, "*", a.Name, -1) + importFrom := strings.Replace(export, ".*.", tsep, -1) + // fmt.Printf("export is %q %q \n", importTo, importFrom) + if err := a.AddServiceImport(sys, importFrom, importTo); err != nil { + return fmt.Errorf("Error setting up jetstream service imports for global account: %v", err) + } + } + } + + s.Debugf("Enabled jetstream for account: %q", a.Name) + return nil +} + +func (s *Server) JetStreamDisableAccount(a *Account) error { + js := s.getJetStream() + if js == nil { + return fmt.Errorf("jetstream not enabled") + } + + js.mu.Lock() + defer js.mu.Unlock() + + if _, ok := js.accounts[a]; !ok { + return fmt.Errorf("jetstream not enabled for account") + } + delete(js.accounts, a) + + return nil +} + +// JetStreamEnabled is a helper to determine if jetstream is enabled for an account. +func (a *Account) JetStreamEnabled() bool { + if a == nil { + return false + } + a.mu.RLock() + s := a.srv + a.mu.RUnlock() + if s == nil { + return false + } + js := s.getJetStream() + if js == nil { + return false + } + js.mu.Lock() + _, ok := js.accounts[a] + js.mu.Unlock() + + return ok +} + +// JetStreamNumAccounts returns the number of enabled accounts this server is tracking. +func (s *Server) JetStreamNumAccounts() int { + js := s.getJetStream() + if js == nil { + return 0 + } + js.mu.Lock() + defer js.mu.Unlock() + return len(js.accounts) +} + +func (s *Server) getJetStream() *jetStream { + s.mu.Lock() + js := s.js + s.mu.Unlock() + return js +} + +func (js *jetStream) lookupAccount(a *Account) *jsAccount { + js.mu.RLock() + jsa := js.accounts[a] + js.mu.RUnlock() + return jsa +} + +// Request to check if jetstream is enabled. +func (s *Server) isJsEnabledRequest(sub *subscription, c *client, subject, reply string, msg []byte) { + if c != nil && c.acc != nil && c.acc.JetStreamEnabled() { + s.sendInternalAccountMsg(c.acc, reply, JsOK) + } +} + +const ( + // JetStreamStoreDir is the prefix we use. + JetStreamStoreDir = "nats-jetstream" + // JetStreamMaxStoreDefault is the default disk storage limit. 1TB + JetStreamMaxStoreDefault = 1024 * 1024 * 1024 * 1024 + // JetStreamMaxMemDefault is only used when we can't determine system memory. 256MB + JetStreamMaxMemDefault = 1024 * 1024 * 256 +) + +// Dynamically create a config with a tmp based directory (repeatable) and 75% of system memory. +func (s *Server) dynJetStreamConfig() *JetStreamConfig { + jsc := &JetStreamConfig{} + jsc.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir) + jsc.MaxStore = JetStreamMaxStoreDefault + // Estimate to 75% of total memory if we can determine system memory. + if sysMem := sysmem.Memory(); sysMem > 0 { + jsc.MaxMemory = sysMem / 4 * 3 + } else { + jsc.MaxMemory = JetStreamMaxMemDefault + } + return jsc +} + +// friendlyBytes returns a string with the given bytes int64 +// represented as a size, such as 1KB, 10MB, etc... +func friendlyBytes(bytes int64) string { + fbytes := float64(bytes) + base := 1024 + pre := []string{"K", "M", "G", "T", "P", "E"} + if fbytes < float64(base) { + return fmt.Sprintf("%v B", fbytes) + } + exp := int(math.Log(fbytes) / math.Log(float64(base))) + index := exp - 1 + return fmt.Sprintf("%.2f %sB", fbytes/math.Pow(float64(base), float64(exp)), pre[index]) +} diff --git a/server/memstore.go b/server/memstore.go new file mode 100644 index 00000000..48454296 --- /dev/null +++ b/server/memstore.go @@ -0,0 +1,187 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "sort" + "sync" + "time" +) + +// TODO(dlc) - This is a fairly simplistic approach but should do for now. +type memStore struct { + mu sync.RWMutex + stats MsgSetStats + msgs map[uint64]*storedMsg + ageChk *time.Timer + config MsgSetConfig +} + +type storedMsg struct { + subj string + msg []byte + seq uint64 + ts int64 // nanoseconds +} + +func newMemStore(cfg *MsgSetConfig) (*memStore, error) { + if cfg == nil { + return nil, fmt.Errorf("config required for MsgSetStore") + } + if cfg.Storage != MemoryStorage { + return nil, fmt.Errorf("memStore requires memory storage type in cfg") + } + ms := &memStore{msgs: make(map[uint64]*storedMsg), config: *cfg} + // This only happens once, so ok to call here. + return ms, nil +} + +// Store stores a message. +// both arguments should be copied. +func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) { + ms.mu.Lock() + seq := ms.stats.LastSeq + 1 + if ms.stats.FirstSeq == 0 { + ms.stats.FirstSeq = seq + } + + // Make copies - https://github.com/go101/go101/wiki + // TODO(dlc) - Maybe be smarter here. + msg = append(msg[:0:0], msg...) + + ms.msgs[seq] = &storedMsg{subj, msg, seq, time.Now().UnixNano()} + ms.stats.Msgs++ + ms.stats.Bytes += memStoreMsgSize(subj, msg) + ms.stats.LastSeq = seq + + // Limits checks and enforcement. + ms.enforceMsgLimit() + ms.enforceBytesLimit() + + // Check it we have and need age expiration timer running. + if ms.ageChk == nil && ms.config.MaxAge != 0 { + ms.startAgeChk() + } + ms.mu.Unlock() + + return seq, nil +} + +// GetSeqFromTime looks for the first sequence number that has the message +// with >= timestamp. +func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { + ts := t.UnixNano() + ms.mu.RLock() + defer ms.mu.RUnlock() + if len(ms.msgs) == 0 { + return ms.stats.LastSeq + 1 + } + if ts <= ms.msgs[ms.stats.FirstSeq].ts { + return ms.stats.FirstSeq + } + last := ms.msgs[ms.stats.LastSeq].ts + if ts == last { + return ms.stats.LastSeq + } + if ts > last { + return ms.stats.LastSeq + 1 + } + index := sort.Search(len(ms.msgs), func(i int) bool { + return ms.msgs[uint64(i)+ms.stats.FirstSeq].ts >= ts + }) + return uint64(index) + ms.stats.FirstSeq +} + +// Will check the msg limit and drop firstSeq msg if needed. +// Lock should be held. +func (ms *memStore) enforceMsgLimit() { + if ms.config.MaxMsgs == 0 || ms.stats.Msgs <= ms.config.MaxMsgs { + return + } + ms.deleteFirstMsgOrPanic() +} + +// Will check the bytes limit and drop msgs if needed. +// Lock should be held. +func (ms *memStore) enforceBytesLimit() { + if ms.config.MaxBytes == 0 || ms.stats.Bytes <= ms.config.MaxBytes { + return + } + for bs := ms.stats.Bytes; bs > ms.config.MaxBytes; bs = ms.stats.Bytes { + ms.deleteFirstMsgOrPanic() + } +} + +// Will start the age check timer. +// Lock should be held. +func (ms *memStore) startAgeChk() { + if ms.ageChk == nil && ms.config.MaxAge != 0 { + ms.ageChk = time.AfterFunc(ms.config.MaxAge, ms.expireMsgs) + } +} + +// Will expire msgs that are too old. +func (ms *memStore) expireMsgs() { + ms.mu.Lock() + defer ms.mu.Unlock() + + minAge := time.Now().UnixNano() - int64(ms.config.MaxAge) + for { + if sm, ok := ms.msgs[ms.stats.FirstSeq]; ok && sm.ts < minAge { + ms.deleteFirstMsgOrPanic() + } else { + return + } + } +} + +func (ms *memStore) deleteFirstMsgOrPanic() { + if !ms.deleteFirstMsg() { + panic("jetstream memstore has inconsistent state, can't find firstSeq msg") + } +} + +func (ms *memStore) deleteFirstMsg() bool { + sm, ok := ms.msgs[ms.stats.FirstSeq] + if !ok || sm == nil { + return false + } + delete(ms.msgs, ms.stats.FirstSeq) + ms.stats.FirstSeq++ + ms.stats.Msgs-- + ms.stats.Bytes -= memStoreMsgSize(sm.subj, sm.msg) + return true +} + +func (ms *memStore) Lookup(seq uint64) (string, []byte, int64, error) { + ms.mu.RLock() + sm, ok := ms.msgs[seq] + ms.mu.RUnlock() + + if !ok || sm == nil { + return "", nil, 0, ErrStoreMsgNotFound + } + return sm.subj, sm.msg, sm.ts, nil +} + +func (ms *memStore) Stats() MsgSetStats { + ms.mu.RLock() + defer ms.mu.RUnlock() + return ms.stats +} + +func memStoreMsgSize(subj string, msg []byte) uint64 { + return uint64(len(subj) + len(msg) + 16) // 8*2 for seq + age +} diff --git a/server/memstore_test.go b/server/memstore_test.go new file mode 100644 index 00000000..c9094b72 --- /dev/null +++ b/server/memstore_test.go @@ -0,0 +1,179 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "bytes" + "testing" + "time" +) + +func TestMemStoreBasics(t *testing.T) { + ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + subj, msg := "foo", []byte("Hello World") + if seq, err := ms.StoreMsg(subj, msg); err != nil { + t.Fatalf("Error storing msg: %v", err) + } else if seq != 1 { + t.Fatalf("Expected sequence to be 1, got %d", seq) + } + stats := ms.Stats() + if stats.Msgs != 1 { + t.Fatalf("Expected 1 msg, got %d", stats.Msgs) + } + expectedSize := memStoreMsgSize(subj, msg) + if stats.Bytes != expectedSize { + t.Fatalf("Expected %d bytes, got %d", expectedSize, stats.Bytes) + } + nsubj, nmsg, _, err := ms.Lookup(1) + if err != nil { + t.Fatalf("Unexpected error looking up msg: %v", err) + } + if nsubj != subj { + t.Fatalf("Subjects don't match, original %q vs %q", subj, nsubj) + } + if !bytes.Equal(nmsg, msg) { + t.Fatalf("Msgs don't match, original %q vs %q", msg, nmsg) + } +} + +func TestMemStoreMsgLimit(t *testing.T) { + maxMsgs := uint64(10) + ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage, MaxMsgs: maxMsgs}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + subj, msg := "foo", []byte("Hello World") + for i := uint64(0); i < maxMsgs; i++ { + ms.StoreMsg(subj, msg) + } + stats := ms.Stats() + if stats.Msgs != maxMsgs { + t.Fatalf("Expected %d msgs, got %d", maxMsgs, stats.Msgs) + } + if _, err := ms.StoreMsg(subj, msg); err != nil { + t.Fatalf("Error storing msg: %v", err) + } + stats = ms.Stats() + if stats.Msgs != maxMsgs { + t.Fatalf("Expected %d msgs, got %d", maxMsgs, stats.Msgs) + } + if stats.LastSeq != 11 { + t.Fatalf("Expected the last sequence to be 11 now, but got %d", stats.LastSeq) + } + if stats.FirstSeq != 2 { + t.Fatalf("Expected the first sequence to be 2 now, but got %d", stats.FirstSeq) + } + // Make sure we can not lookup seq 1. + if _, _, _, err := ms.Lookup(1); err == nil { + t.Fatalf("Expected error looking up seq 1 but got none") + } +} + +func TestMemStoreBytesLimit(t *testing.T) { + subj, msg := "foo", make([]byte, 512) + storedMsgSize := memStoreMsgSize(subj, msg) + + toStore := uint64(1024) + maxBytes := storedMsgSize * toStore + + ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage, MaxBytes: maxBytes}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + + for i := uint64(0); i < toStore; i++ { + ms.StoreMsg(subj, msg) + } + stats := ms.Stats() + if stats.Msgs != toStore { + t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs) + } + if stats.Bytes != storedMsgSize*toStore { + t.Fatalf("Expected bytes to be %d, got %d", storedMsgSize*toStore, stats.Bytes) + } + + // Now send 10 more and check that bytes limit enforced. + for i := 0; i < 10; i++ { + if _, err := ms.StoreMsg(subj, msg); err != nil { + t.Fatalf("Error storing msg: %v", err) + } + } + stats = ms.Stats() + if stats.Msgs != toStore { + t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs) + } + if stats.Bytes != storedMsgSize*toStore { + t.Fatalf("Expected bytes to be %d, got %d", storedMsgSize*toStore, stats.Bytes) + } + if stats.FirstSeq != 11 { + t.Fatalf("Expected first sequence to be 11, got %d", stats.FirstSeq) + } + if stats.LastSeq != toStore+10 { + t.Fatalf("Expected last sequence to be %d, got %d", toStore+10, stats.LastSeq) + } +} + +func TestMemStoreAgeLimit(t *testing.T) { + maxAge := 10 * time.Millisecond + ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage, MaxAge: maxAge}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + // Store some messages. Does not really matter how many. + subj, msg := "foo", []byte("Hello World") + toStore := 100 + for i := 0; i < toStore; i++ { + ms.StoreMsg(subj, msg) + } + stats := ms.Stats() + if stats.Msgs != uint64(toStore) { + t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs) + } + // Let them expire + time.Sleep(maxAge * 10) + stats = ms.Stats() + if stats.Msgs != 0 { + t.Fatalf("Expected no msgs, got %d", stats.Msgs) + } + if stats.Bytes != 0 { + t.Fatalf("Expected no bytes, got %d", stats.Bytes) + } +} + +func TestMemStoreTimeStamps(t *testing.T) { + ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error creating store: %v", err) + } + last := time.Now().UnixNano() + subj, msg := "foo", []byte("Hello World") + for i := 0; i < 10; i++ { + ms.StoreMsg(subj, msg) + time.Sleep(5 * time.Microsecond) + } + for seq := uint64(1); seq <= 10; seq++ { + _, _, ts, err := ms.Lookup(seq) + if err != nil { + t.Fatalf("Unexpected error looking up msg: %v", err) + } + // These should be different + if ts <= last { + t.Fatalf("Expected different timestamps, got %v", ts) + } + last = ts + } +} diff --git a/server/msgset.go b/server/msgset.go new file mode 100644 index 00000000..62c70f0e --- /dev/null +++ b/server/msgset.go @@ -0,0 +1,355 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "strconv" + "strings" + "sync" + "time" +) + +// MsgSet is a jetstream message set. When we receive a message internally destined +// for a MsgSet we will direct link from the client to this MsgSet structure. +type MsgSet struct { + mu sync.Mutex + sg *sync.Cond + sgw int + jsa *jsAccount + client *client + sid int + sendq chan *jsPubMsg + store MsgSetStore + obs map[string]*Observable + config MsgSetConfig +} + +// MsgSetConfig will determine the name, subjects and retention policy +// for a given message set. If subjects is empty the name will be used. +type MsgSetConfig struct { + Name string + Subjects []string + Retention RetentionPolicy + MaxMsgs uint64 + MaxBytes uint64 + MaxAge time.Duration + Storage StorageType + Replicas int +} + +// RetentionPolicy determines how messages in a set are retained. +type RetentionPolicy int + +const ( + // StreamPolicy (default) means that messages are retained until any possible given limit is reached. + // This could be any one of MaxMsgs, MaxBytes, or MaxAge. + StreamPolicy RetentionPolicy = iota + // InterestPolicy specifies that when all known subscribers have acknowledged a message it can be removed. + InterestPolicy + // WorkQueuePolicy specifies that when the first subscriber acknowledges the message it can be removed. + WorkQueuePolicy +) + +// JetStreamAddMsgSet adds a message set for the given account. +func (s *Server) JetStreamAddMsgSet(a *Account, config *MsgSetConfig) (*MsgSet, error) { + js := s.getJetStream() + if js == nil { + return nil, fmt.Errorf("jetstream not enabled") + } + + jsa := js.lookupAccount(a) + if jsa == nil { + return nil, fmt.Errorf("jetstream not enabled for account") + } + + // TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode. + + jsa.mu.Lock() + if _, ok := jsa.msgSets[config.Name]; ok { + jsa.mu.Unlock() + return nil, fmt.Errorf("message set name already taken") + } + c := s.createInternalJetStreamClient() + mset := &MsgSet{jsa: jsa, config: *config, client: c, obs: make(map[string]*Observable)} + mset.sg = sync.NewCond(&mset.mu) + + if len(mset.config.Subjects) == 0 { + mset.config.Subjects = append(mset.config.Subjects, mset.config.Name) + } + jsa.msgSets[config.Name] = mset + jsa.mu.Unlock() + + // Bind to the account. + c.registerWithAccount(a) + + // Create the appropriate storage + if err := mset.setupStore(); err != nil { + mset.delete() + return nil, err + } + // Setup our internal send go routine. + mset.setupSendCapabilities() + + // Setup subscriptions + if err := mset.subscribeToMsgSet(); err != nil { + mset.delete() + return nil, err + } + + return mset, nil +} + +// JetStreamDeleteMsgSet will delete a message set. +func (s *Server) JetStreamDeleteMsgSet(mset *MsgSet) error { + return mset.Delete() +} + +// Delete deletes a message set. +func (mset *MsgSet) Delete() error { + mset.mu.Lock() + jsa := mset.jsa + mset.mu.Unlock() + if jsa == nil { + return fmt.Errorf("jetstream not enabled for account") + } + jsa.mu.Lock() + delete(jsa.msgSets, mset.config.Name) + jsa.mu.Unlock() + + return mset.delete() +} + +func (mset *MsgSet) Purge() error { + return fmt.Errorf("NO IMPL") +} + +// Will create internal subscriptions for the msgSet. +// Lock should be held. +func (mset *MsgSet) subscribeToMsgSet() error { + for _, subject := range mset.config.Subjects { + if _, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg); err != nil { + return err + } + } + return nil +} + +// FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters. +func (mset *MsgSet) subscribeInternal(subject string, cb msgHandler) (*subscription, error) { + return mset.nmsSubscribeInternal(subject, true, cb) +} + +func (mset *MsgSet) nmsSubscribeInternal(subject string, internalOnly bool, cb msgHandler) (*subscription, error) { + c := mset.client + if c == nil { + return nil, fmt.Errorf("invalid message set") + } + if !c.srv.eventsEnabled() { + return nil, ErrNoSysAccount + } + if cb == nil { + return nil, fmt.Errorf("undefined message handler") + } + + mset.sid++ + + // Now create the subscription + sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(mset.sid)), internalOnly) + if err != nil { + return nil, err + } + c.mu.Lock() + sub.icb = cb + c.mu.Unlock() + return sub, nil +} + +// Lock should be held. +func (mset *MsgSet) unsubscribe(sub *subscription) { + if sub == nil || mset.client == nil { + return + } + mset.client.unsubscribe(mset.client.acc, sub, true, true) +} + +func (mset *MsgSet) setupStore() error { + mset.mu.Lock() + defer mset.mu.Unlock() + + switch mset.config.Storage { + case MemoryStorage: + ms, err := newMemStore(&mset.config) + if err != nil { + return err + } + mset.store = ms + case DiskStorage: + return fmt.Errorf("NO IMPL") + } + return nil +} + +func (mset *MsgSet) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) { + mset.mu.Lock() + store := mset.store + c := mset.client + mset.mu.Unlock() + + if c == nil { + return + } + + // FIXME(dlc) - Not inline unless memory based. + + msg = append(msg[:0:0], msg...) + + if _, err := store.StoreMsg(subject, msg); err != nil { + mset.mu.Lock() + s := c.srv + accName := c.acc.Name + name := mset.config.Name + mset.mu.Unlock() + s.Errorf("Jetstream failed to store msg on account: %q message set: %q - %v", accName, name, err) + // TODO(dlc) - Send err here + return + } + // Send Ack here. + if len(reply) > 0 { + mset.sendq <- &jsPubMsg{reply, "", "", []byte(JsOK)} + } + + mset.signalObservers() +} + +func (mset *MsgSet) signalObservers() { + mset.mu.Lock() + if mset.sgw > 0 { + mset.sg.Broadcast() + } + mset.mu.Unlock() +} + +type jsPubMsg struct { + subj string + dsubj string + reply string + msg []byte +} + +// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering +const nmsSendQSize = 1024 + +// This is similar to system semantics but did not want to overload the single system sendq, +// or require system account when doing simple setup with jetstream. +func (mset *MsgSet) setupSendCapabilities() { + mset.mu.Lock() + defer mset.mu.Unlock() + if mset.sendq != nil { + return + } + mset.sendq = make(chan *jsPubMsg, nmsSendQSize) + go mset.internalSendLoop() +} + +func (mset *MsgSet) internalSendLoop() { + mset.mu.Lock() + c := mset.client + if c == nil { + mset.mu.Unlock() + return + } + s := c.srv + sendq := mset.sendq + name := mset.config.Name + mset.mu.Unlock() + + // Warn when internal send queue is backed up past 75% + warnThresh := 3 * nmsSendQSize / 4 + warnFreq := time.Second + last := time.Now().Add(-warnFreq) + + for { + if len(sendq) > warnThresh && time.Since(last) >= warnFreq { + s.Warnf("Jetstream internal send queue > 75% for account: %q message set: %q", c.acc.Name, name) + last = time.Now() + } + select { + case pm := <-sendq: + if pm == nil { + return + } + c.pa.subject = []byte(pm.subj) + c.pa.deliver = []byte(pm.dsubj) + c.pa.size = len(pm.msg) + c.pa.szb = []byte(strconv.Itoa(c.pa.size)) + c.pa.reply = []byte(pm.reply) + msg := append(pm.msg, _CRLF_...) + c.processInboundClientMsg(msg) + c.pa.szb = nil + c.flushClients(0) + case <-s.quitCh: + return + } + } +} + +func (mset *MsgSet) delete() error { + var obs []*Observable + + mset.mu.Lock() + if mset.sendq != nil { + mset.sendq <- nil + } + c := mset.client + mset.client = nil + for _, o := range mset.obs { + obs = append(obs, o) + } + mset.mu.Unlock() + c.closeConnection(ClientClosed) + + for _, o := range obs { + o.Delete() + } + // TODO(dlc) - Clean up any storage, memory and disk + return nil +} + +// Returns a name that can be used as a single token for subscriptions. +// Really only need to replace token separators. +// Lock should be held +func (mset *MsgSet) cleanName() string { + return strings.Replace(mset.config.Name, tsep, "-", -1) +} + +// Stats will return the current stats for this message set. +func (mset *MsgSet) Stats() MsgSetStats { + // Currently rely on store. + // TODO(dlc) - This will need to change with clusters. + return mset.store.Stats() +} + +func (mset *MsgSet) waitForMsgs() { + mset.mu.Lock() + defer mset.mu.Unlock() + + if mset.client == nil { + return + } + + mset.sgw++ + mset.sg.Wait() + mset.sgw-- +} diff --git a/server/observable.go b/server/observable.go new file mode 100644 index 00000000..c8c2e456 --- /dev/null +++ b/server/observable.go @@ -0,0 +1,290 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "crypto/rand" + "crypto/sha256" + "fmt" + "sync" + "sync/atomic" + "time" +) + +type ObservableConfig struct { + Delivery string `json:"delivery_subject"` + Durable string `json:"durable_name,omitempty"` + StartSeq uint64 `json:"start_seq,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + DeliverAll bool `json:"deliver_all,omitempty"` + DeliverLast bool `json:"deliver_last,omitempty"` + AckPolicy AckPolicy `json:"ack_policy"` +} + +// AckPolicy determines how the observable shoulc acknowledge delivered messages. +type AckPolicy int + +const ( + // AckExplicit requires ack or nack for all messages. + AckExplicit AckPolicy = iota + // When acking a sequence number, this implicitly acks all sequences below this one as well. + AckAll + // AckNone requires no acks for delivered messages. + AckNone +) + +// Observable is a jetstream observable/subscriber. +type Observable struct { + mu sync.Mutex + name string + mset *MsgSet + seq uint64 + dsubj string + ackSub *subscription + ackReply string + config ObservableConfig +} + +func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) { + if config == nil { + return nil, fmt.Errorf("observable config required") + } + // For now expect a literal subject that is not empty. + // FIXME(dlc) - Empty == Worker mode + if config.Delivery == "" { + return nil, fmt.Errorf("observable delivery subject is empty") + } + if !subjectIsLiteral(config.Delivery) { + return nil, fmt.Errorf("observable delivery subject has wildcards") + } + if mset.deliveryFormsCycle(config.Delivery) { + return nil, fmt.Errorf("observable delivery subject forms a cycle") + } + + // Check on start position conflicts. + noTime := time.Time{} + if config.StartSeq > 0 && (config.StartTime != noTime || config.DeliverAll || config.DeliverLast) { + return nil, fmt.Errorf("observable starting position conflict") + } else if config.StartTime != noTime && (config.DeliverAll || config.DeliverLast) { + return nil, fmt.Errorf("observable starting position conflict") + } else if config.DeliverAll && config.DeliverLast { + return nil, fmt.Errorf("observable starting position conflict") + } + + // Check if we are not durable that the delivery subject has interest. + if config.Durable == "" { + if mset.noInterest(config.Delivery) { + return nil, fmt.Errorf("observable requires interest for delivery subject when ephemeral") + } + } + + // Set name, which will be durable name if set, otherwise we create one at random. + o := &Observable{mset: mset, config: *config, dsubj: config.Delivery} + if isDurableObservable(config) { + o.name = config.Durable + } else { + o.name = createObservableName() + } + + // Select starting sequence number + o.selectStartingSeqNo() + + // Now register with mset and create ack subscription. + mset.mu.Lock() + c := mset.client + if c == nil { + mset.mu.Unlock() + return nil, fmt.Errorf("message set not valid") + } + s, a := c.srv, c.acc + if _, ok := mset.obs[o.name]; ok { + mset.mu.Unlock() + return nil, fmt.Errorf("observable already exists") + } + // Set up the ack subscription for this observable. Will use wildcard for all acks. + // We will remember the template to generate replaies with sequence numbers and use + // that to scanf them back in. + cn := mset.cleanName() + o.ackReply = fmt.Sprintf("%s.%s.%s.%%d", JsAckPre, cn, o.name) + ackSubj := fmt.Sprintf("%s.%s.%s.*", JsAckPre, cn, o.name) + if sub, err := mset.subscribeInternal(ackSubj, o.processObservableAck); err != nil { + return nil, err + } else { + o.ackSub = sub + } + mset.obs[o.name] = o + mset.mu.Unlock() + + // Now start up Go routine to deliver msgs. + go o.loopAndDeliverMsgs(s, a) + + return o, nil +} + +func (o *Observable) msgSet() *MsgSet { + o.mu.Lock() + mset := o.mset + o.mu.Unlock() + return mset +} + +func (o *Observable) processObservableAck(_ *subscription, _ *client, subject, _ string, msg []byte) { + // No-op for now. +} + +func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) { + var mset *MsgSet + for { + // observable is closed when mset is set to nil. + if mset = o.msgSet(); mset == nil { + return + } + // Deliver all the msgs we have now, once done or on a condition, we wait. + for { + seq := atomic.LoadUint64(&o.seq) + subj, msg, _, err := mset.store.Lookup(seq) + if err == nil { + atomic.AddUint64(&o.seq, 1) + o.deliverMsg(mset, subj, msg, seq) + } else if err != ErrStoreMsgNotFound { + s.Warnf("Jetstream internal storage error on lookup: %v", err) + return + } else { + break + } + } + mset.waitForMsgs() + } +} + +// Deliver a msg to the observable. +func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) { + mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg} +} + +// SeqFromReply will extract a sequence number from a reply ack subject. +func (o *Observable) SeqFromReply(reply string) (seq uint64) { + n, err := fmt.Sscanf(reply, o.ackReply, &seq) + if err != nil || n != 1 { + return 0 + } + return +} + +// Will select the starting sequence. +func (o *Observable) selectStartingSeqNo() { + stats := o.mset.Stats() + noTime := time.Time{} + if o.config.StartSeq == 0 { + if o.config.DeliverAll { + o.seq = stats.FirstSeq + } else if o.config.DeliverLast { + o.seq = stats.LastSeq + } else if o.config.StartTime != noTime { + // If we are here we are time based. + // TODO(dlc) - Once clustered can't rely on this. + o.seq = o.mset.store.GetSeqFromTime(o.config.StartTime) + } else { + // Default is deliver new only. + o.seq = stats.LastSeq + 1 + } + } else { + o.seq = o.config.StartSeq + } + + if o.seq < stats.FirstSeq { + o.seq = stats.FirstSeq + } else if o.seq > stats.LastSeq { + o.seq = stats.LastSeq + 1 + } +} + +// Test whether a config represents a durable subscriber. +func isDurableObservable(config *ObservableConfig) bool { + return config != nil && config.Durable != _EMPTY_ +} + +const randObservableNameLen = 6 + +func createObservableName() string { + var b [64]byte + rand.Read(b[:]) + sha := sha256.New() + sha.Write(b[:]) + return fmt.Sprintf("%x", sha.Sum(nil))[:randObservableNameLen] +} + +// DeleteObservable will delete the observable from this message set. +func (mset *MsgSet) DeleteObservable(o *Observable) error { + return o.Delete() +} + +// Delete will delete the observable for the associated message set. +func (o *Observable) Delete() error { + o.mu.Lock() + // TODO(dlc) - Do cleanup here. + mset := o.mset + o.mset = nil + ackSub := o.ackSub + o.ackSub = nil + o.mu.Unlock() + + if mset == nil { + return nil + } + + mset.mu.Lock() + // Break us out of the readLoop. + // TODO(dlc) - Should not be bad for small amounts of observables, maybe + // even into thousands. Above that should check what this might do + // performance wise. + mset.sg.Broadcast() + mset.unsubscribe(ackSub) + delete(mset.obs, o.name) + mset.mu.Unlock() + + return nil +} + +// Checks to see if there is registered interest in the delivery subject. +// Note that since we require delivery to be a literal this is just like +// a publish match. +// +// TODO(dlc) - if gateways are enabled we need to do some more digging for the +// real answer. +func (mset *MsgSet) noInterest(delivery string) bool { + var acc *Account + mset.mu.Lock() + if mset.client != nil { + acc = mset.client.acc + } + mset.mu.Unlock() + if acc == nil { + return true + } + r := acc.sl.Match(delivery) + return len(r.psubs)+len(r.qsubs) == 0 +} + +func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool { + mset.mu.Lock() + defer mset.mu.Unlock() + + for _, subject := range mset.config.Subjects { + if subjectIsSubsetMatch(deliverySubject, subject) { + return true + } + } + return false +} diff --git a/server/opts.go b/server/opts.go index 151885ae..9d30c880 100644 --- a/server/opts.go +++ b/server/opts.go @@ -183,6 +183,7 @@ type Options struct { Cluster ClusterOpts `json:"cluster,omitempty"` Gateway GatewayOpts `json:"gateway,omitempty"` LeafNode LeafNodeOpts `json:"leaf,omitempty"` + JetStream bool `json:"jetstream"` ProfPort int `json:"-"` PidFile string `json:"-"` PortsFileDir string `json:"-"` @@ -203,6 +204,7 @@ type Options struct { WriteDeadline time.Duration `json:"-"` MaxClosedClients int `json:"-"` LameDuckDuration time.Duration `json:"-"` + // MaxTracedMsgLen is the maximum printable length for traced messages. MaxTracedMsgLen int `json:"-"` @@ -3117,34 +3119,35 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&configFile, "c", "", "Configuration file.") fs.StringVar(&configFile, "config", "", "Configuration file.") fs.BoolVar(&opts.CheckConfig, "t", false, "Check configuration and exit.") - fs.StringVar(&signal, "sl", "", "Send signal to nats-server process (stop, quit, reopen, reload)") - fs.StringVar(&signal, "signal", "", "Send signal to nats-server process (stop, quit, reopen, reload)") + fs.StringVar(&signal, "sl", "", "Send signal to nats-server process (stop, quit, reopen, reload).") + fs.StringVar(&signal, "signal", "", "Send signal to nats-server process (stop, quit, reopen, reload).") fs.StringVar(&opts.PidFile, "P", "", "File to store process pid.") fs.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") - fs.StringVar(&opts.PortsFileDir, "ports_file_dir", "", "Creates a ports file in the specified directory (_.ports)") + fs.StringVar(&opts.PortsFileDir, "ports_file_dir", "", "Creates a ports file in the specified directory (_.ports).") fs.StringVar(&opts.LogFile, "l", "", "File to store logging output.") fs.StringVar(&opts.LogFile, "log", "", "File to store logging output.") fs.Int64Var(&opts.LogSizeLimit, "log_size_limit", 0, "Logfile size limit being auto-rotated") fs.BoolVar(&opts.Syslog, "s", false, "Enable syslog as log method.") - fs.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method..") + fs.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method.") fs.StringVar(&opts.RemoteSyslog, "r", "", "Syslog server addr (udp://127.0.0.1:514).") fs.StringVar(&opts.RemoteSyslog, "remote_syslog", "", "Syslog server addr (udp://127.0.0.1:514).") fs.BoolVar(&showVersion, "version", false, "Print version information.") fs.BoolVar(&showVersion, "v", false, "Print version information.") - fs.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") + fs.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port.") fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.Advertise, "cluster_advertise", "", "Cluster URL to advertise to other servers.") fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") - fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries") + fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries.") fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") fs.BoolVar(&opts.TLS, "tls", false, "Enable TLS.") fs.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.") fs.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") fs.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") fs.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") - fs.IntVar(&opts.MaxTracedMsgLen, "max_traced_msg_len", 0, "Maximum printable length for traced messages. 0 for unlimited") + fs.IntVar(&opts.MaxTracedMsgLen, "max_traced_msg_len", 0, "Maximum printable length for traced messages. 0 for unlimited.") + fs.BoolVar(&opts.JetStream, "js", false, "Enable JetStream.") // The flags definition above set "default" values to some of the options. // Calling Parse() here will override the default options with any value diff --git a/server/parser.go b/server/parser.go index 0a8d4263..f0e3ea86 100644 --- a/server/parser.go +++ b/server/parser.go @@ -22,6 +22,7 @@ type pubArg struct { pacache []byte account []byte subject []byte + deliver []byte reply []byte szb []byte queues [][]byte diff --git a/server/server.go b/server/server.go index 45ff0148..746745a6 100644 --- a/server/server.go +++ b/server/server.go @@ -71,6 +71,7 @@ type Info struct { TLSRequired bool `json:"tls_required,omitempty"` TLSVerify bool `json:"tls_verify,omitempty"` MaxPayload int32 `json:"max_payload"` + JetStream bool `json:"jetstream,omitempty"` IP string `json:"ip,omitempty"` CID uint64 `json:"client_id,omitempty"` ClientIP string `json:"client_ip,omitempty"` @@ -110,6 +111,7 @@ type Server struct { listener net.Listener gacc *Account sys *internal + js *jetStream accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32 @@ -185,7 +187,7 @@ type Server struct { // Trusted public operator keys. trustedKeys []string - // We use this to minimize mem copies for request to monitoring + // We use this to minimize mem copies for requests to monitoring // endpoint /varz (when it comes from http). varzMu sync.Mutex varz *Varz @@ -264,9 +266,11 @@ func NewServer(opts *Options) (*Server, error) { TLSRequired: tlsReq, TLSVerify: verify, MaxPayload: opts.MaxPayload, + JetStream: opts.JetStream, } now := time.Now() + s := &Server{ kp: kp, configFile: opts.ConfigFile, @@ -594,6 +598,24 @@ func (s *Server) generateRouteInfoJSON() { s.routeInfoJSON = bytes.Join(pcs, []byte(" ")) } +// Determines if we are in operator mode. +// Uses opts instead of server lock. +func (s *Server) inOperatorMode() bool { + return len(s.getOpts().TrustedOperators) > 0 +} + +// Determines if we are in pre NATS 2.0 setup with no accounts. +// Uses opts instead of server lock. +func (s *Server) globalAccountOnly() bool { + return len(s.getOpts().Accounts) == 0 +} + +// Determines if this server is in standalone mode, meaning no routes or gateways or leafnodes. +func (s *Server) standAloneMode() bool { + opts := s.getOpts() + return opts.Cluster.Port == 0 && opts.LeafNode.Port == 0 && opts.Gateway.Port == 0 +} + // isTrustedIssuer will check that the issuer is a trusted public key. // This is used to make sure an account was signed by a trusted operator. func (s *Server) isTrustedIssuer(issuer string) bool { @@ -809,8 +831,25 @@ func (s *Server) SystemAccount() *Account { return sacc } +// GlobalAccount returns the global account. +// Default clients will use the global account. +func (s *Server) GlobalAccount() *Account { + s.mu.Lock() + defer s.mu.Unlock() + return s.gacc +} + +// SetDefaultSystemAccount will create a default system account if one is not present. +func (s *Server) SetDefaultSystemAccount() error { + if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew { + return nil + } + s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT) + return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT) +} + // For internal sends. -const internalSendQLen = 4096 +const internalSendQLen = 8192 // Assign a system account. Should only be called once. // This sets up a server to send and receive messages from @@ -847,11 +886,10 @@ func (s *Server) setSystemAccount(acc *Account) error { now := time.Now() s.sys = &internal{ account: acc, - client: &client{srv: s, kind: SYSTEM, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}, + client: s.createInternalSystemClient(), seq: 1, sid: 1, servers: make(map[string]*serverUpdate), - subs: make(map[string]msgHandler), replies: make(map[string]msgHandler), sendq: make(chan *pubMsg, internalSendQLen), resetCh: make(chan struct{}), @@ -859,8 +897,6 @@ func (s *Server) setSystemAccount(acc *Account) error { orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, } - s.sys.client.initClient() - s.sys.client.echo = false s.sys.wg.Add(1) s.mu.Unlock() @@ -894,6 +930,39 @@ func (s *Server) setSystemAccount(acc *Account) error { return nil } +func (s *Server) systemAccount() *Account { + var sacc *Account + s.mu.Lock() + if s.sys != nil { + sacc = s.sys.account + } + s.mu.Unlock() + return sacc +} + +// Creates an internal system client. +func (s *Server) createInternalSystemClient() *client { + return s.createInternalClient(SYSTEM) +} + +// Creates and internal jetstream client. +func (s *Server) createInternalJetStreamClient() *client { + return s.createInternalClient(JETSTREAM) +} + +// Internal clients. kind should be SYSTEM or JETSTREAM +func (s *Server) createInternalClient(kind int) *client { + if kind != SYSTEM && kind != JETSTREAM { + return nil + } + now := time.Now() + c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now} + c.initClient() + c.echo = false + c.flags.set(noReconnect) + return c +} + // Determine if accounts should track subscriptions for // efficient propagation. // Lock should be held on entry. @@ -1167,12 +1236,6 @@ func (s *Server) Start() { } } - // Start monitoring if needed - if err := s.StartMonitoring(); err != nil { - s.Fatalf("Can't start monitoring: %v", err) - return - } - // Setup system account which will start the eventing stack. if sa := opts.SystemAccount; sa != _EMPTY_ { if err := s.SetSystemAccount(sa); err != nil { @@ -1185,6 +1248,22 @@ func (s *Server) Start() { // this server is configured with gateway or not. s.startGWReplyMapExpiration() + // Check if JetStream has been enabled. This needs to be after + // the system account setup above. JetStream will create its + // own system account if one is not present. + if opts.JetStream { + if err := s.EnableJetStream(nil); err != nil { + s.Fatalf("Can't start jetstream: %v", err) + return + } + } + + // Start monitoring if needed + if err := s.StartMonitoring(); err != nil { + s.Fatalf("Can't start monitoring: %v", err) + return + } + // Start up gateway if needed. Do this before starting the routes, because // we want to resolve the gateway host:port so that this information can // be sent to other routes. @@ -2270,7 +2349,7 @@ func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []str ip = nil continue } - s.Debugf(" ip=%s", ipStr) + s.Debugf(" ip=%s", ipStr) ips = append(ips, ipStr) if !all { break diff --git a/server/store.go b/server/store.go new file mode 100644 index 00000000..bd77855e --- /dev/null +++ b/server/store.go @@ -0,0 +1,48 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "errors" + "time" +) + +// StorageType determines how messages are stored for retention. +type StorageType int + +const ( + // Memory specifies in memory only. + MemoryStorage StorageType = iota + // Disk specifies on disk, designated by the JetStream config StoreDir. + DiskStorage +) + +type MsgSetStore interface { + StoreMsg(subj string, msg []byte) (uint64, error) + Lookup(seq uint64) (subj string, msg []byte, ts int64, err error) + GetSeqFromTime(t time.Time) uint64 + Stats() MsgSetStats +} + +// MsgSetStats are stats about this given message set. +type MsgSetStats struct { + Msgs uint64 + Bytes uint64 + FirstSeq uint64 + LastSeq uint64 +} + +var ( + ErrStoreMsgNotFound = errors.New("no message found") +) diff --git a/server/sysmem/mem_bsd.go b/server/sysmem/mem_bsd.go new file mode 100644 index 00000000..f62a7d27 --- /dev/null +++ b/server/sysmem/mem_bsd.go @@ -0,0 +1,20 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build freebsd openbsd dragonfly netbsd + +package sysmem + +func Memory() int64 { + return sysctlInt64("hw.physmem") +} diff --git a/server/sysmem/mem_darwin.go b/server/sysmem/mem_darwin.go new file mode 100644 index 00000000..e5cce831 --- /dev/null +++ b/server/sysmem/mem_darwin.go @@ -0,0 +1,20 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build darwin + +package sysmem + +func Memory() int64 { + return sysctlInt64("hw.memsize") +} diff --git a/server/sysmem/mem_linux.go b/server/sysmem/mem_linux.go new file mode 100644 index 00000000..6842a157 --- /dev/null +++ b/server/sysmem/mem_linux.go @@ -0,0 +1,27 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package sysmem + +import "syscall" + +func Memory() int64 { + var info syscall.Sysinfo_t + err := syscall.Sysinfo(&info) + if err != nil { + return 0 + } + return int64(info.Totalram) * int64(info.Unit) +} diff --git a/server/sysmem/mem_windows.go b/server/sysmem/mem_windows.go new file mode 100644 index 00000000..12f3d736 --- /dev/null +++ b/server/sysmem/mem_windows.go @@ -0,0 +1,44 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +import ( + "syscall" + "unsafe" +) + +// https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/ns-sysinfoapi-memorystatusex +type _memoryStatusEx struct { + dwLength uint32 + dwMemoryLoad uint32 + ullTotalPhys uint64 + unused [6]uint64 // ignore rest of struct +} + +func Memory() int64 { + k32, err := syscall.LoadDLL("kernel32.dll") + if err != nil { + return 0 + } + globalMemoryStatusEx, err := kernel32.FindProc("GlobalMemoryStatusEx") + if err != nil { + return 0 + } + msx := &_memoryStatusEx{dwLength: 64} + res, _, _ := globalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msx))) + if res == 0 { + return 0 + } + return msx.ullTotalPhys +} diff --git a/server/sysmem/sysctl.go b/server/sysmem/sysctl.go new file mode 100644 index 00000000..e70e037c --- /dev/null +++ b/server/sysmem/sysctl.go @@ -0,0 +1,31 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build darwin freebsd openbsd dragonfly netbsd + +package sysmem + +import ( + "syscall" + "unsafe" +) + +func sysctlInt64(name string) int64 { + s, err := syscall.Sysctl(name) + if err != nil { + return 0 + } + // hack because the string conversion above drops a \0 + b := []byte(s) + return *(*int64)(unsafe.Pointer(&b[0])) +} diff --git a/test/bench_test.go b/test/bench_test.go index 5d5c8134..cbf7f22d 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -25,10 +25,12 @@ import ( "math/rand" "net" "net/url" + "runtime" "testing" "time" "github.com/nats-io/nats-server/v2/server" + nats "github.com/nats-io/nats.go" ) const PERF_PORT = 8422 @@ -1394,3 +1396,115 @@ func Benchmark_____RoutedIntGraph(b *testing.B) { } b.StopTimer() } + +/////////////////////////////////////////////////////////////////////////// +// Simple JetStream Benchmarks +/////////////////////////////////////////////////////////////////////////// + +func Benchmark_JetStreamPubWithAck(b *testing.B) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo"}) + if err != nil { + b.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer nc.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + nc.Request("foo", []byte("Hello World!"), 50*time.Millisecond) + } + b.StopTimer() + + stats := mset.Stats() + if int(stats.Msgs) != b.N { + b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs) + } +} + +func Benchmark_JetStreamPubNoAck(b *testing.B) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo"}) + if err != nil { + b.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer nc.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := nc.Publish("foo", []byte("Hello World!")); err != nil { + b.Fatalf("Unexpected error: %v", err) + } + } + nc.Flush() + b.StopTimer() + + stats := mset.Stats() + if int(stats.Msgs) != b.N { + b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs) + } +} + +func Benchmark_JetStreamPubAsyncAck(b *testing.B) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo"}) + if err != nil { + b.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc, err := nats.Connect(s.ClientURL(), nats.NoReconnect()) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer nc.Close() + + // Put ack stream on its own connection. + anc, err := nats.Connect(s.ClientURL()) + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer anc.Close() + + acks := nats.NewInbox() + sub, _ := anc.Subscribe(acks, func(m *nats.Msg) { + // Just eat them for this test. + }) + // set max pending to unlimited. + sub.SetPendingLimits(-1, -1) + defer sub.Unsubscribe() + + anc.Flush() + runtime.GC() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := nc.PublishRequest("foo", acks, []byte("Hello World!")); err != nil { + b.Fatalf("[%d] Unexpected error: %v", i, err) + } + } + nc.Flush() + b.StopTimer() + + stats := mset.Stats() + if int(stats.Msgs) != b.N { + b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs) + } +} diff --git a/test/jetstream_test.go b/test/jetstream_test.go new file mode 100644 index 00000000..fe164cfa --- /dev/null +++ b/test/jetstream_test.go @@ -0,0 +1,426 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats-server/v2/server/sysmem" + "github.com/nats-io/nats.go" +) + +func TestJetStreamBasicNilConfig(t *testing.T) { + s := RunRandClientPortServer() + defer s.Shutdown() + + if err := s.EnableJetStream(nil); err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if !s.JetStreamEnabled() { + t.Fatalf("Expected JetStream to be enabled") + } + if s.SystemAccount() == nil { + t.Fatalf("Expected system account to be created automatically") + } + // Grab our config since it was dynamically generated. + config := s.JetStreamConfig() + if config == nil { + t.Fatalf("Expected non-nil config") + } + // Check dynamic max memory. + hwMem := sysmem.Memory() + if hwMem != 0 { + // Make sure its about 75% + est := hwMem / 4 * 3 + if config.MaxMemory != est { + t.Fatalf("Expected memory to be 80 percent of system memory, got %v vs %v", config.MaxMemory, est) + } + } + // Check store path, should be tmpdir. + expectedDir := filepath.Join(os.TempDir(), server.JetStreamStoreDir) + if config.StoreDir != expectedDir { + t.Fatalf("Expected storage directory of %q, but got %q", expectedDir, config.StoreDir) + } + // Make sure it was created. + stat, err := os.Stat(config.StoreDir) + if err != nil { + t.Fatalf("Expected the store directory to be present, %v", err) + } + if stat == nil || !stat.IsDir() { + t.Fatalf("Expected a directory") + } +} + +func RunBasicJetStreamServer() *server.Server { + opts := DefaultTestOptions + opts.Port = -1 + opts.JetStream = true + return RunServer(&opts) +} + +func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn { + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + return nc +} + +func TestJetStreamEnableAndDisableAccount(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + // Global in simple setup should be enabled already. + if !s.GlobalAccount().JetStreamEnabled() { + t.Fatalf("Expected to have jetstream enabled on global account") + } + if na := s.JetStreamNumAccounts(); na != 1 { + t.Fatalf("Expected 1 account, got %d", na) + } + acc, _ := s.LookupOrRegisterAccount("$FOO") + if err := s.JetStreamEnableAccount(acc, server.JetStreamAccountLimitsNoLimits); err != nil { + t.Fatalf("Did not expect error on enabling account: %v", err) + } + if na := s.JetStreamNumAccounts(); na != 2 { + t.Fatalf("Expected 2 accounts, got %d", na) + } + if err := s.JetStreamDisableAccount(acc); err != nil { + t.Fatalf("Did not expect error on disabling account: %v", err) + } + if na := s.JetStreamNumAccounts(); na != 1 { + t.Fatalf("Expected 1 account, got %d", na) + } + // We should get error if disabling something not enabled. + acc, _ = s.LookupOrRegisterAccount("$BAR") + if err := s.JetStreamDisableAccount(acc); err == nil { + t.Fatalf("Expected error on disabling account that was not enabled") + } + // Should get an error for trying to enable a non-registered account. + acc = server.NewAccount("$BAZ") + if err := s.JetStreamEnableAccount(acc, server.JetStreamAccountLimitsNoLimits); err == nil { + t.Fatalf("Expected error on enabling account that was not registered") + } + if err := s.JetStreamDisableAccount(s.GlobalAccount()); err != nil { + t.Fatalf("Did not expect error on disabling account: %v", err) + } + if na := s.JetStreamNumAccounts(); na != 0 { + t.Fatalf("Expected no accounts, got %d", na) + } +} + +func TestJetStreamAddMsgSet(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mconfig := &server.MsgSetConfig{ + Name: "foo", + Retention: server.StreamPolicy, + MaxAge: time.Hour, + Storage: server.MemoryStorage, + Replicas: 1, + } + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), mconfig) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + + nc := clientConnectToServer(t, s) + defer nc.Close() + + nc.Publish("foo", []byte("Hello World!")) + nc.Flush() + + stats := mset.Stats() + if stats.Msgs != 1 { + t.Fatalf("Expected 1 message, got %d", stats.Msgs) + } + if stats.Bytes == 0 { + t.Fatalf("Expected non-zero bytes") + } + + nc.Publish("foo", []byte("Hello World Again!")) + nc.Flush() + + stats = mset.Stats() + if stats.Msgs != 2 { + t.Fatalf("Expected 2 messages, got %d", stats.Msgs) + } + + if err := s.JetStreamDeleteMsgSet(mset); err != nil { + t.Fatalf("Got an error deleting the message set: %v", err) + } +} + +func expectOKResponse(t *testing.T, m *nats.Msg) { + t.Helper() + if m == nil { + t.Fatalf("No response, possible timeout?") + } + if string(m.Data) != server.JsOK { + t.Fatalf("Expected a JetStreamPubAck, got %q", m.Data) + } +} + +func TestJetStreamBasicAckPublish(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo.*"}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + for i := 0; i < 50; i++ { + resp, _ := nc.Request("foo.bar", []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + stats := mset.Stats() + if stats.Msgs != 50 { + t.Fatalf("Expected 50 messages, got %d", stats.Msgs) + } +} + +func TestJetStreamRequestEnabled(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + resp, _ := nc.Request(server.JsEnabled, nil, time.Second) + expectOKResponse(t, resp) +} + +func TestJetStreamCreateObservable(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo", Subjects: []string{"foo", "bar"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + // Check for basic errors. + if _, err := mset.AddObservable(nil); err == nil { + t.Fatalf("Expected an error for no config") + } + + // Check for delivery subject errors. + // Empty delivery subject + if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: ""}); err == nil { + t.Fatalf("Expected an error on empty delivery subject") + } + // No literal delivery subject allowed. + if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "foo.*"}); err == nil { + t.Fatalf("Expected an error on bad delivery subject") + } + // Check for cycles + if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "foo"}); err == nil { + t.Fatalf("Expected an error on delivery subject that forms a cycle") + } + if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "bar"}); err == nil { + t.Fatalf("Expected an error on delivery subject that forms a cycle") + } + if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "*"}); err == nil { + t.Fatalf("Expected an error on delivery subject that forms a cycle") + } + + // StartPosition conflicts + if _, err := mset.AddObservable(&server.ObservableConfig{ + Delivery: "A", + StartSeq: 1, + StartTime: time.Now(), + }); err == nil { + t.Fatalf("Expected an error on start position conflicts") + } + if _, err := mset.AddObservable(&server.ObservableConfig{ + Delivery: "A", + StartTime: time.Now(), + DeliverAll: true, + }); err == nil { + t.Fatalf("Expected an error on start position conflicts") + } + if _, err := mset.AddObservable(&server.ObservableConfig{ + Delivery: "A", + DeliverAll: true, + DeliverLast: true, + }); err == nil { + t.Fatalf("Expected an error on start position conflicts") + } + + // Non-Durables need to have subscription to delivery subject. + delivery := nats.NewInbox() + if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery}); err == nil { + t.Fatalf("Expected an error on unsubscribed delivery subject") + } + + // This should work.. + nc := clientConnectToServer(t, s) + defer nc.Close() + sub, _ := nc.SubscribeSync(delivery) + defer sub.Unsubscribe() + nc.Flush() + + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + + if err := mset.DeleteObservable(o); err != nil { + t.Fatalf("Expected no error on delete, got %v", err) + } +} + +func TestJetStreamBasicDelivery(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "MSET", Subjects: []string{"foo.*"}}) + if err != nil { + t.Fatalf("Unexpected error adding message set: %v", err) + } + defer s.JetStreamDeleteMsgSet(mset) + + nc := clientConnectToServer(t, s) + defer nc.Close() + + toSend := 100 + sendSubj := "foo.bar" + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + stats := mset.Stats() + if stats.Msgs != uint64(toSend) { + t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs) + } + + // Now create an observable. Use different connection. + nc2 := clientConnectToServer(t, s) + defer nc2.Close() + + delivery := nats.NewInbox() + sub, _ := nc2.SubscribeSync(delivery) + defer sub.Unsubscribe() + nc2.Flush() + + o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + // Check for our messages. + checkMsgs := func(seqOff int) { + t.Helper() + + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) + } + return nil + }) + + // Now let's check the messages + for i := 0; i < toSend; i++ { + m, _ := sub.NextMsg(time.Millisecond) + // JetStream will have the subject match the stream subject, not delivery subject. + if m.Subject != sendSubj { + t.Fatalf("Expected original subject of %q, but got %q", sendSubj, m.Subject) + } + // Now check that reply subject exists and has a sequence as the last token. + if seq := o.SeqFromReply(m.Reply); seq != uint64(i+seqOff) { + t.Fatalf("Expected sequence of %d , got %d", i+seqOff, seq) + } + // Ack the message here. + m.Respond(nil) + } + } + + checkMsgs(1) + + // Now send more and make sure delivery picks back up. + for i := 0; i < toSend; i++ { + resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) + expectOKResponse(t, resp) + } + stats = mset.Stats() + if stats.Msgs != uint64(toSend*2) { + t.Fatalf("Expected %d messages, got %d", toSend*2, stats.Msgs) + } + + checkMsgs(101) + + checkSubEmpty := func() { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 0 { + t.Fatalf("Expected sub to have no pending") + } + } + checkSubEmpty() + o.Delete() + + // Now check for deliver last, deliver new and deliver by seq. + o, err = mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverLast: true}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { + if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 1 { + return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) + } + return nil + }) + m, _ := sub.NextMsg(time.Millisecond) + if seq := o.SeqFromReply(m.Reply); seq != 200 { + t.Fatalf("Expected sequence to be 200, but got %d", seq) + } + + checkSubEmpty() + o.Delete() + + o, err = mset.AddObservable(&server.ObservableConfig{Delivery: delivery}) // Default is deliver new only. + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + if m, err := sub.NextMsg(time.Millisecond); err == nil { + t.Fatalf("Expected no msg, got %+v", m) + } + + checkSubEmpty() + o.Delete() + + // Now try by sequence number. + o, err = mset.AddObservable(&server.ObservableConfig{Delivery: delivery, StartSeq: 101}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + checkMsgs(101) +} diff --git a/test/test.go b/test/test.go index 6bec1d2d..1db98366 100644 --- a/test/test.go +++ b/test/test.go @@ -50,6 +50,12 @@ func RunDefaultServer() *server.Server { return RunServer(&DefaultTestOptions) } +func RunRandClientPortServer() *server.Server { + opts := DefaultTestOptions + opts.Port = -1 + return RunServer(&opts) +} + // To turn on server tracing and debugging and logging which are // normally suppressed. var (