JetStream first pass basics.

This is the first checkin for JetStream. Has some rudimentary basics working.

TODO
1. Push vs pull mode for observables. (work queues)
2. Disk/File store, memory only for now.
3. clustering code - design shaping up well.
4. Finalize account import semantics.
5. Lots of other little things.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-10-03 10:27:24 -07:00
parent 2b8ebfcf07
commit dd116fcfd4
22 changed files with 2275 additions and 58 deletions

View File

@@ -31,10 +31,10 @@ Server Options:
-m, --http_port <port> Use port for http monitoring
-ms,--https_port <port> Use port for https monitoring
-c, --config <file> Configuration file
-t Test configuration and exit
-sl,--signal <signal>[=<pid>] Send signal to nats-server process (stop, quit, reopen, reload)
<pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid)
--client_advertise <string> Client URL to advertise to other servers
-t Test configuration and exit
Logging Options:
-l, --log <file> File to redirect log output
@@ -47,6 +47,9 @@ Logging Options:
-DV Debug and trace
-DVV Debug and verbose trace (traces system account as well)
JetStream Options:
-js, --jetstream Enable JetStream functionality.
Authorization Options:
--user <user> User required for connections
--pass <password> Password required for connections
@@ -66,7 +69,6 @@ Cluster Options:
--cluster_advertise <string> Cluster URL to advertise to other servers
--connect_retries <number> For implicit routes, number of connect retries
Common Options:
-h, --help Show this message
-v, --version Show version

View File

@@ -30,7 +30,7 @@ import (
// For backwards compatibility with NATS < 2.0, users who are not explicitly defined into an
// account will be grouped in the default global account.
const globalAccountName = "$G"
const globalAccountName = DEFAULT_GLOBAL_ACCOUNT
// Account are subject namespace definitions. By default no messages are shared between accounts.
// You can share via Exports and Imports of Streams and Services.

View File

@@ -44,6 +44,8 @@ const (
SYSTEM
// LEAF is for leaf node connections.
LEAF
// JETSTREAM is an internal jetstream client.
JETSTREAM
)
const (
@@ -382,6 +384,7 @@ type subscription struct {
client *client
im *streamImport // This is for import stream support.
shadow []*subscription // This is to track shadowed accounts.
icb msgHandler
subject []byte
queue []byte
sid []byte
@@ -489,6 +492,8 @@ func (c *client) initClient() {
c.ncs = fmt.Sprintf("%s - lid:%d", conn, c.cid)
case SYSTEM:
c.ncs = "SYSTEM"
case JETSTREAM:
c.ncs = "JETSTREAM"
}
}
@@ -813,6 +818,7 @@ func (c *client) writeLoop() {
// message that now is being delivered.
func (c *client) flushClients(budget time.Duration) time.Time {
last := time.Now()
// Check pending clients for flush.
for cp := range c.pcd {
// TODO(dlc) - Wonder if it makes more sense to create a new map?
@@ -976,7 +982,7 @@ func (c *client) readLoop() {
return
}
if cpacc && start.Sub(lpacc) >= closedSubsCheckInterval {
if cpacc && (start.Sub(lpacc)) >= closedSubsCheckInterval {
c.pruneClosedSubFromPerAccountCache()
lpacc = time.Now()
}
@@ -1053,15 +1059,17 @@ func (c *client) flushOutbound() bool {
c.mu.Unlock()
// flush here
now := time.Now()
start := time.Now()
// FIXME(dlc) - writev will do multiple IOs past 1024 on
// most platforms, need to account for that with deadline?
nc.SetWriteDeadline(now.Add(wdl))
nc.SetWriteDeadline(start.Add(wdl))
// Actual write to the socket.
n, err := nb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})
lft := time.Since(now)
lft := time.Since(start)
// Re-acquire client lock.
c.mu.Lock()
@@ -1561,7 +1569,7 @@ func (c *client) maxPayloadViolation(sz int, max int32) {
}
// queueOutbound queues data for a clientconnection.
// Return if the data is referenced or not. If referenced, the caller
// Returns if the data is referenced or not. If referenced, the caller
// should not reuse the `data` array.
// Lock should be held.
func (c *client) queueOutbound(data []byte) bool {
@@ -1933,8 +1941,8 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error)
sid := string(sub.sid)
// This check does not apply to SYSTEM clients (because they don't have a `nc`...)
if kind != SYSTEM && c.isClosed() {
// This check does not apply to SYSTEM or JETSTREAM clients (because they don't have a `nc`...)
if c.isClosed() && (kind != SYSTEM && kind != JETSTREAM) {
c.mu.Unlock()
return sub, nil
}
@@ -2490,11 +2498,15 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b
atomic.AddInt64(&srv.outMsgs, 1)
atomic.AddInt64(&srv.outBytes, msgSize)
// Check for internal subscription.
if client.kind == SYSTEM {
// Check for internal subscriptions.
if client.kind == SYSTEM || client.kind == JETSTREAM {
s := client.srv
client.mu.Unlock()
s.deliverInternalMsg(sub, c, subject, c.pa.reply, msg[:msgSize])
if sub.icb == nil {
s.Debugf("Received internal callback with no registered handler")
return false
}
sub.icb(sub, c, string(subject), string(c.pa.reply), msg[:msgSize])
return true
}
@@ -2865,7 +2877,13 @@ func (c *client) processInboundClientMsg(msg []byte) {
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
flag |= pmrCollectQueueNames
}
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, flag)
// With JetStream we now have times where we want to match a subsctiption
// on one subject, but deliver it with abother. e.g. JetStream deliverables.
subj := c.pa.subject
if len(c.pa.deliver) > 0 {
subj = c.pa.deliver
}
qnames = c.processMsgResults(c.acc, r, msg, subj, c.pa.reply, flag)
}
// Now deal with gateways
@@ -3526,6 +3544,14 @@ func (c *client) closeConnection(reason ClosedState) {
func (c *client) teardownConn() {
c.mu.Lock()
// Be consistent with the creation: for routes and gateways,
// we use Noticef on create, so use that too for delete.
if c.kind == ROUTER || c.kind == GATEWAY {
c.Noticef("%s connection closed", c.typeString())
} else { // Client, System, Jetstream and Leafnode connections.
c.Debugf("%s connection closed", c.typeString())
}
c.clearAuthTimer()
c.clearPingTimer()
// Unblock anyone who is potentially stalled waiting on us.
@@ -3576,8 +3602,8 @@ func (c *client) teardownConn() {
c.mu.Unlock()
// Remove client's or leaf node subscriptions.
if (kind == CLIENT || kind == LEAF) && acc != nil {
// Remove client's or leaf node or jetstream subscriptions.
if acc != nil && (kind == CLIENT || kind == LEAF || kind == JETSTREAM) {
acc.sl.RemoveBatch(subs)
} else if kind == ROUTER {
go c.removeRemoteSubs()

View File

@@ -182,4 +182,10 @@ const (
// DEFAULT_SERVICE_LATENCY_SAMPLING is the default sampling rate for service
// latency metrics
DEFAULT_SERVICE_LATENCY_SAMPLING = 100
// DEFAULT_SYSTEM_ACCOUNT
DEFAULT_SYSTEM_ACCOUNT = "$SYS"
// DEFAULT GLOBAL_ACCOUNT
DEFAULT_GLOBAL_ACCOUNT = "$G"
)

View File

@@ -67,11 +67,10 @@ type internal struct {
account *Account
client *client
seq uint64
sid uint64
sid int
servers map[string]*serverUpdate
sweeper *time.Timer
stmr *time.Timer
subs map[string]msgHandler
replies map[string]msgHandler
sendq chan *pubMsg
resetCh chan struct{}
@@ -255,7 +254,14 @@ RESET:
}
var b []byte
if pm.msg != nil {
b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ")
switch v := pm.msg.(type) {
case string:
b = []byte(v)
case []byte:
b = v
default:
b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ")
}
}
c.mu.Lock()
// We can have an override for account here.
@@ -312,7 +318,6 @@ func (s *Server) sendShutdownEvent() {
// Stop any more messages from queueing up.
s.sys.sendq = nil
// Unhook all msgHandlers. Normal client cleanup will deal with subs, etc.
s.sys.subs = nil
s.sys.replies = nil
s.mu.Unlock()
// Send to the internal queue and mark as last.
@@ -1096,19 +1101,6 @@ func (s *Server) sendAuthErrorEvent(c *client) {
// required to be copied.
type msgHandler func(sub *subscription, client *client, subject, reply string, msg []byte)
func (s *Server) deliverInternalMsg(sub *subscription, c *client, subject, reply, msg []byte) {
s.mu.Lock()
if !s.eventsEnabled() || s.sys.subs == nil {
s.mu.Unlock()
return
}
cb := s.sys.subs[string(sub.sid)]
s.mu.Unlock()
if cb != nil {
cb(sub, c, string(subject), string(reply), msg)
}
}
// Create an internal subscription. No support for queue groups atm.
func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) {
return s.systemSubscribe(subject, false, cb)
@@ -1127,11 +1119,10 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle
return nil, fmt.Errorf("undefined message handler")
}
s.mu.Lock()
sid := strconv.FormatInt(int64(s.sys.sid), 10)
s.sys.subs[sid] = cb
s.sys.sid++
c := s.sys.client
trace := c.trace
s.sys.sid++
sid := s.sys.sid
s.mu.Unlock()
arg := []byte(subject + " " + sid)
@@ -1140,7 +1131,14 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle
}
// Now create the subscription
return c.processSub(arg, internalOnly)
sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(sid)), internalOnly)
if err != nil {
return nil, err
}
c.mu.Lock()
sub.icb = cb
c.mu.Unlock()
return sub, nil
}
func (s *Server) sysUnsubscribe(sub *subscription) {
@@ -1150,7 +1148,6 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
s.mu.Lock()
acc := s.sys.account
c := s.sys.client
delete(s.sys.subs, string(sub.sid))
s.mu.Unlock()
c.unsubscribe(acc, sub, true, true)
}

356
server/jetstream.go Normal file
View File

@@ -0,0 +1,356 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strings"
"sync"
"github.com/nats-io/nats-server/v2/server/sysmem"
)
// JetStreamConfig determines this server's configuration.
// MaxMemory and MaxStore are in bytes.
type JetStreamConfig struct {
MaxMemory int64
MaxStore int64
StoreDir string
}
// This is for internal accounting for JetStream for this server.
type jetStream struct {
mu sync.RWMutex
srv *Server
config JetStreamConfig
accounts map[*Account]*jsAccount
// TODO(dlc) - need to track
// memUsage int64
// diskUsage int64
}
// TODO(dlc) - decide on what these look like for single vs cluster vs supercluster.
// TODO(dlc) - need to track and rollup against server limits, etc.
type JetStreamAccountLimits struct {
MaxMemory int64
MaxStore int64
MaxMsgSets int
}
const (
// OK response
JsOK = "+OK"
// JsEnabled allows a user to dynamically check if JetStream is enabled
// with a simple request.
// Will return +OK on success and will timeout if not imported.
JsEnabledExport = "$JS.*.ENABLED"
JsEnabled = "$JS.ENABLED"
// jsInfo is for obtaining general information about JetStream for this account.
// Will return JSON response.
JsInfoExport = "$JS.*.INFO"
JsInfo = "$JS.INFO"
// JsCreatObservable is the endpoint to create observers for a message set.
// Will return +OK on success and -ERR on failure.
JsCreateObservableExport = "$JS.*.OBSERVABLE.CREATE"
JsCreateObservable = "$JS.OBSERVABLE.CREATE"
// JsCreatMsgSet is the endpoint to create message sets.
// Will return +OK on success and -ERR on failure.
JsCreateMsgSetExport = "$JS.*.MSGSET.CREATE"
JsCreateMsgSet = "$JS.MSGSET.CREATE"
// JsMsgSetInfo is for obtaining general information about a named message set.
// Will return JSON response.
JsMsgSetInfoExport = "$JS.*.MSGSET.INFO"
JsMsgSetInfo = "$JS.MSGSET.INFO"
// JsAckPre is the prefix for the ack stream coming back to observable.
JsAckPre = "$JS.A"
)
// For easier handling of exports and imports.
var allJsExports = []string{JsEnabledExport, JsInfoExport, JsCreateObservableExport, JsCreateMsgSetExport, JsMsgSetInfoExport}
// This represents a jetstream enabled account.
// Worth noting that we include the js ptr, this is because
// in general we want to be very efficient when receiving messages on
// and internal sub for a msgSet, so we will direct link to the msgSet
// and walk backwards as needed vs multiple hash lookups and locks, etc.
type jsAccount struct {
mu sync.Mutex
js *jetStream
account *Account
limits JetStreamAccountLimits
msgSets map[string]*MsgSet
}
// EnableJetStream will enable JetStream support on this server with
// the given configuration. A nil configuration will dynamically choose
// the limits and temporary file storage directory.
// If this server is part of a cluster, a system account will need to be defined.
// This allows the server to send and receive messages.
func (s *Server) EnableJetStream(config *JetStreamConfig) error {
s.mu.Lock()
if !s.standAloneMode() {
s.mu.Unlock()
return fmt.Errorf("jetstream restricted to single server mode")
}
if s.js != nil {
s.mu.Unlock()
return fmt.Errorf("jetstream already enabled")
}
s.Noticef("Starting jetstream")
if config == nil {
s.Debugf("Jetstream creating dynamic configuration - 75%% system memory, %s disk", friendlyBytes(JetStreamMaxStoreDefault))
config = s.dynJetStreamConfig()
}
s.js = &jetStream{srv: s, config: *config, accounts: make(map[*Account]*jsAccount)}
s.mu.Unlock()
if stat, err := os.Stat(config.StoreDir); os.IsNotExist(err) {
if err := os.MkdirAll(config.StoreDir, 0755); err != nil {
return fmt.Errorf("could not create storage directory - %v", err)
}
} else {
// Make sure its a directory and that we can write to it.
if stat == nil || !stat.IsDir() {
return fmt.Errorf("storage directory is not a directory")
}
tmpfile, err := ioutil.TempFile(config.StoreDir, "_test_")
if err != nil {
return fmt.Errorf("storage directory is not writable")
}
os.Remove(tmpfile.Name())
// TODO(dlc) - Recover state
}
// JetStream is an internal service so we need to make sure we have a system account.
// This system account will export the JetStream service endpoints.
if sacc := s.SystemAccount(); sacc == nil {
s.SetDefaultSystemAccount()
}
// Setup our internal subscriptions.
if _, err := s.sysSubscribe(JsEnabledExport, s.isJsEnabledRequest); err != nil {
return fmt.Errorf("Error setting up internal jetstream subscriptions: %v", err)
}
s.Noticef("----------- jetstream config -----------")
s.Noticef(" MaxMemory: %s", friendlyBytes(config.MaxMemory))
s.Noticef(" MaxStore: %s", friendlyBytes(config.MaxStore))
s.Noticef(" StoreDir: %q", config.StoreDir)
// If not in operator mode, setup our internal system exports if needed.
// If we are in operator mode, the system account will need to have them
// added externally.
if !s.inOperatorMode() {
sacc := s.SystemAccount()
// FIXME(dlc) - Should we lock these down?
s.Debugf(" Exports:")
for _, export := range allJsExports {
s.Debugf(" %s", export)
if err := sacc.AddServiceExport(export, nil); err != nil {
return fmt.Errorf("Error setting up jetstream service exports: %v", err)
}
}
}
s.Noticef("----------------------------------------")
// If we have no configured accounts setup then setup imports on global account.
if s.globalAccountOnly() && !s.inOperatorMode() {
if err := s.JetStreamEnableAccount(s.GlobalAccount(), JetStreamAccountLimitsNoLimits); err != nil {
return fmt.Errorf("Error enabling jetstream on the global account")
}
}
return nil
}
// JetStreamEnabled reports if jetstream is enabled.
func (s *Server) JetStreamEnabled() bool {
s.mu.Lock()
enabled := s.js != nil
s.mu.Unlock()
return enabled
}
// JetStreamConfig will return the current config. Useful if the system
// created a dynamic configuration. A copy is returned.
func (s *Server) JetStreamConfig() *JetStreamConfig {
var c *JetStreamConfig
s.mu.Lock()
if s.js != nil {
copy := s.js.config
c = &(copy)
}
s.mu.Unlock()
return c
}
// JetStreamAccountLimitsNoLimits defines no limits for the given account.
var JetStreamAccountLimitsNoLimits = &JetStreamAccountLimits{-1, -1, -1}
// JetStreamEnableAccount enables jetstream capabilties for the given account with the defined limits.
func (s *Server) JetStreamEnableAccount(a *Account, limits *JetStreamAccountLimits) error {
js := s.getJetStream()
if js == nil {
return fmt.Errorf("jetstream not enabled")
}
if a, err := s.LookupAccount(a.Name); err != nil || a == nil {
return fmt.Errorf("jetstream unknown account")
}
// TODO(dlc) - Should we have accounts marked to allow?
// TODO(dlc) - Should we check and warn if these limits are above the server? Maybe only if single server?
js.mu.Lock()
if _, ok := js.accounts[a]; ok {
js.mu.Unlock()
return fmt.Errorf("jetstream already enabled for account")
}
jsa := &jsAccount{js: js, account: a, limits: *limits, msgSets: make(map[string]*MsgSet)}
js.accounts[a] = jsa
js.mu.Unlock()
// If we are not in operator mode we should create the proper imports here.
if !s.inOperatorMode() {
sys := s.SystemAccount()
for _, export := range allJsExports {
importTo := strings.Replace(export, "*", a.Name, -1)
importFrom := strings.Replace(export, ".*.", tsep, -1)
// fmt.Printf("export is %q %q \n", importTo, importFrom)
if err := a.AddServiceImport(sys, importFrom, importTo); err != nil {
return fmt.Errorf("Error setting up jetstream service imports for global account: %v", err)
}
}
}
s.Debugf("Enabled jetstream for account: %q", a.Name)
return nil
}
func (s *Server) JetStreamDisableAccount(a *Account) error {
js := s.getJetStream()
if js == nil {
return fmt.Errorf("jetstream not enabled")
}
js.mu.Lock()
defer js.mu.Unlock()
if _, ok := js.accounts[a]; !ok {
return fmt.Errorf("jetstream not enabled for account")
}
delete(js.accounts, a)
return nil
}
// JetStreamEnabled is a helper to determine if jetstream is enabled for an account.
func (a *Account) JetStreamEnabled() bool {
if a == nil {
return false
}
a.mu.RLock()
s := a.srv
a.mu.RUnlock()
if s == nil {
return false
}
js := s.getJetStream()
if js == nil {
return false
}
js.mu.Lock()
_, ok := js.accounts[a]
js.mu.Unlock()
return ok
}
// JetStreamNumAccounts returns the number of enabled accounts this server is tracking.
func (s *Server) JetStreamNumAccounts() int {
js := s.getJetStream()
if js == nil {
return 0
}
js.mu.Lock()
defer js.mu.Unlock()
return len(js.accounts)
}
func (s *Server) getJetStream() *jetStream {
s.mu.Lock()
js := s.js
s.mu.Unlock()
return js
}
func (js *jetStream) lookupAccount(a *Account) *jsAccount {
js.mu.RLock()
jsa := js.accounts[a]
js.mu.RUnlock()
return jsa
}
// Request to check if jetstream is enabled.
func (s *Server) isJsEnabledRequest(sub *subscription, c *client, subject, reply string, msg []byte) {
if c != nil && c.acc != nil && c.acc.JetStreamEnabled() {
s.sendInternalAccountMsg(c.acc, reply, JsOK)
}
}
const (
// JetStreamStoreDir is the prefix we use.
JetStreamStoreDir = "nats-jetstream"
// JetStreamMaxStoreDefault is the default disk storage limit. 1TB
JetStreamMaxStoreDefault = 1024 * 1024 * 1024 * 1024
// JetStreamMaxMemDefault is only used when we can't determine system memory. 256MB
JetStreamMaxMemDefault = 1024 * 1024 * 256
)
// Dynamically create a config with a tmp based directory (repeatable) and 75% of system memory.
func (s *Server) dynJetStreamConfig() *JetStreamConfig {
jsc := &JetStreamConfig{}
jsc.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir)
jsc.MaxStore = JetStreamMaxStoreDefault
// Estimate to 75% of total memory if we can determine system memory.
if sysMem := sysmem.Memory(); sysMem > 0 {
jsc.MaxMemory = sysMem / 4 * 3
} else {
jsc.MaxMemory = JetStreamMaxMemDefault
}
return jsc
}
// friendlyBytes returns a string with the given bytes int64
// represented as a size, such as 1KB, 10MB, etc...
func friendlyBytes(bytes int64) string {
fbytes := float64(bytes)
base := 1024
pre := []string{"K", "M", "G", "T", "P", "E"}
if fbytes < float64(base) {
return fmt.Sprintf("%v B", fbytes)
}
exp := int(math.Log(fbytes) / math.Log(float64(base)))
index := exp - 1
return fmt.Sprintf("%.2f %sB", fbytes/math.Pow(float64(base), float64(exp)), pre[index])
}

187
server/memstore.go Normal file
View File

@@ -0,0 +1,187 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
"sort"
"sync"
"time"
)
// TODO(dlc) - This is a fairly simplistic approach but should do for now.
type memStore struct {
mu sync.RWMutex
stats MsgSetStats
msgs map[uint64]*storedMsg
ageChk *time.Timer
config MsgSetConfig
}
type storedMsg struct {
subj string
msg []byte
seq uint64
ts int64 // nanoseconds
}
func newMemStore(cfg *MsgSetConfig) (*memStore, error) {
if cfg == nil {
return nil, fmt.Errorf("config required for MsgSetStore")
}
if cfg.Storage != MemoryStorage {
return nil, fmt.Errorf("memStore requires memory storage type in cfg")
}
ms := &memStore{msgs: make(map[uint64]*storedMsg), config: *cfg}
// This only happens once, so ok to call here.
return ms, nil
}
// Store stores a message.
// both arguments should be copied.
func (ms *memStore) StoreMsg(subj string, msg []byte) (uint64, error) {
ms.mu.Lock()
seq := ms.stats.LastSeq + 1
if ms.stats.FirstSeq == 0 {
ms.stats.FirstSeq = seq
}
// Make copies - https://github.com/go101/go101/wiki
// TODO(dlc) - Maybe be smarter here.
msg = append(msg[:0:0], msg...)
ms.msgs[seq] = &storedMsg{subj, msg, seq, time.Now().UnixNano()}
ms.stats.Msgs++
ms.stats.Bytes += memStoreMsgSize(subj, msg)
ms.stats.LastSeq = seq
// Limits checks and enforcement.
ms.enforceMsgLimit()
ms.enforceBytesLimit()
// Check it we have and need age expiration timer running.
if ms.ageChk == nil && ms.config.MaxAge != 0 {
ms.startAgeChk()
}
ms.mu.Unlock()
return seq, nil
}
// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
ts := t.UnixNano()
ms.mu.RLock()
defer ms.mu.RUnlock()
if len(ms.msgs) == 0 {
return ms.stats.LastSeq + 1
}
if ts <= ms.msgs[ms.stats.FirstSeq].ts {
return ms.stats.FirstSeq
}
last := ms.msgs[ms.stats.LastSeq].ts
if ts == last {
return ms.stats.LastSeq
}
if ts > last {
return ms.stats.LastSeq + 1
}
index := sort.Search(len(ms.msgs), func(i int) bool {
return ms.msgs[uint64(i)+ms.stats.FirstSeq].ts >= ts
})
return uint64(index) + ms.stats.FirstSeq
}
// Will check the msg limit and drop firstSeq msg if needed.
// Lock should be held.
func (ms *memStore) enforceMsgLimit() {
if ms.config.MaxMsgs == 0 || ms.stats.Msgs <= ms.config.MaxMsgs {
return
}
ms.deleteFirstMsgOrPanic()
}
// Will check the bytes limit and drop msgs if needed.
// Lock should be held.
func (ms *memStore) enforceBytesLimit() {
if ms.config.MaxBytes == 0 || ms.stats.Bytes <= ms.config.MaxBytes {
return
}
for bs := ms.stats.Bytes; bs > ms.config.MaxBytes; bs = ms.stats.Bytes {
ms.deleteFirstMsgOrPanic()
}
}
// Will start the age check timer.
// Lock should be held.
func (ms *memStore) startAgeChk() {
if ms.ageChk == nil && ms.config.MaxAge != 0 {
ms.ageChk = time.AfterFunc(ms.config.MaxAge, ms.expireMsgs)
}
}
// Will expire msgs that are too old.
func (ms *memStore) expireMsgs() {
ms.mu.Lock()
defer ms.mu.Unlock()
minAge := time.Now().UnixNano() - int64(ms.config.MaxAge)
for {
if sm, ok := ms.msgs[ms.stats.FirstSeq]; ok && sm.ts < minAge {
ms.deleteFirstMsgOrPanic()
} else {
return
}
}
}
func (ms *memStore) deleteFirstMsgOrPanic() {
if !ms.deleteFirstMsg() {
panic("jetstream memstore has inconsistent state, can't find firstSeq msg")
}
}
func (ms *memStore) deleteFirstMsg() bool {
sm, ok := ms.msgs[ms.stats.FirstSeq]
if !ok || sm == nil {
return false
}
delete(ms.msgs, ms.stats.FirstSeq)
ms.stats.FirstSeq++
ms.stats.Msgs--
ms.stats.Bytes -= memStoreMsgSize(sm.subj, sm.msg)
return true
}
func (ms *memStore) Lookup(seq uint64) (string, []byte, int64, error) {
ms.mu.RLock()
sm, ok := ms.msgs[seq]
ms.mu.RUnlock()
if !ok || sm == nil {
return "", nil, 0, ErrStoreMsgNotFound
}
return sm.subj, sm.msg, sm.ts, nil
}
func (ms *memStore) Stats() MsgSetStats {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.stats
}
func memStoreMsgSize(subj string, msg []byte) uint64 {
return uint64(len(subj) + len(msg) + 16) // 8*2 for seq + age
}

179
server/memstore_test.go Normal file
View File

@@ -0,0 +1,179 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"testing"
"time"
)
func TestMemStoreBasics(t *testing.T) {
ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, msg := "foo", []byte("Hello World")
if seq, err := ms.StoreMsg(subj, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
} else if seq != 1 {
t.Fatalf("Expected sequence to be 1, got %d", seq)
}
stats := ms.Stats()
if stats.Msgs != 1 {
t.Fatalf("Expected 1 msg, got %d", stats.Msgs)
}
expectedSize := memStoreMsgSize(subj, msg)
if stats.Bytes != expectedSize {
t.Fatalf("Expected %d bytes, got %d", expectedSize, stats.Bytes)
}
nsubj, nmsg, _, err := ms.Lookup(1)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
if nsubj != subj {
t.Fatalf("Subjects don't match, original %q vs %q", subj, nsubj)
}
if !bytes.Equal(nmsg, msg) {
t.Fatalf("Msgs don't match, original %q vs %q", msg, nmsg)
}
}
func TestMemStoreMsgLimit(t *testing.T) {
maxMsgs := uint64(10)
ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage, MaxMsgs: maxMsgs})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
subj, msg := "foo", []byte("Hello World")
for i := uint64(0); i < maxMsgs; i++ {
ms.StoreMsg(subj, msg)
}
stats := ms.Stats()
if stats.Msgs != maxMsgs {
t.Fatalf("Expected %d msgs, got %d", maxMsgs, stats.Msgs)
}
if _, err := ms.StoreMsg(subj, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
}
stats = ms.Stats()
if stats.Msgs != maxMsgs {
t.Fatalf("Expected %d msgs, got %d", maxMsgs, stats.Msgs)
}
if stats.LastSeq != 11 {
t.Fatalf("Expected the last sequence to be 11 now, but got %d", stats.LastSeq)
}
if stats.FirstSeq != 2 {
t.Fatalf("Expected the first sequence to be 2 now, but got %d", stats.FirstSeq)
}
// Make sure we can not lookup seq 1.
if _, _, _, err := ms.Lookup(1); err == nil {
t.Fatalf("Expected error looking up seq 1 but got none")
}
}
func TestMemStoreBytesLimit(t *testing.T) {
subj, msg := "foo", make([]byte, 512)
storedMsgSize := memStoreMsgSize(subj, msg)
toStore := uint64(1024)
maxBytes := storedMsgSize * toStore
ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage, MaxBytes: maxBytes})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
for i := uint64(0); i < toStore; i++ {
ms.StoreMsg(subj, msg)
}
stats := ms.Stats()
if stats.Msgs != toStore {
t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs)
}
if stats.Bytes != storedMsgSize*toStore {
t.Fatalf("Expected bytes to be %d, got %d", storedMsgSize*toStore, stats.Bytes)
}
// Now send 10 more and check that bytes limit enforced.
for i := 0; i < 10; i++ {
if _, err := ms.StoreMsg(subj, msg); err != nil {
t.Fatalf("Error storing msg: %v", err)
}
}
stats = ms.Stats()
if stats.Msgs != toStore {
t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs)
}
if stats.Bytes != storedMsgSize*toStore {
t.Fatalf("Expected bytes to be %d, got %d", storedMsgSize*toStore, stats.Bytes)
}
if stats.FirstSeq != 11 {
t.Fatalf("Expected first sequence to be 11, got %d", stats.FirstSeq)
}
if stats.LastSeq != toStore+10 {
t.Fatalf("Expected last sequence to be %d, got %d", toStore+10, stats.LastSeq)
}
}
func TestMemStoreAgeLimit(t *testing.T) {
maxAge := 10 * time.Millisecond
ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage, MaxAge: maxAge})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
// Store some messages. Does not really matter how many.
subj, msg := "foo", []byte("Hello World")
toStore := 100
for i := 0; i < toStore; i++ {
ms.StoreMsg(subj, msg)
}
stats := ms.Stats()
if stats.Msgs != uint64(toStore) {
t.Fatalf("Expected %d msgs, got %d", toStore, stats.Msgs)
}
// Let them expire
time.Sleep(maxAge * 10)
stats = ms.Stats()
if stats.Msgs != 0 {
t.Fatalf("Expected no msgs, got %d", stats.Msgs)
}
if stats.Bytes != 0 {
t.Fatalf("Expected no bytes, got %d", stats.Bytes)
}
}
func TestMemStoreTimeStamps(t *testing.T) {
ms, err := newMemStore(&MsgSetConfig{Storage: MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error creating store: %v", err)
}
last := time.Now().UnixNano()
subj, msg := "foo", []byte("Hello World")
for i := 0; i < 10; i++ {
ms.StoreMsg(subj, msg)
time.Sleep(5 * time.Microsecond)
}
for seq := uint64(1); seq <= 10; seq++ {
_, _, ts, err := ms.Lookup(seq)
if err != nil {
t.Fatalf("Unexpected error looking up msg: %v", err)
}
// These should be different
if ts <= last {
t.Fatalf("Expected different timestamps, got %v", ts)
}
last = ts
}
}

355
server/msgset.go Normal file
View File

@@ -0,0 +1,355 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
)
// MsgSet is a jetstream message set. When we receive a message internally destined
// for a MsgSet we will direct link from the client to this MsgSet structure.
type MsgSet struct {
mu sync.Mutex
sg *sync.Cond
sgw int
jsa *jsAccount
client *client
sid int
sendq chan *jsPubMsg
store MsgSetStore
obs map[string]*Observable
config MsgSetConfig
}
// MsgSetConfig will determine the name, subjects and retention policy
// for a given message set. If subjects is empty the name will be used.
type MsgSetConfig struct {
Name string
Subjects []string
Retention RetentionPolicy
MaxMsgs uint64
MaxBytes uint64
MaxAge time.Duration
Storage StorageType
Replicas int
}
// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int
const (
// StreamPolicy (default) means that messages are retained until any possible given limit is reached.
// This could be any one of MaxMsgs, MaxBytes, or MaxAge.
StreamPolicy RetentionPolicy = iota
// InterestPolicy specifies that when all known subscribers have acknowledged a message it can be removed.
InterestPolicy
// WorkQueuePolicy specifies that when the first subscriber acknowledges the message it can be removed.
WorkQueuePolicy
)
// JetStreamAddMsgSet adds a message set for the given account.
func (s *Server) JetStreamAddMsgSet(a *Account, config *MsgSetConfig) (*MsgSet, error) {
js := s.getJetStream()
if js == nil {
return nil, fmt.Errorf("jetstream not enabled")
}
jsa := js.lookupAccount(a)
if jsa == nil {
return nil, fmt.Errorf("jetstream not enabled for account")
}
// TODO(dlc) - check config for conflicts, e.g replicas > 1 in single server mode.
jsa.mu.Lock()
if _, ok := jsa.msgSets[config.Name]; ok {
jsa.mu.Unlock()
return nil, fmt.Errorf("message set name already taken")
}
c := s.createInternalJetStreamClient()
mset := &MsgSet{jsa: jsa, config: *config, client: c, obs: make(map[string]*Observable)}
mset.sg = sync.NewCond(&mset.mu)
if len(mset.config.Subjects) == 0 {
mset.config.Subjects = append(mset.config.Subjects, mset.config.Name)
}
jsa.msgSets[config.Name] = mset
jsa.mu.Unlock()
// Bind to the account.
c.registerWithAccount(a)
// Create the appropriate storage
if err := mset.setupStore(); err != nil {
mset.delete()
return nil, err
}
// Setup our internal send go routine.
mset.setupSendCapabilities()
// Setup subscriptions
if err := mset.subscribeToMsgSet(); err != nil {
mset.delete()
return nil, err
}
return mset, nil
}
// JetStreamDeleteMsgSet will delete a message set.
func (s *Server) JetStreamDeleteMsgSet(mset *MsgSet) error {
return mset.Delete()
}
// Delete deletes a message set.
func (mset *MsgSet) Delete() error {
mset.mu.Lock()
jsa := mset.jsa
mset.mu.Unlock()
if jsa == nil {
return fmt.Errorf("jetstream not enabled for account")
}
jsa.mu.Lock()
delete(jsa.msgSets, mset.config.Name)
jsa.mu.Unlock()
return mset.delete()
}
func (mset *MsgSet) Purge() error {
return fmt.Errorf("NO IMPL")
}
// Will create internal subscriptions for the msgSet.
// Lock should be held.
func (mset *MsgSet) subscribeToMsgSet() error {
for _, subject := range mset.config.Subjects {
if _, err := mset.subscribeInternal(subject, mset.processInboundJetStreamMsg); err != nil {
return err
}
}
return nil
}
// FIXME(dlc) - This only works in single server mode for the moment. Need to fix as we expand to clusters.
func (mset *MsgSet) subscribeInternal(subject string, cb msgHandler) (*subscription, error) {
return mset.nmsSubscribeInternal(subject, true, cb)
}
func (mset *MsgSet) nmsSubscribeInternal(subject string, internalOnly bool, cb msgHandler) (*subscription, error) {
c := mset.client
if c == nil {
return nil, fmt.Errorf("invalid message set")
}
if !c.srv.eventsEnabled() {
return nil, ErrNoSysAccount
}
if cb == nil {
return nil, fmt.Errorf("undefined message handler")
}
mset.sid++
// Now create the subscription
sub, err := c.processSub([]byte(subject+" "+strconv.Itoa(mset.sid)), internalOnly)
if err != nil {
return nil, err
}
c.mu.Lock()
sub.icb = cb
c.mu.Unlock()
return sub, nil
}
// Lock should be held.
func (mset *MsgSet) unsubscribe(sub *subscription) {
if sub == nil || mset.client == nil {
return
}
mset.client.unsubscribe(mset.client.acc, sub, true, true)
}
func (mset *MsgSet) setupStore() error {
mset.mu.Lock()
defer mset.mu.Unlock()
switch mset.config.Storage {
case MemoryStorage:
ms, err := newMemStore(&mset.config)
if err != nil {
return err
}
mset.store = ms
case DiskStorage:
return fmt.Errorf("NO IMPL")
}
return nil
}
func (mset *MsgSet) processInboundJetStreamMsg(_ *subscription, _ *client, subject, reply string, msg []byte) {
mset.mu.Lock()
store := mset.store
c := mset.client
mset.mu.Unlock()
if c == nil {
return
}
// FIXME(dlc) - Not inline unless memory based.
msg = append(msg[:0:0], msg...)
if _, err := store.StoreMsg(subject, msg); err != nil {
mset.mu.Lock()
s := c.srv
accName := c.acc.Name
name := mset.config.Name
mset.mu.Unlock()
s.Errorf("Jetstream failed to store msg on account: %q message set: %q - %v", accName, name, err)
// TODO(dlc) - Send err here
return
}
// Send Ack here.
if len(reply) > 0 {
mset.sendq <- &jsPubMsg{reply, "", "", []byte(JsOK)}
}
mset.signalObservers()
}
func (mset *MsgSet) signalObservers() {
mset.mu.Lock()
if mset.sgw > 0 {
mset.sg.Broadcast()
}
mset.mu.Unlock()
}
type jsPubMsg struct {
subj string
dsubj string
reply string
msg []byte
}
// TODO(dlc) - Maybe look at onering instead of chan - https://github.com/pltr/onering
const nmsSendQSize = 1024
// This is similar to system semantics but did not want to overload the single system sendq,
// or require system account when doing simple setup with jetstream.
func (mset *MsgSet) setupSendCapabilities() {
mset.mu.Lock()
defer mset.mu.Unlock()
if mset.sendq != nil {
return
}
mset.sendq = make(chan *jsPubMsg, nmsSendQSize)
go mset.internalSendLoop()
}
func (mset *MsgSet) internalSendLoop() {
mset.mu.Lock()
c := mset.client
if c == nil {
mset.mu.Unlock()
return
}
s := c.srv
sendq := mset.sendq
name := mset.config.Name
mset.mu.Unlock()
// Warn when internal send queue is backed up past 75%
warnThresh := 3 * nmsSendQSize / 4
warnFreq := time.Second
last := time.Now().Add(-warnFreq)
for {
if len(sendq) > warnThresh && time.Since(last) >= warnFreq {
s.Warnf("Jetstream internal send queue > 75% for account: %q message set: %q", c.acc.Name, name)
last = time.Now()
}
select {
case pm := <-sendq:
if pm == nil {
return
}
c.pa.subject = []byte(pm.subj)
c.pa.deliver = []byte(pm.dsubj)
c.pa.size = len(pm.msg)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.reply)
msg := append(pm.msg, _CRLF_...)
c.processInboundClientMsg(msg)
c.pa.szb = nil
c.flushClients(0)
case <-s.quitCh:
return
}
}
}
func (mset *MsgSet) delete() error {
var obs []*Observable
mset.mu.Lock()
if mset.sendq != nil {
mset.sendq <- nil
}
c := mset.client
mset.client = nil
for _, o := range mset.obs {
obs = append(obs, o)
}
mset.mu.Unlock()
c.closeConnection(ClientClosed)
for _, o := range obs {
o.Delete()
}
// TODO(dlc) - Clean up any storage, memory and disk
return nil
}
// Returns a name that can be used as a single token for subscriptions.
// Really only need to replace token separators.
// Lock should be held
func (mset *MsgSet) cleanName() string {
return strings.Replace(mset.config.Name, tsep, "-", -1)
}
// Stats will return the current stats for this message set.
func (mset *MsgSet) Stats() MsgSetStats {
// Currently rely on store.
// TODO(dlc) - This will need to change with clusters.
return mset.store.Stats()
}
func (mset *MsgSet) waitForMsgs() {
mset.mu.Lock()
defer mset.mu.Unlock()
if mset.client == nil {
return
}
mset.sgw++
mset.sg.Wait()
mset.sgw--
}

290
server/observable.go Normal file
View File

@@ -0,0 +1,290 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"crypto/rand"
"crypto/sha256"
"fmt"
"sync"
"sync/atomic"
"time"
)
type ObservableConfig struct {
Delivery string `json:"delivery_subject"`
Durable string `json:"durable_name,omitempty"`
StartSeq uint64 `json:"start_seq,omitempty"`
StartTime time.Time `json:"start_time,omitempty"`
DeliverAll bool `json:"deliver_all,omitempty"`
DeliverLast bool `json:"deliver_last,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
}
// AckPolicy determines how the observable shoulc acknowledge delivered messages.
type AckPolicy int
const (
// AckExplicit requires ack or nack for all messages.
AckExplicit AckPolicy = iota
// When acking a sequence number, this implicitly acks all sequences below this one as well.
AckAll
// AckNone requires no acks for delivered messages.
AckNone
)
// Observable is a jetstream observable/subscriber.
type Observable struct {
mu sync.Mutex
name string
mset *MsgSet
seq uint64
dsubj string
ackSub *subscription
ackReply string
config ObservableConfig
}
func (mset *MsgSet) AddObservable(config *ObservableConfig) (*Observable, error) {
if config == nil {
return nil, fmt.Errorf("observable config required")
}
// For now expect a literal subject that is not empty.
// FIXME(dlc) - Empty == Worker mode
if config.Delivery == "" {
return nil, fmt.Errorf("observable delivery subject is empty")
}
if !subjectIsLiteral(config.Delivery) {
return nil, fmt.Errorf("observable delivery subject has wildcards")
}
if mset.deliveryFormsCycle(config.Delivery) {
return nil, fmt.Errorf("observable delivery subject forms a cycle")
}
// Check on start position conflicts.
noTime := time.Time{}
if config.StartSeq > 0 && (config.StartTime != noTime || config.DeliverAll || config.DeliverLast) {
return nil, fmt.Errorf("observable starting position conflict")
} else if config.StartTime != noTime && (config.DeliverAll || config.DeliverLast) {
return nil, fmt.Errorf("observable starting position conflict")
} else if config.DeliverAll && config.DeliverLast {
return nil, fmt.Errorf("observable starting position conflict")
}
// Check if we are not durable that the delivery subject has interest.
if config.Durable == "" {
if mset.noInterest(config.Delivery) {
return nil, fmt.Errorf("observable requires interest for delivery subject when ephemeral")
}
}
// Set name, which will be durable name if set, otherwise we create one at random.
o := &Observable{mset: mset, config: *config, dsubj: config.Delivery}
if isDurableObservable(config) {
o.name = config.Durable
} else {
o.name = createObservableName()
}
// Select starting sequence number
o.selectStartingSeqNo()
// Now register with mset and create ack subscription.
mset.mu.Lock()
c := mset.client
if c == nil {
mset.mu.Unlock()
return nil, fmt.Errorf("message set not valid")
}
s, a := c.srv, c.acc
if _, ok := mset.obs[o.name]; ok {
mset.mu.Unlock()
return nil, fmt.Errorf("observable already exists")
}
// Set up the ack subscription for this observable. Will use wildcard for all acks.
// We will remember the template to generate replaies with sequence numbers and use
// that to scanf them back in.
cn := mset.cleanName()
o.ackReply = fmt.Sprintf("%s.%s.%s.%%d", JsAckPre, cn, o.name)
ackSubj := fmt.Sprintf("%s.%s.%s.*", JsAckPre, cn, o.name)
if sub, err := mset.subscribeInternal(ackSubj, o.processObservableAck); err != nil {
return nil, err
} else {
o.ackSub = sub
}
mset.obs[o.name] = o
mset.mu.Unlock()
// Now start up Go routine to deliver msgs.
go o.loopAndDeliverMsgs(s, a)
return o, nil
}
func (o *Observable) msgSet() *MsgSet {
o.mu.Lock()
mset := o.mset
o.mu.Unlock()
return mset
}
func (o *Observable) processObservableAck(_ *subscription, _ *client, subject, _ string, msg []byte) {
// No-op for now.
}
func (o *Observable) loopAndDeliverMsgs(s *Server, a *Account) {
var mset *MsgSet
for {
// observable is closed when mset is set to nil.
if mset = o.msgSet(); mset == nil {
return
}
// Deliver all the msgs we have now, once done or on a condition, we wait.
for {
seq := atomic.LoadUint64(&o.seq)
subj, msg, _, err := mset.store.Lookup(seq)
if err == nil {
atomic.AddUint64(&o.seq, 1)
o.deliverMsg(mset, subj, msg, seq)
} else if err != ErrStoreMsgNotFound {
s.Warnf("Jetstream internal storage error on lookup: %v", err)
return
} else {
break
}
}
mset.waitForMsgs()
}
}
// Deliver a msg to the observable.
func (o *Observable) deliverMsg(mset *MsgSet, subj string, msg []byte, seq uint64) {
mset.sendq <- &jsPubMsg{o.dsubj, subj, fmt.Sprintf(o.ackReply, seq), msg}
}
// SeqFromReply will extract a sequence number from a reply ack subject.
func (o *Observable) SeqFromReply(reply string) (seq uint64) {
n, err := fmt.Sscanf(reply, o.ackReply, &seq)
if err != nil || n != 1 {
return 0
}
return
}
// Will select the starting sequence.
func (o *Observable) selectStartingSeqNo() {
stats := o.mset.Stats()
noTime := time.Time{}
if o.config.StartSeq == 0 {
if o.config.DeliverAll {
o.seq = stats.FirstSeq
} else if o.config.DeliverLast {
o.seq = stats.LastSeq
} else if o.config.StartTime != noTime {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.seq = o.mset.store.GetSeqFromTime(o.config.StartTime)
} else {
// Default is deliver new only.
o.seq = stats.LastSeq + 1
}
} else {
o.seq = o.config.StartSeq
}
if o.seq < stats.FirstSeq {
o.seq = stats.FirstSeq
} else if o.seq > stats.LastSeq {
o.seq = stats.LastSeq + 1
}
}
// Test whether a config represents a durable subscriber.
func isDurableObservable(config *ObservableConfig) bool {
return config != nil && config.Durable != _EMPTY_
}
const randObservableNameLen = 6
func createObservableName() string {
var b [64]byte
rand.Read(b[:])
sha := sha256.New()
sha.Write(b[:])
return fmt.Sprintf("%x", sha.Sum(nil))[:randObservableNameLen]
}
// DeleteObservable will delete the observable from this message set.
func (mset *MsgSet) DeleteObservable(o *Observable) error {
return o.Delete()
}
// Delete will delete the observable for the associated message set.
func (o *Observable) Delete() error {
o.mu.Lock()
// TODO(dlc) - Do cleanup here.
mset := o.mset
o.mset = nil
ackSub := o.ackSub
o.ackSub = nil
o.mu.Unlock()
if mset == nil {
return nil
}
mset.mu.Lock()
// Break us out of the readLoop.
// TODO(dlc) - Should not be bad for small amounts of observables, maybe
// even into thousands. Above that should check what this might do
// performance wise.
mset.sg.Broadcast()
mset.unsubscribe(ackSub)
delete(mset.obs, o.name)
mset.mu.Unlock()
return nil
}
// Checks to see if there is registered interest in the delivery subject.
// Note that since we require delivery to be a literal this is just like
// a publish match.
//
// TODO(dlc) - if gateways are enabled we need to do some more digging for the
// real answer.
func (mset *MsgSet) noInterest(delivery string) bool {
var acc *Account
mset.mu.Lock()
if mset.client != nil {
acc = mset.client.acc
}
mset.mu.Unlock()
if acc == nil {
return true
}
r := acc.sl.Match(delivery)
return len(r.psubs)+len(r.qsubs) == 0
}
func (mset *MsgSet) deliveryFormsCycle(deliverySubject string) bool {
mset.mu.Lock()
defer mset.mu.Unlock()
for _, subject := range mset.config.Subjects {
if subjectIsSubsetMatch(deliverySubject, subject) {
return true
}
}
return false
}

View File

@@ -183,6 +183,7 @@ type Options struct {
Cluster ClusterOpts `json:"cluster,omitempty"`
Gateway GatewayOpts `json:"gateway,omitempty"`
LeafNode LeafNodeOpts `json:"leaf,omitempty"`
JetStream bool `json:"jetstream"`
ProfPort int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
@@ -203,6 +204,7 @@ type Options struct {
WriteDeadline time.Duration `json:"-"`
MaxClosedClients int `json:"-"`
LameDuckDuration time.Duration `json:"-"`
// MaxTracedMsgLen is the maximum printable length for traced messages.
MaxTracedMsgLen int `json:"-"`
@@ -3117,34 +3119,35 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&configFile, "c", "", "Configuration file.")
fs.StringVar(&configFile, "config", "", "Configuration file.")
fs.BoolVar(&opts.CheckConfig, "t", false, "Check configuration and exit.")
fs.StringVar(&signal, "sl", "", "Send signal to nats-server process (stop, quit, reopen, reload)")
fs.StringVar(&signal, "signal", "", "Send signal to nats-server process (stop, quit, reopen, reload)")
fs.StringVar(&signal, "sl", "", "Send signal to nats-server process (stop, quit, reopen, reload).")
fs.StringVar(&signal, "signal", "", "Send signal to nats-server process (stop, quit, reopen, reload).")
fs.StringVar(&opts.PidFile, "P", "", "File to store process pid.")
fs.StringVar(&opts.PidFile, "pid", "", "File to store process pid.")
fs.StringVar(&opts.PortsFileDir, "ports_file_dir", "", "Creates a ports file in the specified directory (<executable_name>_<pid>.ports)")
fs.StringVar(&opts.PortsFileDir, "ports_file_dir", "", "Creates a ports file in the specified directory (<executable_name>_<pid>.ports).")
fs.StringVar(&opts.LogFile, "l", "", "File to store logging output.")
fs.StringVar(&opts.LogFile, "log", "", "File to store logging output.")
fs.Int64Var(&opts.LogSizeLimit, "log_size_limit", 0, "Logfile size limit being auto-rotated")
fs.BoolVar(&opts.Syslog, "s", false, "Enable syslog as log method.")
fs.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method..")
fs.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method.")
fs.StringVar(&opts.RemoteSyslog, "r", "", "Syslog server addr (udp://127.0.0.1:514).")
fs.StringVar(&opts.RemoteSyslog, "remote_syslog", "", "Syslog server addr (udp://127.0.0.1:514).")
fs.BoolVar(&showVersion, "version", false, "Print version information.")
fs.BoolVar(&showVersion, "v", false, "Print version information.")
fs.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port")
fs.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port.")
fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.Advertise, "cluster_advertise", "", "Cluster URL to advertise to other servers.")
fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.")
fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries")
fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries.")
fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.")
fs.BoolVar(&opts.TLS, "tls", false, "Enable TLS.")
fs.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.")
fs.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.")
fs.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.")
fs.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.")
fs.IntVar(&opts.MaxTracedMsgLen, "max_traced_msg_len", 0, "Maximum printable length for traced messages. 0 for unlimited")
fs.IntVar(&opts.MaxTracedMsgLen, "max_traced_msg_len", 0, "Maximum printable length for traced messages. 0 for unlimited.")
fs.BoolVar(&opts.JetStream, "js", false, "Enable JetStream.")
// The flags definition above set "default" values to some of the options.
// Calling Parse() here will override the default options with any value

View File

@@ -22,6 +22,7 @@ type pubArg struct {
pacache []byte
account []byte
subject []byte
deliver []byte
reply []byte
szb []byte
queues [][]byte

View File

@@ -71,6 +71,7 @@ type Info struct {
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
MaxPayload int32 `json:"max_payload"`
JetStream bool `json:"jetstream,omitempty"`
IP string `json:"ip,omitempty"`
CID uint64 `json:"client_id,omitempty"`
ClientIP string `json:"client_ip,omitempty"`
@@ -110,6 +111,7 @@ type Server struct {
listener net.Listener
gacc *Account
sys *internal
js *jetStream
accounts sync.Map
tmpAccounts sync.Map // Temporarily stores accounts that are being built
activeAccounts int32
@@ -185,7 +187,7 @@ type Server struct {
// Trusted public operator keys.
trustedKeys []string
// We use this to minimize mem copies for request to monitoring
// We use this to minimize mem copies for requests to monitoring
// endpoint /varz (when it comes from http).
varzMu sync.Mutex
varz *Varz
@@ -264,9 +266,11 @@ func NewServer(opts *Options) (*Server, error) {
TLSRequired: tlsReq,
TLSVerify: verify,
MaxPayload: opts.MaxPayload,
JetStream: opts.JetStream,
}
now := time.Now()
s := &Server{
kp: kp,
configFile: opts.ConfigFile,
@@ -594,6 +598,24 @@ func (s *Server) generateRouteInfoJSON() {
s.routeInfoJSON = bytes.Join(pcs, []byte(" "))
}
// Determines if we are in operator mode.
// Uses opts instead of server lock.
func (s *Server) inOperatorMode() bool {
return len(s.getOpts().TrustedOperators) > 0
}
// Determines if we are in pre NATS 2.0 setup with no accounts.
// Uses opts instead of server lock.
func (s *Server) globalAccountOnly() bool {
return len(s.getOpts().Accounts) == 0
}
// Determines if this server is in standalone mode, meaning no routes or gateways or leafnodes.
func (s *Server) standAloneMode() bool {
opts := s.getOpts()
return opts.Cluster.Port == 0 && opts.LeafNode.Port == 0 && opts.Gateway.Port == 0
}
// isTrustedIssuer will check that the issuer is a trusted public key.
// This is used to make sure an account was signed by a trusted operator.
func (s *Server) isTrustedIssuer(issuer string) bool {
@@ -809,8 +831,25 @@ func (s *Server) SystemAccount() *Account {
return sacc
}
// GlobalAccount returns the global account.
// Default clients will use the global account.
func (s *Server) GlobalAccount() *Account {
s.mu.Lock()
defer s.mu.Unlock()
return s.gacc
}
// SetDefaultSystemAccount will create a default system account if one is not present.
func (s *Server) SetDefaultSystemAccount() error {
if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
return nil
}
s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
}
// For internal sends.
const internalSendQLen = 4096
const internalSendQLen = 8192
// Assign a system account. Should only be called once.
// This sets up a server to send and receive messages from
@@ -847,11 +886,10 @@ func (s *Server) setSystemAccount(acc *Account) error {
now := time.Now()
s.sys = &internal{
account: acc,
client: &client{srv: s, kind: SYSTEM, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now},
client: s.createInternalSystemClient(),
seq: 1,
sid: 1,
servers: make(map[string]*serverUpdate),
subs: make(map[string]msgHandler),
replies: make(map[string]msgHandler),
sendq: make(chan *pubMsg, internalSendQLen),
resetCh: make(chan struct{}),
@@ -859,8 +897,6 @@ func (s *Server) setSystemAccount(acc *Account) error {
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
}
s.sys.client.initClient()
s.sys.client.echo = false
s.sys.wg.Add(1)
s.mu.Unlock()
@@ -894,6 +930,39 @@ func (s *Server) setSystemAccount(acc *Account) error {
return nil
}
func (s *Server) systemAccount() *Account {
var sacc *Account
s.mu.Lock()
if s.sys != nil {
sacc = s.sys.account
}
s.mu.Unlock()
return sacc
}
// Creates an internal system client.
func (s *Server) createInternalSystemClient() *client {
return s.createInternalClient(SYSTEM)
}
// Creates and internal jetstream client.
func (s *Server) createInternalJetStreamClient() *client {
return s.createInternalClient(JETSTREAM)
}
// Internal clients. kind should be SYSTEM or JETSTREAM
func (s *Server) createInternalClient(kind int) *client {
if kind != SYSTEM && kind != JETSTREAM {
return nil
}
now := time.Now()
c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
c.initClient()
c.echo = false
c.flags.set(noReconnect)
return c
}
// Determine if accounts should track subscriptions for
// efficient propagation.
// Lock should be held on entry.
@@ -1167,12 +1236,6 @@ func (s *Server) Start() {
}
}
// Start monitoring if needed
if err := s.StartMonitoring(); err != nil {
s.Fatalf("Can't start monitoring: %v", err)
return
}
// Setup system account which will start the eventing stack.
if sa := opts.SystemAccount; sa != _EMPTY_ {
if err := s.SetSystemAccount(sa); err != nil {
@@ -1185,6 +1248,22 @@ func (s *Server) Start() {
// this server is configured with gateway or not.
s.startGWReplyMapExpiration()
// Check if JetStream has been enabled. This needs to be after
// the system account setup above. JetStream will create its
// own system account if one is not present.
if opts.JetStream {
if err := s.EnableJetStream(nil); err != nil {
s.Fatalf("Can't start jetstream: %v", err)
return
}
}
// Start monitoring if needed
if err := s.StartMonitoring(); err != nil {
s.Fatalf("Can't start monitoring: %v", err)
return
}
// Start up gateway if needed. Do this before starting the routes, because
// we want to resolve the gateway host:port so that this information can
// be sent to other routes.
@@ -2270,7 +2349,7 @@ func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []str
ip = nil
continue
}
s.Debugf(" ip=%s", ipStr)
s.Debugf(" ip=%s", ipStr)
ips = append(ips, ipStr)
if !all {
break

48
server/store.go Normal file
View File

@@ -0,0 +1,48 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"errors"
"time"
)
// StorageType determines how messages are stored for retention.
type StorageType int
const (
// Memory specifies in memory only.
MemoryStorage StorageType = iota
// Disk specifies on disk, designated by the JetStream config StoreDir.
DiskStorage
)
type MsgSetStore interface {
StoreMsg(subj string, msg []byte) (uint64, error)
Lookup(seq uint64) (subj string, msg []byte, ts int64, err error)
GetSeqFromTime(t time.Time) uint64
Stats() MsgSetStats
}
// MsgSetStats are stats about this given message set.
type MsgSetStats struct {
Msgs uint64
Bytes uint64
FirstSeq uint64
LastSeq uint64
}
var (
ErrStoreMsgNotFound = errors.New("no message found")
)

20
server/sysmem/mem_bsd.go Normal file
View File

@@ -0,0 +1,20 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build freebsd openbsd dragonfly netbsd
package sysmem
func Memory() int64 {
return sysctlInt64("hw.physmem")
}

View File

@@ -0,0 +1,20 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build darwin
package sysmem
func Memory() int64 {
return sysctlInt64("hw.memsize")
}

View File

@@ -0,0 +1,27 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build linux
package sysmem
import "syscall"
func Memory() int64 {
var info syscall.Sysinfo_t
err := syscall.Sysinfo(&info)
if err != nil {
return 0
}
return int64(info.Totalram) * int64(info.Unit)
}

View File

@@ -0,0 +1,44 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
import (
"syscall"
"unsafe"
)
// https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/ns-sysinfoapi-memorystatusex
type _memoryStatusEx struct {
dwLength uint32
dwMemoryLoad uint32
ullTotalPhys uint64
unused [6]uint64 // ignore rest of struct
}
func Memory() int64 {
k32, err := syscall.LoadDLL("kernel32.dll")
if err != nil {
return 0
}
globalMemoryStatusEx, err := kernel32.FindProc("GlobalMemoryStatusEx")
if err != nil {
return 0
}
msx := &_memoryStatusEx{dwLength: 64}
res, _, _ := globalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msx)))
if res == 0 {
return 0
}
return msx.ullTotalPhys
}

31
server/sysmem/sysctl.go Normal file
View File

@@ -0,0 +1,31 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build darwin freebsd openbsd dragonfly netbsd
package sysmem
import (
"syscall"
"unsafe"
)
func sysctlInt64(name string) int64 {
s, err := syscall.Sysctl(name)
if err != nil {
return 0
}
// hack because the string conversion above drops a \0
b := []byte(s)
return *(*int64)(unsafe.Pointer(&b[0]))
}

View File

@@ -25,10 +25,12 @@ import (
"math/rand"
"net"
"net/url"
"runtime"
"testing"
"time"
"github.com/nats-io/nats-server/v2/server"
nats "github.com/nats-io/nats.go"
)
const PERF_PORT = 8422
@@ -1394,3 +1396,115 @@ func Benchmark_____RoutedIntGraph(b *testing.B) {
}
b.StopTimer()
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
func Benchmark_JetStreamPubWithAck(b *testing.B) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo"})
if err != nil {
b.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc, err := nats.Connect(s.ClientURL())
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
nc.Request("foo", []byte("Hello World!"), 50*time.Millisecond)
}
b.StopTimer()
stats := mset.Stats()
if int(stats.Msgs) != b.N {
b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs)
}
}
func Benchmark_JetStreamPubNoAck(b *testing.B) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo"})
if err != nil {
b.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc, err := nats.Connect(s.ClientURL())
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := nc.Publish("foo", []byte("Hello World!")); err != nil {
b.Fatalf("Unexpected error: %v", err)
}
}
nc.Flush()
b.StopTimer()
stats := mset.Stats()
if int(stats.Msgs) != b.N {
b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs)
}
}
func Benchmark_JetStreamPubAsyncAck(b *testing.B) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo"})
if err != nil {
b.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc, err := nats.Connect(s.ClientURL(), nats.NoReconnect())
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
defer nc.Close()
// Put ack stream on its own connection.
anc, err := nats.Connect(s.ClientURL())
if err != nil {
b.Fatalf("Failed to create client: %v", err)
}
defer anc.Close()
acks := nats.NewInbox()
sub, _ := anc.Subscribe(acks, func(m *nats.Msg) {
// Just eat them for this test.
})
// set max pending to unlimited.
sub.SetPendingLimits(-1, -1)
defer sub.Unsubscribe()
anc.Flush()
runtime.GC()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := nc.PublishRequest("foo", acks, []byte("Hello World!")); err != nil {
b.Fatalf("[%d] Unexpected error: %v", i, err)
}
}
nc.Flush()
b.StopTimer()
stats := mset.Stats()
if int(stats.Msgs) != b.N {
b.Fatalf("Expected %d messages, got %d", b.N, stats.Msgs)
}
}

426
test/jetstream_test.go Normal file
View File

@@ -0,0 +1,426 @@
// Copyright 2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"fmt"
"os"
"path/filepath"
"testing"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/server/sysmem"
"github.com/nats-io/nats.go"
)
func TestJetStreamBasicNilConfig(t *testing.T) {
s := RunRandClientPortServer()
defer s.Shutdown()
if err := s.EnableJetStream(nil); err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if !s.JetStreamEnabled() {
t.Fatalf("Expected JetStream to be enabled")
}
if s.SystemAccount() == nil {
t.Fatalf("Expected system account to be created automatically")
}
// Grab our config since it was dynamically generated.
config := s.JetStreamConfig()
if config == nil {
t.Fatalf("Expected non-nil config")
}
// Check dynamic max memory.
hwMem := sysmem.Memory()
if hwMem != 0 {
// Make sure its about 75%
est := hwMem / 4 * 3
if config.MaxMemory != est {
t.Fatalf("Expected memory to be 80 percent of system memory, got %v vs %v", config.MaxMemory, est)
}
}
// Check store path, should be tmpdir.
expectedDir := filepath.Join(os.TempDir(), server.JetStreamStoreDir)
if config.StoreDir != expectedDir {
t.Fatalf("Expected storage directory of %q, but got %q", expectedDir, config.StoreDir)
}
// Make sure it was created.
stat, err := os.Stat(config.StoreDir)
if err != nil {
t.Fatalf("Expected the store directory to be present, %v", err)
}
if stat == nil || !stat.IsDir() {
t.Fatalf("Expected a directory")
}
}
func RunBasicJetStreamServer() *server.Server {
opts := DefaultTestOptions
opts.Port = -1
opts.JetStream = true
return RunServer(&opts)
}
func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn {
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Failed to create client: %v", err)
}
return nc
}
func TestJetStreamEnableAndDisableAccount(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
// Global in simple setup should be enabled already.
if !s.GlobalAccount().JetStreamEnabled() {
t.Fatalf("Expected to have jetstream enabled on global account")
}
if na := s.JetStreamNumAccounts(); na != 1 {
t.Fatalf("Expected 1 account, got %d", na)
}
acc, _ := s.LookupOrRegisterAccount("$FOO")
if err := s.JetStreamEnableAccount(acc, server.JetStreamAccountLimitsNoLimits); err != nil {
t.Fatalf("Did not expect error on enabling account: %v", err)
}
if na := s.JetStreamNumAccounts(); na != 2 {
t.Fatalf("Expected 2 accounts, got %d", na)
}
if err := s.JetStreamDisableAccount(acc); err != nil {
t.Fatalf("Did not expect error on disabling account: %v", err)
}
if na := s.JetStreamNumAccounts(); na != 1 {
t.Fatalf("Expected 1 account, got %d", na)
}
// We should get error if disabling something not enabled.
acc, _ = s.LookupOrRegisterAccount("$BAR")
if err := s.JetStreamDisableAccount(acc); err == nil {
t.Fatalf("Expected error on disabling account that was not enabled")
}
// Should get an error for trying to enable a non-registered account.
acc = server.NewAccount("$BAZ")
if err := s.JetStreamEnableAccount(acc, server.JetStreamAccountLimitsNoLimits); err == nil {
t.Fatalf("Expected error on enabling account that was not registered")
}
if err := s.JetStreamDisableAccount(s.GlobalAccount()); err != nil {
t.Fatalf("Did not expect error on disabling account: %v", err)
}
if na := s.JetStreamNumAccounts(); na != 0 {
t.Fatalf("Expected no accounts, got %d", na)
}
}
func TestJetStreamAddMsgSet(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mconfig := &server.MsgSetConfig{
Name: "foo",
Retention: server.StreamPolicy,
MaxAge: time.Hour,
Storage: server.MemoryStorage,
Replicas: 1,
}
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), mconfig)
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
nc := clientConnectToServer(t, s)
defer nc.Close()
nc.Publish("foo", []byte("Hello World!"))
nc.Flush()
stats := mset.Stats()
if stats.Msgs != 1 {
t.Fatalf("Expected 1 message, got %d", stats.Msgs)
}
if stats.Bytes == 0 {
t.Fatalf("Expected non-zero bytes")
}
nc.Publish("foo", []byte("Hello World Again!"))
nc.Flush()
stats = mset.Stats()
if stats.Msgs != 2 {
t.Fatalf("Expected 2 messages, got %d", stats.Msgs)
}
if err := s.JetStreamDeleteMsgSet(mset); err != nil {
t.Fatalf("Got an error deleting the message set: %v", err)
}
}
func expectOKResponse(t *testing.T, m *nats.Msg) {
t.Helper()
if m == nil {
t.Fatalf("No response, possible timeout?")
}
if string(m.Data) != server.JsOK {
t.Fatalf("Expected a JetStreamPubAck, got %q", m.Data)
}
}
func TestJetStreamBasicAckPublish(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo.*"})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc := clientConnectToServer(t, s)
defer nc.Close()
for i := 0; i < 50; i++ {
resp, _ := nc.Request("foo.bar", []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
stats := mset.Stats()
if stats.Msgs != 50 {
t.Fatalf("Expected 50 messages, got %d", stats.Msgs)
}
}
func TestJetStreamRequestEnabled(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
nc := clientConnectToServer(t, s)
defer nc.Close()
resp, _ := nc.Request(server.JsEnabled, nil, time.Second)
expectOKResponse(t, resp)
}
func TestJetStreamCreateObservable(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "foo", Subjects: []string{"foo", "bar"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
// Check for basic errors.
if _, err := mset.AddObservable(nil); err == nil {
t.Fatalf("Expected an error for no config")
}
// Check for delivery subject errors.
// Empty delivery subject
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: ""}); err == nil {
t.Fatalf("Expected an error on empty delivery subject")
}
// No literal delivery subject allowed.
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "foo.*"}); err == nil {
t.Fatalf("Expected an error on bad delivery subject")
}
// Check for cycles
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "foo"}); err == nil {
t.Fatalf("Expected an error on delivery subject that forms a cycle")
}
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "bar"}); err == nil {
t.Fatalf("Expected an error on delivery subject that forms a cycle")
}
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: "*"}); err == nil {
t.Fatalf("Expected an error on delivery subject that forms a cycle")
}
// StartPosition conflicts
if _, err := mset.AddObservable(&server.ObservableConfig{
Delivery: "A",
StartSeq: 1,
StartTime: time.Now(),
}); err == nil {
t.Fatalf("Expected an error on start position conflicts")
}
if _, err := mset.AddObservable(&server.ObservableConfig{
Delivery: "A",
StartTime: time.Now(),
DeliverAll: true,
}); err == nil {
t.Fatalf("Expected an error on start position conflicts")
}
if _, err := mset.AddObservable(&server.ObservableConfig{
Delivery: "A",
DeliverAll: true,
DeliverLast: true,
}); err == nil {
t.Fatalf("Expected an error on start position conflicts")
}
// Non-Durables need to have subscription to delivery subject.
delivery := nats.NewInbox()
if _, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery}); err == nil {
t.Fatalf("Expected an error on unsubscribed delivery subject")
}
// This should work..
nc := clientConnectToServer(t, s)
defer nc.Close()
sub, _ := nc.SubscribeSync(delivery)
defer sub.Unsubscribe()
nc.Flush()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
if err := mset.DeleteObservable(o); err != nil {
t.Fatalf("Expected no error on delete, got %v", err)
}
}
func TestJetStreamBasicDelivery(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.JetStreamAddMsgSet(s.GlobalAccount(), &server.MsgSetConfig{Name: "MSET", Subjects: []string{"foo.*"}})
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer s.JetStreamDeleteMsgSet(mset)
nc := clientConnectToServer(t, s)
defer nc.Close()
toSend := 100
sendSubj := "foo.bar"
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
stats := mset.Stats()
if stats.Msgs != uint64(toSend) {
t.Fatalf("Expected %d messages, got %d", toSend, stats.Msgs)
}
// Now create an observable. Use different connection.
nc2 := clientConnectToServer(t, s)
defer nc2.Close()
delivery := nats.NewInbox()
sub, _ := nc2.SubscribeSync(delivery)
defer sub.Unsubscribe()
nc2.Flush()
o, err := mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverAll: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
// Check for our messages.
checkMsgs := func(seqOff int) {
t.Helper()
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != toSend {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
}
return nil
})
// Now let's check the messages
for i := 0; i < toSend; i++ {
m, _ := sub.NextMsg(time.Millisecond)
// JetStream will have the subject match the stream subject, not delivery subject.
if m.Subject != sendSubj {
t.Fatalf("Expected original subject of %q, but got %q", sendSubj, m.Subject)
}
// Now check that reply subject exists and has a sequence as the last token.
if seq := o.SeqFromReply(m.Reply); seq != uint64(i+seqOff) {
t.Fatalf("Expected sequence of %d , got %d", i+seqOff, seq)
}
// Ack the message here.
m.Respond(nil)
}
}
checkMsgs(1)
// Now send more and make sure delivery picks back up.
for i := 0; i < toSend; i++ {
resp, _ := nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond)
expectOKResponse(t, resp)
}
stats = mset.Stats()
if stats.Msgs != uint64(toSend*2) {
t.Fatalf("Expected %d messages, got %d", toSend*2, stats.Msgs)
}
checkMsgs(101)
checkSubEmpty := func() {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 0 {
t.Fatalf("Expected sub to have no pending")
}
}
checkSubEmpty()
o.Delete()
// Now check for deliver last, deliver new and deliver by seq.
o, err = mset.AddObservable(&server.ObservableConfig{Delivery: delivery, DeliverLast: true})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 1 {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend)
}
return nil
})
m, _ := sub.NextMsg(time.Millisecond)
if seq := o.SeqFromReply(m.Reply); seq != 200 {
t.Fatalf("Expected sequence to be 200, but got %d", seq)
}
checkSubEmpty()
o.Delete()
o, err = mset.AddObservable(&server.ObservableConfig{Delivery: delivery}) // Default is deliver new only.
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
if m, err := sub.NextMsg(time.Millisecond); err == nil {
t.Fatalf("Expected no msg, got %+v", m)
}
checkSubEmpty()
o.Delete()
// Now try by sequence number.
o, err = mset.AddObservable(&server.ObservableConfig{Delivery: delivery, StartSeq: 101})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
checkMsgs(101)
}

View File

@@ -50,6 +50,12 @@ func RunDefaultServer() *server.Server {
return RunServer(&DefaultTestOptions)
}
func RunRandClientPortServer() *server.Server {
opts := DefaultTestOptions
opts.Port = -1
return RunServer(&opts)
}
// To turn on server tracing and debugging and logging which are
// normally suppressed.
var (