Merge pull request #1855 from nats-io/memstore

[FIXED] Fixed bug to allow memory store for clustering
This commit is contained in:
Derek Collison
2021-01-27 17:26:41 -07:00
committed by GitHub
2 changed files with 70 additions and 24 deletions

View File

@@ -897,7 +897,7 @@ func (rg *raftGroup) setPreferred() {
}
// createRaftGroup is called to spin up this raft group if needed.
func (js *jetStream) createRaftGroup(rg *raftGroup) {
func (js *jetStream) createRaftGroup(rg *raftGroup) error {
js.mu.Lock()
defer js.mu.Unlock()
@@ -906,31 +906,31 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) {
// If this is a single peer raft group or we are not a member return.
if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) {
// Nothing to do here.
return
return nil
}
// We already have this assigned.
if node := s.lookupRaftNode(rg.Name); node != nil {
s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
return
return nil
}
s.Debugf("JetStream cluster creating raft group:%+v", rg)
sysAcc := s.SystemAccount()
if sysAcc == nil {
s.Debugf("JetStream cluster detected shutdown processing raft group:%+v", rg)
return
s.Debugf("JetStream cluster detected shutdown processing raft group: %+v", rg)
return errors.New("shutting down")
}
stateDir := path.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name)
fs, bootstrap, err := newFileStore(
FileStoreConfig{StoreDir: stateDir, BlockSize: 32 * 1024 * 1024},
StreamConfig{Name: rg.Name, Storage: rg.Storage},
StreamConfig{Name: rg.Name, Storage: FileStorage},
)
if err != nil {
s.Errorf("Error creating filestore: %v", err)
return
return err
}
cfg := &RaftConfig{Name: rg.Name, Store: stateDir, Log: fs}
@@ -941,7 +941,7 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) {
n, err := s.startRaftNode(cfg)
if err != nil {
s.Debugf("Error creating raft group: %v", err)
return
return err
}
rg.node = n
@@ -949,6 +949,8 @@ func (js *jetStream) createRaftGroup(rg *raftGroup) {
if n.ID() == rg.Preferred {
n.Campaign()
}
return nil
}
func (mset *Stream) raftNode() RaftNode {
@@ -1325,33 +1327,21 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) {
// Check if this is for us..
if isMember {
js.processClusterCreateStream(sa)
js.processClusterCreateStream(acc, sa)
}
}
// processClusterCreateStream is called when we have a stream assignment that
// has been committed and this server is a member of the peer group.
func (js *jetStream) processClusterCreateStream(sa *streamAssignment) {
func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignment) {
if sa == nil {
return
}
js.mu.RLock()
s := js.srv
acc, err := s.LookupAccount(sa.Client.Account)
if err != nil {
s.Warnf("JetStream cluster failed to lookup account %q: %v", sa.Client.Account, err)
js.mu.RUnlock()
return
}
rg := sa.Group
s, rg := js.srv, sa.Group
js.mu.RUnlock()
// Process the raft group and make sure it's running if needed.
js.createRaftGroup(rg)
var mset *Stream
// If we are restoring, create the stream if we are R>1 and not the preferred who handles the
// receipt of the snapshot itself.
shouldCreate := true
@@ -1360,8 +1350,14 @@ func (js *jetStream) processClusterCreateStream(sa *streamAssignment) {
shouldCreate = false
}
}
var mset *Stream
// Process the raft group and make sure it's running if needed.
err := js.createRaftGroup(rg)
// Process here if not restoring or not the leader.
if shouldCreate {
if shouldCreate && err == nil {
// Go ahead and create or update the stream.
mset, err = acc.LookupStream(sa.Config.Name)
if err == nil && mset != nil {

View File

@@ -247,6 +247,56 @@ func TestJetStreamClusterMultiReplicaStreams(t *testing.T) {
}
}
func TestJetStreamClusterMemoryStore(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3M", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
// FIXME(dlc) - This should be default.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
Storage: nats.MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Send in 100 messages.
msg, toSend := []byte("Hello MemoryStore"), 100
for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
// Now grab info for this stream.
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "TEST" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
if si.Cluster == nil || len(si.Cluster.Replicas) != 2 {
t.Fatalf("Cluster info is incorrect: %+v", si.Cluster)
}
// Check active state as well, shows that the owner answered.
if si.State.Msgs != uint64(toSend) {
t.Fatalf("Expected %d msgs, got bad state: %+v", toSend, si.State)
}
// Do a normal sub.
sub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
checkSubsPending(t, sub, toSend)
}
func TestJetStreamClusterCompaction(t *testing.T) {
// This test takes a long time to observe compactions.
// Once moved to server we can adjust and re-enable.