mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Pulled out K/V support for now
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -139,12 +139,6 @@ const (
|
||||
// JetStreamMsgBySeqPre is the prefix for direct requests for a message by message set sequence number.
|
||||
JetStreamMsgBySeqPre = "$JS.BYSEQ"
|
||||
|
||||
// JetStreamLastMsg is used for direct access to the last message of a message set.
|
||||
JetStreamLastMsg = "$JS.%s.LAST"
|
||||
|
||||
// JetStreamUpdateMsgWithRevision is used to update a message set with a new message iff revision/sequence matches last one.
|
||||
JetStreamUpdateMsgWithRevision = "$JS.%s.UPDATE"
|
||||
|
||||
// JetStreamObservableAckSamplePre is the prefix for sample messages from observables
|
||||
JetStreamObservableAckSamplePre = "$JS.OBSERVABLE.ACKSAMPLE"
|
||||
)
|
||||
|
||||
@@ -264,17 +264,6 @@ func (mset *MsgSet) subscribeToMsgSet() error {
|
||||
if _, err := mset.subscribeInternal(subj, mset.processMsgBySeq); err != nil {
|
||||
return err
|
||||
}
|
||||
// For direct access to the last message.
|
||||
subj = fmt.Sprintf(JetStreamLastMsg, mset.config.Name)
|
||||
if _, err := mset.subscribeInternal(subj, mset.processGetLastMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
// This is to update and add a new message iff the last revision/sequence number matches. The
|
||||
// payload here should be a StoredMsg.
|
||||
subj = fmt.Sprintf(JetStreamUpdateMsgWithRevision, mset.config.Name)
|
||||
if _, err := mset.subscribeInternal(subj, mset.processUpdateLastMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -340,56 +329,6 @@ func (mset *MsgSet) setupStore(storeDir string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// processUpdateLastMsg will add a new message to a message set iff the sequence is the last sequence.
|
||||
func (mset *MsgSet) processUpdateLastMsg(_ *subscription, c *client, subject, reply string, msg []byte) {
|
||||
var sm StoredMsg
|
||||
if err := json.Unmarshal(msg, &sm); err != nil {
|
||||
c.Debugf(" Error unmarshalling update for last message: %v", err)
|
||||
response := []byte("-ERR 'could not unmarshal update request'")
|
||||
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
|
||||
return
|
||||
}
|
||||
mset.mu.Lock()
|
||||
last := mset.store.Stats().LastSeq
|
||||
mset.mu.Unlock()
|
||||
|
||||
if last != sm.Sequence {
|
||||
response := []byte("-ERR 'revision did not match last'")
|
||||
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
|
||||
return
|
||||
}
|
||||
// FIXME(dlc) - This is not guaranteed. Need to rework.
|
||||
mset.processInboundJetStreamMsg(nil, c, sm.Subject, reply, sm.Data)
|
||||
}
|
||||
|
||||
// processGetLastMsg will return the last message, or an -ERR if not found.
|
||||
func (mset *MsgSet) processGetLastMsg(_ *subscription, _ *client, subject, reply string, _ []byte) {
|
||||
mset.mu.Lock()
|
||||
store := mset.store
|
||||
c := mset.client
|
||||
name := mset.config.Name
|
||||
mset.mu.Unlock()
|
||||
|
||||
var response []byte
|
||||
|
||||
seq := store.Stats().LastSeq
|
||||
subj, msg, ts, err := store.LoadMsg(seq)
|
||||
if err != nil {
|
||||
c.Debugf("JetStream request for last message: %q - %q - %d error %v", c.acc.Name, name, seq, err)
|
||||
response = []byte("-ERR 'could not load last message from storage'")
|
||||
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
|
||||
return
|
||||
}
|
||||
sm := &StoredMsg{
|
||||
Subject: subj,
|
||||
Sequence: seq,
|
||||
Data: msg,
|
||||
Time: time.Unix(0, ts),
|
||||
}
|
||||
response, _ = json.MarshalIndent(sm, "", " ")
|
||||
mset.sendq <- &jsPubMsg{reply, _EMPTY_, _EMPTY_, response, nil, 0}
|
||||
}
|
||||
|
||||
// processMsgBySeq will return the message at the given sequence, or an -ERR if not found.
|
||||
func (mset *MsgSet) processMsgBySeq(_ *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
mset.mu.Lock()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Copyright 2019-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -3962,116 +3962,6 @@ func TestJetStreamTemplateFileStoreRecovery(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamKVBasics(t *testing.T) {
|
||||
s := RunBasicJetStreamServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
config := s.JetStreamConfig()
|
||||
if config == nil {
|
||||
t.Fatalf("Expected non-nil config")
|
||||
}
|
||||
defer os.RemoveAll(config.StoreDir)
|
||||
|
||||
acc := s.GlobalAccount()
|
||||
|
||||
mcfg := &server.MsgSetConfig{
|
||||
Subjects: []string{"kv.*"},
|
||||
Retention: server.StreamPolicy,
|
||||
MaxAge: time.Hour,
|
||||
MaxMsgs: 5,
|
||||
Storage: server.MemoryStorage,
|
||||
Replicas: 1,
|
||||
MaxMsgSize: 1024,
|
||||
}
|
||||
template := &server.StreamTemplateConfig{
|
||||
Name: "kv",
|
||||
Config: mcfg,
|
||||
MaxMsgSets: 1024,
|
||||
}
|
||||
|
||||
if _, err := acc.AddStreamTemplate(template); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
nc := clientConnectToServer(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
type Value struct {
|
||||
Data []byte
|
||||
Revision uint64
|
||||
}
|
||||
|
||||
kvPut := func(key string, value []byte) error {
|
||||
subj := "kv." + key
|
||||
sendStreamMsg(t, nc, subj, string(value))
|
||||
return nil
|
||||
}
|
||||
kvGet := func(key string) (Value, error) {
|
||||
subj := fmt.Sprintf(server.JetStreamLastMsg, "kv."+key)
|
||||
resp, err := nc.Request(subj, nil, time.Second)
|
||||
if err != nil {
|
||||
return Value{}, err
|
||||
}
|
||||
if strings.HasPrefix(string(resp.Data), "-ERR ") {
|
||||
return Value{}, fmt.Errorf("%s", resp.Data)
|
||||
}
|
||||
var raw server.StoredMsg
|
||||
if err := json.Unmarshal(resp.Data, &raw); err != nil {
|
||||
return Value{}, err
|
||||
}
|
||||
v := Value{Data: raw.Data, Revision: raw.Sequence}
|
||||
return v, nil
|
||||
}
|
||||
kvUpdate := func(key string, value []byte, revision uint64) error {
|
||||
sm := server.StoredMsg{
|
||||
Subject: "kv." + key,
|
||||
Sequence: revision,
|
||||
Data: value,
|
||||
}
|
||||
req, _ := json.MarshalIndent(sm, "", " ")
|
||||
subj := fmt.Sprintf(server.JetStreamUpdateMsgWithRevision, "kv."+key)
|
||||
resp, err := nc.Request(subj, req, time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if string(resp.Data) != server.OK {
|
||||
return fmt.Errorf("%s", string(resp.Data))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
kvDelete := func(key string) error {
|
||||
mname := "kv." + key
|
||||
mset, err := s.GlobalAccount().LookupMsgSet(mname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mset.Purge()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sample of how K/V may work, with set/put, get and update.
|
||||
kvPut("derek", []byte("22"))
|
||||
if v, err := kvGet("derek"); err != nil || string(v.Data) != "22" {
|
||||
t.Fatalf("Expected %q, got %q", "22", v.Data)
|
||||
}
|
||||
kvPut("derek", []byte("33"))
|
||||
// Now try to update with the current revision, should fail.
|
||||
if err := kvUpdate("derek", []byte("44"), 1); err == nil {
|
||||
t.Fatalf("Update with bad revision should have failed")
|
||||
}
|
||||
// This one should work.
|
||||
if err := kvUpdate("derek", []byte("88"), 2); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if v, err := kvGet("derek"); err != nil || string(v.Data) != "88" {
|
||||
t.Fatalf("Expected %q, got %q", "88", v.Data)
|
||||
}
|
||||
kvDelete("derek")
|
||||
if _, err := kvGet("derek"); err == nil {
|
||||
t.Fatalf("Eexpected an error but got none")
|
||||
}
|
||||
}
|
||||
|
||||
// Benchmark placeholder
|
||||
func TestJetStreamPubSubPerf(t *testing.T) {
|
||||
// Uncomment to run, holding place for now.
|
||||
|
||||
Reference in New Issue
Block a user