Skip to content

Commit

Permalink
Ensure restores and migrations work properly with RDM. Add restore fr…
Browse files Browse the repository at this point in the history
…om db for HA Clusters. Fixes #2549. Fixes #2649. Fixes #2707
  • Loading branch information
plorenz committed Jan 29, 2025
1 parent ec06ce2 commit 464ee23
Show file tree
Hide file tree
Showing 33 changed files with 1,448 additions and 1,096 deletions.
72 changes: 72 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,74 @@
# Release 1.4

# What's New

* Changes to backup/restore and standalone to HA migrations

## Backup/Restore/HA Migrations

What restoring from a DB snapshot has in common with migrating from a standalone setup to
a RAFT enabled one, is that the controller is changing in a way that the router might not
notice.

Now that routers have a simplified data model, they need know if the controller database
has gone backwards. In the case of a migration to an HA setup, they need to know that
the data model index has changed, likely resetting back to close to zero.

To facilitate this, the database now has a timeline identifier. This is shared among
controllers and is sent to routers along with the data state. When the controller
restores to a snapshot of previous state, or when the the controller moves to a
raft/HA setup, the timeline identifier will change.

When the router requests data model changes, it will send along the current timeline
identifier. If the controller sees that the timeline identifier is different, it knows
to send down the full data state.

### Implementation Notes

In general this is all handled behind the scenes. The current data model index and
timeline identifier can be inspected on controllers and routers using:

```
ziti fabric inspect router-data-model-index
```

**Example**
```
$ ziti fabric inspect router-data-model-index
Results: (3)
ctrl1.router-data-model-index
index: 25
timeline: MMt19ldHR
vEcsw2kJ7Q.router-data-model-index
index: 25
timeline: MMt19ldHR
ctrl2.router-data-model-index
index: 25
timeline: MMt19ldHR
```

Whenever we create a database snapshot now, the snapshot will contain a flag indicating
that the timeline identifier needs to be changed. When a standalone controller starts
up, if that flag is set, the controller changes the timeline identifier and resets the flag.

When an HA cluster is initialized using an existing controller database it also changes the
timeline id.

### HA DB Restore

There's a new command to restore an HA cluster to an older DB snapshot.

```
ziti agent controller restore-from-db </path/to/database.file>
```

Note that when a controller is already up and running and receives a snapshot to apply, it
will move the database into place and then shutdown, expecting to be restarted. This is
because there is caching in various places and restartingi makes sure that everything is
coherent with the changes database.

# Release 1.3.3

# What's New
Expand Down Expand Up @@ -30,6 +101,7 @@

## Component Updates and Bug Fixes


* github.com/openziti/ziti: [v1.3.0 -> v1.3.1](https://github.com/openziti/ziti/compare/v1.3.0...v1.3.1)
* [Issue #2682](https://github.com/openziti/ziti/issues/2682) - HA Controller panics when bootstrapping by setting the db variable in the configuration
* [Issue #2683](https://github.com/openziti/ziti/issues/2683) - Controller fails to save peer configuration on a fresh install
Expand Down
1,736 changes: 883 additions & 853 deletions common/pb/edge_ctrl_pb/edge_ctrl.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions common/pb/edge_ctrl_pb/edge_ctrl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ enum ServicePolicyRelatedEntityType {
message DataState {
repeated Event events = 1;
uint64 endIndex = 2;
string timelineId = 3;

enum Action {
Create = 0;
Expand Down Expand Up @@ -207,6 +208,7 @@ message DataState {
uint64 index = 1;
bool isSynthetic = 2;
repeated Event changes = 3;
string timestampId = 4;
}

message Event {
Expand Down Expand Up @@ -601,4 +603,5 @@ message SubscribeToDataModelRequest {
uint64 currentIndex = 1;
uint32 subscriptionDurationSeconds = 2;
bool renew = 3;
string timelineId = 4;
}
152 changes: 78 additions & 74 deletions common/pb/mgmt_pb/mgmt.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/pb/mgmt_pb/mgmt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum ContentType {
RaftTransferLeadershipRequestType = 10084;
RaftInitFromDb = 10085;
RaftInit = 10086;
RaftRestoreFromDb = 10087;

// Validate
ValidateTerminatorsRequestType = 10100;
Expand Down
20 changes: 16 additions & 4 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ type RouterDataModel struct {
closeNotify <-chan struct{}
stopNotify chan struct{}
stopped atomic.Bool

// timelineId identifies the database that events are flowing from. This will be reset whenever we change the
// underlying datastore
timelineId string
}

// NewBareRouterDataModel creates a new RouterDataModel that is expected to have no buffers, listeners or subscriptions
Expand All @@ -144,7 +148,7 @@ func NewBareRouterDataModel() *RouterDataModel {

// NewSenderRouterDataModel creates a new RouterDataModel that will store events in a circular buffer of
// logSize. listenerBufferSize affects the buffer size of channels returned to listeners of the data model.
func NewSenderRouterDataModel(logSize uint64, listenerBufferSize uint) *RouterDataModel {
func NewSenderRouterDataModel(timelineId string, logSize uint64, listenerBufferSize uint) *RouterDataModel {
return &RouterDataModel{
EventCache: NewLoggingEventCache(logSize),
ConfigTypes: cmap.New[*ConfigType](),
Expand All @@ -156,6 +160,7 @@ func NewSenderRouterDataModel(logSize uint64, listenerBufferSize uint) *RouterDa
PublicKeys: cmap.New[*edge_ctrl_pb.DataState_PublicKey](),
Revocations: cmap.New[*edge_ctrl_pb.DataState_Revocation](),
listenerBufferSize: listenerBufferSize,
timelineId: timelineId,
}
}

Expand Down Expand Up @@ -200,6 +205,7 @@ func NewReceiverRouterDataModelFromDataState(dataState *edge_ctrl_pb.DataState,
events: make(chan subscriberEvent),
closeNotify: closeNotify,
stopNotify: make(chan struct{}),
timelineId: dataState.TimelineId,
}

go result.processSubscriberEvents()
Expand All @@ -214,7 +220,7 @@ func NewReceiverRouterDataModelFromDataState(dataState *edge_ctrl_pb.DataState,
return result
}

// NewReceiverRouterDataModel creates a new RouterDataModel that does not store events. listenerBufferSize affects the
// NewReceiverRouterDataModelFromExisting creates a new RouterDataModel that does not store events. listenerBufferSize affects the
// buffer size of channels returned to listeners of the data model.
func NewReceiverRouterDataModelFromExisting(existing *RouterDataModel, listenerBufferSize uint, closeNotify <-chan struct{}) *RouterDataModel {
result := &RouterDataModel{
Expand Down Expand Up @@ -308,6 +314,10 @@ func (rdm *RouterDataModel) sendEvent(event *edge_ctrl_pb.DataState_ChangeSet) {
}
}

func (rdm *RouterDataModel) GetTimelineId() string {
return rdm.timelineId
}

// ApplyChangeSet applies the given even to the router data model.
func (rdm *RouterDataModel) ApplyChangeSet(change *edge_ctrl_pb.DataState_ChangeSet) {
changeAccepted := false
Expand Down Expand Up @@ -770,13 +780,15 @@ func (rdm *RouterDataModel) getDataStateAlreadyLocked(index uint64) *edge_ctrl_p
Model: &edge_ctrl_pb.DataState_Event_PublicKey{
PublicKey: v,
},
IsSynthetic: true,
}
events = append(events, newEvent)
})

return &edge_ctrl_pb.DataState{
Events: events,
EndIndex: index,
Events: events,
EndIndex: index,
TimelineId: rdm.timelineId,
}
}

Expand Down
20 changes: 20 additions & 0 deletions controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (self *Controller) bindAgentChannel(binding channel.Binding) error {
binding.AddReceiveHandlerF(int32(mgmt_pb.ContentType_RaftTransferLeadershipRequestType), self.agentOpRaftTransferLeadership)
binding.AddReceiveHandlerF(int32(mgmt_pb.ContentType_RaftInitFromDb), self.agentOpInitFromDb)
binding.AddReceiveHandlerF(int32(mgmt_pb.ContentType_RaftInit), self.agentOpInit)
binding.AddReceiveHandlerF(int32(mgmt_pb.ContentType_RaftRestoreFromDb), self.agentOpRestoreFromDb)

for _, bh := range self.agentBindHandlers {
if err := binding.Bind(bh); err != nil {
Expand Down Expand Up @@ -254,6 +255,25 @@ func (self *Controller) agentOpInitFromDb(m *channel.Message, ch channel.Channel
handler_common.SendOpResult(m, ch, "cluster.init-from-db", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true)
}

func (self *Controller) agentOpRestoreFromDb(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "cluster.restore-from-db", "controller not running in clustered mode", false)
return
}

sourceDbPath := string(m.Body)
if len(sourceDbPath) == 0 {
handler_common.SendOpResult(m, ch, "cluster.restore-from-db", "source db not supplied", false)
return
}

if err := self.RaftRestoreFromBoltDb(sourceDbPath); err != nil {
handler_common.SendOpResult(m, ch, "cluster.init-from-db", err.Error(), false)
return
}
handler_common.SendOpResult(m, ch, "cluster.init-from-db", fmt.Sprintf("success, initialized from [%v]", sourceDbPath), true)
}

func (self *Controller) agentOpInit(m *channel.Message, ch channel.Channel) {
if self.raftController == nil {
handler_common.SendOpResult(m, ch, "init.edge", "controller not running in clustered mode", false)
Expand Down
15 changes: 8 additions & 7 deletions controller/command/generic_cmds.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package command

import (
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/pb/cmd_pb"
"github.com/openziti/ziti/controller/change"
"github.com/openziti/ziti/controller/fields"
"github.com/openziti/ziti/controller/models"
"github.com/openziti/ziti/common/pb/cmd_pb"
"github.com/openziti/storage/boltz"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -145,18 +145,19 @@ func (self *DeleteEntityCommand) GetChangeContext() *change.Context {
}

type SyncSnapshotCommand struct {
SnapshotId string
TimelineId string
Snapshot []byte
SnapshotSink func(cmd *SyncSnapshotCommand) error
SnapshotSink func(cmd *SyncSnapshotCommand, index uint64) error
}

func (self *SyncSnapshotCommand) Apply(boltz.MutateContext) error {
return self.SnapshotSink(self)
func (self *SyncSnapshotCommand) Apply(ctx boltz.MutateContext) error {
changeCtx := change.FromContext(ctx.Context())
return self.SnapshotSink(self, changeCtx.RaftIndex)
}

func (self *SyncSnapshotCommand) Encode() ([]byte, error) {
return cmd_pb.EncodeProtobuf(&cmd_pb.SyncSnapshotCommand{
SnapshotId: self.SnapshotId,
SnapshotId: self.TimelineId,
Snapshot: self.Snapshot,
})
}
Expand Down
4 changes: 4 additions & 0 deletions controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (self *Config) ToJson() (string, error) {
return string(b), err
}

func (self *Config) IsRaftEnabled() bool {
return self.Raft != nil
}

// CtrlOptions extends channel.Options to include support for additional, non-channel specific options
// (e.g. NewListener)
type CtrlOptions struct {
Expand Down
67 changes: 63 additions & 4 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/openziti/ziti/controller/xt_weighted"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/teris-io/shortid"
"math/big"
"os"
"sync"
Expand Down Expand Up @@ -230,7 +231,7 @@ func NewController(cfg *config.Config, versionProvider versions.VersionProvider)

c.xweb = xweb.NewDefaultInstance(c.xwebFactoryRegistry, c.config.Id)

if cfg.Raft != nil {
if cfg.IsRaftEnabled() {
c.raftController = raft.NewController(c, c)
if err := c.raftController.Init(); err != nil {
log.WithError(err).Panic("error starting raft")
Expand Down Expand Up @@ -665,21 +666,79 @@ func (c *Controller) InitializeRaftFromBoltDb(sourceDbPath string) error {
}
}()

log.Infof("initializing from bolt db [%v]", sourceDbPath)
timelineId, err := sourceDb.GetTimelineId(boltz.TimelineModeForceReset, shortid.Generate)
if err != nil {
return err
}
log.WithField("timelineId", timelineId).WithField("path", sourceDbPath).Info("initializing from bolt db")

buf := &bytes.Buffer{}
gzWriter := gzip.NewWriter(buf)
snapshotId, err := sourceDb.SnapshotToWriter(gzWriter)
if err = sourceDb.StreamToWriter(gzWriter); err != nil {
return err
}

if err = gzWriter.Close(); err != nil {
return errors.Wrap(err, "error finishing gz compression of migration snapshot")
}

c.env.InitTimelineId(timelineId)

cmd := &command.SyncSnapshotCommand{
TimelineId: timelineId,
Snapshot: buf.Bytes(),
SnapshotSink: c.network.RestoreSnapshot,
}

if err = c.raftController.Bootstrap(); err != nil {
return fmt.Errorf("unable to bootstrap cluster (%w)", err)
}

return c.raftController.Dispatch(cmd)
}

func (c *Controller) RaftRestoreFromBoltDb(sourceDbPath string) error {
log := pfxlog.Logger()

if c.raftController == nil {
return errors.New("can't initialize non-raft controller using initialize from db")
}

if _, err := os.Stat(sourceDbPath); err != nil {
if os.IsNotExist(err) {
return errors.Wrapf(err, "source db not found at [%v]", sourceDbPath)
}
return errors.Wrapf(err, "invalid db path [%v]", sourceDbPath)
}

sourceDb, err := db.Open(sourceDbPath)
if err != nil {
return err
}
defer func() {
if err = sourceDb.Close(); err != nil {
log.WithError(err).Error("error closing migration source bolt db")
}
}()

timelineId, err := sourceDb.GetTimelineId(boltz.TimelineModeForceReset, shortid.Generate)
if err != nil {
return err
}
log.WithField("timelineId", timelineId).WithField("path", sourceDbPath).Info("restoring from bolt db")

buf := &bytes.Buffer{}
gzWriter := gzip.NewWriter(buf)
if err = sourceDb.StreamToWriter(gzWriter); err != nil {
return err
}

if err = gzWriter.Close(); err != nil {
return errors.Wrap(err, "error finishing gz compression of migration snapshot")
}

cmd := &command.SyncSnapshotCommand{
SnapshotId: snapshotId,
TimelineId: timelineId,
Snapshot: buf.Bytes(),
SnapshotSink: c.network.RestoreSnapshot,
}
Expand Down
2 changes: 1 addition & 1 deletion controller/db/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (stores *Stores) CheckIntegrity(db boltz.Db, ctx context.Context, fix bool,
func (stores *Stores) CheckIntegrityInTx(db boltz.Db, ctx boltz.MutateContext, fix bool, errorHandler func(error, bool)) error {
if fix {
pfxlog.Logger().Info("creating database snapshot before attempting to fix data integrity issues")
if err := db.Snapshot(ctx.Tx()); err != nil {
if _, _, err := db.SnapshotInTx(ctx.Tx(), db.GetDefaultSnapshotPath()); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 464ee23

Please sign in to comment.