Merge pull request #2585 from nats-io/rollups

Rollup functionality.
This commit is contained in:
Derek Collison
2021-09-30 04:05:17 -07:00
committed by GitHub
2 changed files with 112 additions and 0 deletions

View File

@@ -16,6 +16,7 @@ package server
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
@@ -25,6 +26,7 @@ import (
"path"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -9006,6 +9008,82 @@ func TestJetStreamSeal(t *testing.T) {
t.Run("Clustered", func(t *testing.T) { testSeal(t, c.randomServer(), 3) })
}
func TestJetStreamRollups(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := &nats.StreamConfig{
Name: "SENSORS",
Subjects: []string{"sensor.*.temp"},
MaxMsgsPerSubject: 10,
Replicas: 2,
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var bt [16]byte
var le = binary.LittleEndian
// Generate 1000 random measurements for 10 sensors
for i := 0; i < 1000; i++ {
id, temp := strconv.Itoa(rand.Intn(9)+1), rand.Int31n(42)+60 // 60-102 degrees.
le.PutUint16(bt[0:], uint16(temp))
js.PublishAsync(fmt.Sprintf("sensor.%v.temp", id), bt[:])
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
// Grab random sensor and do a rollup by averaging etc.
sensor := fmt.Sprintf("sensor.%v.temp", strconv.Itoa(rand.Intn(9)+1))
sub, err := js.SubscribeSync(sensor)
require_NoError(t, err)
var total, samples int
for m, err := sub.NextMsg(time.Second); err == nil; m, err = sub.NextMsg(time.Second) {
total += int(le.Uint16(m.Data))
samples++
}
sub.Unsubscribe()
avg := uint16(total / samples)
le.PutUint16(bt[0:], avg)
rollup := nats.NewMsg(sensor)
rollup.Data = bt[:]
rollup.Header.Set(JSMsgRollup, JSMsgRollupSubject)
_, err = js.PublishMsg(rollup)
require_NoError(t, err)
sub, err = js.SubscribeSync(sensor)
require_NoError(t, err)
// Make sure only 1 left.
checkSubsPending(t, sub, 1)
sub.Unsubscribe()
// Now do all.
rollup.Header.Set(JSMsgRollup, JSMsgRollupAll)
_, err = js.PublishMsg(rollup)
require_NoError(t, err)
// Same thing as above should hold true.
sub, err = js.SubscribeSync(sensor)
require_NoError(t, err)
// Make sure only 1 left.
checkSubsPending(t, sub, 1)
sub.Unsubscribe()
// Also should only be 1 msgs in total stream left with JSMsgRollupAll
si, err := js.StreamInfo("SENSORS")
require_NoError(t, err)
if si.State.Msgs != 1 {
t.Fatalf("Expected only 1 msg left after rollup all, got %+v", si.State)
}
}
// Support functions
// Used to setup superclusters for tests.

View File

@@ -221,6 +221,13 @@ const (
JSLastConsumerSeq = "Nats-Last-Consumer"
JSLastStreamSeq = "Nats-Last-Stream"
JSConsumerStalled = "Nats-Consumer-Stalled"
JSMsgRollup = "Nats-Rollup"
)
// Rollups, can be subject only or all messages.
const (
JSMsgRollupSubject = "sub"
JSMsgRollupAll = "all"
)
// Dedupe entry
@@ -2514,6 +2521,15 @@ func getExpectedLastSeq(hdr []byte) uint64 {
return uint64(parseInt64(bseq))
}
// Fast lookup of rollups.
func getRollup(hdr []byte) string {
r := getHeader(JSMsgRollup, hdr)
if len(r) == 0 {
return _EMPTY_
}
return strings.ToLower(string(r))
}
// Fast lookup of expected stream sequence per subject.
func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) {
bseq := getHeader(JSExpectedLastSubjSeq, hdr)
@@ -2693,6 +2709,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Process additional msg headers if still present.
var msgId string
var rollupSub, rollupAll bool
if len(hdr) > 0 {
outq := mset.outq
@@ -2772,6 +2790,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, lseq)
}
}
// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
switch rollup {
case JSMsgRollupSubject:
rollupSub = true
case JSMsgRollupAll:
rollupAll = true
default:
return fmt.Errorf("rollup value invalid: %q", rollup)
}
}
}
// Response Ack.
@@ -2931,6 +2960,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if msgId != _EMPTY_ {
mset.storeMsgId(&ddentry{msgId, seq, ts})
}
if rollupSub {
mset.purge(&JSApiStreamPurgeRequest{Subject: subject, Keep: 1})
} else if rollupAll {
mset.purge(&JSApiStreamPurgeRequest{Keep: 1})
}
if canRespond {
response = append(pubAck, strconv.FormatUint(seq, 10)...)
response = append(response, '}')