Model persister that can emulate a device shadow. Pluggable storage, diff/merge, loggers, advanced filtering/notification mechanism with implementations in memory/DynamoDB/streams.

mariotoffia/godeviceshadow

Go Device Shadow (godeviceshadow)

Introduction

This is an implementation of device shadow storage and notification, using DynamoDB for storage and SQS/SNS for notifications. Other storage and notification backends can be plugged in.

Modules

godeviceshadow

This is the root module. 💡 The core runtime does not have any dependencies beyond the Go standard library. Only plugins may have dependencies. 💡

go get github.com/mariotoffia/godeviceshadow@latest
dynamodbpersistence

This module persists the models to DynamoDB. Check the README for more information.

go get github.com/mariotoffia/godeviceshadow/persistence/dynamodbpersistence@latest
dynamodbnotifier

DynamoDB Streams Listener/Notifier. Check the README for more information.

go get github.com/mariotoffia/godeviceshadow/notify/dynamodbnotifier@latest
selectlang

Experimental DSL for rendering notification filters. See the README for more information.

go get github.com/mariotoffia/godeviceshadow/notify/selectlang@latest

Quick Start

This is, to some extent, an emulation of the existing IoT Core Device Shadow. However, it separates the Reported and Desired states into two different sort keys, which allows for more data, better querying, and better performance when, for example, Reported is updated most of the time. The persistence can be directed to use either combined or separate storage.

It handles reported and desired state as a device shadow does, but it also lets you choose ServerIsMaster or ClientIsMaster to control how reported and desired are merged.

Most parts are pluggable, and the components may be used separately. It is also possible to write your own manager or any other component, e.g. use only the storage to read and write models and diff them.

The storage is pluggable and comes with DynamoDB and in-memory implementations out of the box. You can write your own loggers that participate in the diff/merge process, e.g. to update a time-series database when sensor data is added or updated.

Example Report, Desire & Loggers
ctx := context.Background()
now := time.Now()
mgr := // (1)

id := persistencemodel.ID{ID: "device123", Name: "homeHub"}

res := mgr.Report(ctx, managermodel.ReportOperation{ // (2)
  ClientID: "myClient",
  Version:  0, // (3)
  ID: id,
  Model: TestModel{
    TimeZone: tz, Sensors: map[string]Sensor{ "temp": {Value: 23.4, TimeStamp: now} },
  },
})

chl := changelogger.Find(res[0].MergeLoggers) // (4)
sns, err := chl.ManagedFromPath(`Sensors\..*`)
sensors := sns.All()

fmt.Printf("%s: %s", sensors[0].Path, sensors[0].NewValue.GetTimestamp().Format(time.RFC3339)) // (5)

res2 := mgr.Desire(ctx, managermodel.DesireOperation{ // (6)
  ClientID: "myClient",
  ID: id,
  Model: TestModel{
    TimeZone: tz, Sensors: map[string]Sensor{ "sp": {Value: 99.2, TimeStamp: now} },
  },
})

res = mgr.Report(ctx, managermodel.ReportOperation{
  ClientID: "myClient",
  Version:  0, // (7)
  ID: id,
  Model: TestModel{
    TimeZone: tz, Sensors: map[string]Sensor{ "sp": {Value: 99.2, TimeStamp: now} }, // (7)
  },
})
  1. Manager created elsewhere (see the example below).

  2. Report the model, thereby merging it with the model in the persistence and ensuring that any desired acknowledgements are done. In this case nothing is persisted yet, so it will create a new model and merge it.

  3. The version is 0 and will be incremented by the system. A zero version always merges with the latest version. If an explicit version is used, the merge only happens if the stored version is the same (and the version is then incremented).

  4. Find the change logger and extract the managed values from the path Sensors\..* (Regular Expression).

  5. Outputs e.g. Sensors.temp: 2025-01-22T13:22:26+01:00

  6. Adds sp to the desired state (merge loggers can be supplied here as well to listen for the desire merge).

  7. Acknowledges sp and therefore removes it from the desired state (this can be listened to by supplying desire loggers).

The above sample shows how to report and desire a value, including how to access a merge logger. It relies on a manager that has been configured and built.
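The version rule from note (3) can be sketched in isolation. This is a minimal illustration and not the library's code: `nextVersion` and `ErrVersionConflict` are hypothetical names introduced only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrVersionConflict is a hypothetical error used only in this sketch.
var ErrVersionConflict = errors.New("version conflict")

// nextVersion mirrors the rule in note (3): a requested version of 0 merges
// against whatever is stored, while an explicit version must equal the stored
// one. On success the stored version is incremented by one.
func nextVersion(stored, requested int64) (int64, error) {
	if requested != 0 && requested != stored {
		return stored, ErrVersionConflict
	}
	return stored + 1, nil
}

func main() {
	v, err := nextVersion(3, 0)
	fmt.Println(v, err) // 4 <nil>

	v, err = nextVersion(3, 2)
	fmt.Println(v, err) // 3 version conflict
}
```

Using zero as "merge with latest" keeps the common case simple, while an explicit version gives callers optimistic locking when they need it.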

Example Create an In-Memory Persistence Manager
mgr := stdmgr.New(). // (1)
  WithPersistence(mempersistence.New()). // (2)
  WithSeparation(persistencemodel.SeparateModels). // (3)
  WithReportedLoggers(changelogger.New()). // (4)
  WithTypeRegistryResolver( // (5)
    types.NewRegistry().RegisterResolver(
      model.NewResolveFunc(func(id, name string) (model.TypeEntry, bool) {
        if name == "homeHub" { // (5)
          return model.TypeEntry{
            Name: "homeHub", Model: reflect.TypeOf(TestModel{}),
          }, true
        }

        return model.TypeEntry{}, false
      }),
    ),
  ).
  Build()
  1. Create a builder to create a new manager.

  2. Use in-memory persistence. Swap this e.g. for DynamoDB persistence via dynamodbpersistence.New(…​).

  3. Separate the model persistence by default; this can be overridden on each write operation. Without this option, the default is to combine desired and reported in persistence. However, it is up to the persistence how to adhere to this.

  4. Use the change logger to log changes to both managed and plain values for later examination. This registers the New function so that a new logger is created on each report. It is possible to create your own or use existing merge loggers to participate in the merge.

  5. There are a few ways of resolving which type the model is (used in read operations). This registers an on-the-fly resolver.

It is then possible to notify using a notification manager. A selection is defined that resolves to a target, so where to notify can be heavily customized. There is an experimental DSL that renders the selectors, so they do not need to be coded by hand (even though they are very simple, just one function).

Example Notification Selection DSL
stmt := `(
      id: /myDevice-\d+/ AND // (1)
      name: 'homeHub' AND
      operation: report,desired
    )
    AND
    (add,update:/^Sensors.indoor-\d+$/ == 'temp'  // (2)
    WHERE ( // (3)
      value > 20 OR (value == /^re-\d+/ AND value != 'apa' OR (value > 99 AND value != /^bubben-\d+$/)))
    )
    OR
    (acknowledge) // (4)
`
  1. One or more primary expressions that match the ID and the operation.

  2. Zero or more log expressions that interact with the values being handled.

  3. Log expressions may have as many constraints as needed. Value expressions can be mixed, and each one captures only the values it can be applied to. For example, float values are converted to strings when matched against a regex.

  4. It is also possible to select all acknowledged values.

When a Selection returns true, the target may be invoked.
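To give a feel for what the primary expression in the DSL above amounts to, here is a hand-written equivalent in plain Go. It is only a sketch: `Operation`, `Selection`, and `primary` are hypothetical stand-ins and do not reflect the library's actual selection types.

```go
package main

import (
	"fmt"
	"regexp"
)

// Operation is a hypothetical, simplified view of a shadow operation.
type Operation struct {
	ID   string
	Name string
	Kind string // "report" or "desired"
}

// Selection decides whether an operation should be forwarded to a target.
type Selection func(op Operation) bool

var idPattern = regexp.MustCompile(`myDevice-\d+`)

// primary mirrors:
//   id: /myDevice-\d+/ AND name: 'homeHub' AND operation: report,desired
func primary(op Operation) bool {
	return idPattern.MatchString(op.ID) &&
		op.Name == "homeHub" &&
		(op.Kind == "report" || op.Kind == "desired")
}

func main() {
	var sel Selection = primary
	fmt.Println(sel(Operation{ID: "myDevice-12", Name: "homeHub", Kind: "report"})) // true
	fmt.Println(sel(Operation{ID: "other-1", Name: "homeHub", Kind: "report"}))     // false
}
```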

Note
🚨 The DSL is experimental and may change in the future.

The Selection may also be used to capture a set of values: just pass true for the value parameter when processing. Thus, selections may be used outside the notification mechanism.

TIP 💡: View All Examples

To view all examples, visit the Examples directory.

Core Concepts

This is a model runtime, not a plain JSON runtime, so it handles Go models. The main interface is model.ValueAndTimestamp, which it uses to discover values and handle them.

ValueAndTimestamp Interface

// ValueAndTimestamp is the interface that fields must implement if they
// support timestamp-based merging.
type ValueAndTimestamp interface {
  // GetTimestamp will return the timestamp associated with the value. This is
  // used to determine which value is newer when a merge is commenced.
  GetTimestamp() time.Time
  // GetValue will return the value that the timestamp is associated with.
  //
  // If multiple values, the instance itself is the value and this method
  // will return the _"default"_ value. If the value is a map[string]any
  // it will return all values where the key is the name of the value.
  //
  // The latter gives the caller a way of knowing what values are relevant
  // to e.g. log instead of iterate the whole struct.
  GetValue() any
}

Those may be anywhere in structs, maps etc. The system will iterate over everything and handle all such elements.
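How such discovery could work is sketched below using the standard reflect package. This is an illustration under assumptions, not the library's traversal: `Sensor`, `Model`, `walk`, and `discover` are hypothetical names, and only exported struct fields, pointers, and maps are visited.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"time"
)

// ValueAndTimestamp is copied from the interface shown above.
type ValueAndTimestamp interface {
	GetTimestamp() time.Time
	GetValue() any
}

// Sensor and Model are hypothetical types used only for this sketch.
type Sensor struct {
	Value     float64
	TimeStamp time.Time
}

func (s Sensor) GetTimestamp() time.Time { return s.TimeStamp }
func (s Sensor) GetValue() any           { return s.Value }

type Model struct {
	TimeZone string
	Sensors  map[string]Sensor
}

var managedType = reflect.TypeOf((*ValueAndTimestamp)(nil)).Elem()

func join(path, name string) string {
	if path == "" {
		return name
	}
	return path + "." + name
}

// walk descends through pointers, structs and maps and records the dotted
// path of every value whose type implements ValueAndTimestamp.
func walk(path string, v reflect.Value, paths *[]string) {
	if !v.IsValid() {
		return
	}
	if (v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface) && v.IsNil() {
		return
	}
	if v.Type().Implements(managedType) {
		*paths = append(*paths, path)
		return
	}
	switch v.Kind() {
	case reflect.Ptr, reflect.Interface:
		walk(path, v.Elem(), paths)
	case reflect.Struct:
		for i := 0; i < v.NumField(); i++ {
			if field := v.Type().Field(i); field.IsExported() {
				walk(join(path, field.Name), v.Field(i), paths)
			}
		}
	case reflect.Map:
		for _, key := range v.MapKeys() {
			walk(join(path, fmt.Sprint(key.Interface())), v.MapIndex(key), paths)
		}
	}
}

// discover returns the sorted paths of all managed values in model.
func discover(model any) []string {
	var paths []string
	walk("", reflect.ValueOf(model), &paths)
	sort.Strings(paths)
	return paths
}

func main() {
	m := Model{
		TimeZone: "Europe/Stockholm",
		Sensors: map[string]Sensor{
			"temp": {Value: 23.4, TimeStamp: time.Now()},
			"hum":  {Value: 40.0, TimeStamp: time.Now()},
		},
	}
	fmt.Println(discover(m)) // [Sensors.hum Sensors.temp]
}
```

The plain `TimeZone` string is skipped because it does not implement the interface; in the terminology used here it would be a "plain" value.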

Example "DeviceShadow" Model
type HomeTemperatureHub struct {
  *MetaInfo      `json:"meta,omitempty"`
  ClimateSensors *ClimateSensors            `json:"climate,omitempty"`
  IndoorTempSP   *IndoorTemperatureSetPoint `json:"indoor_temp_sp,omitempty"` // Important omitempty when used in desired
}

type MetaInfo struct {
  TimeZone string `json:"tz,omitempty"`
  Owner    string `json:"owner,omitempty"`
}

type Direction string

const (
  DirectionNorth Direction = "north"
  DirectionSouth Direction = "south"
  DirectionEast  Direction = "east"
  DirectionWest  Direction = "west"
)

type IndoorTemperatureSensor struct {
  Floor       int       `json:"floor"`
  Direction   Direction `json:"direction"`
  Temperature float64   `json:"t"`
  Humidity    float64   `json:"h"`
  UpdatedAt   time.Time `json:"ts"`
}

func (idt *IndoorTemperatureSensor) GetTimestamp() time.Time {
  return idt.UpdatedAt
}

func (idt *IndoorTemperatureSensor) GetValue() any {
  return map[string]any{ // (1)
    "floor":       idt.Floor,
    "direction":   idt.Direction,
    "temperature": idt.Temperature,
    "humidity":    idt.Humidity,
  }
}

type OutdoorTemperatureSensor struct {
  Direction   Direction `json:"direction"`
  Temperature float64   `json:"t"`
  Humidity    float64   `json:"h"`
  UpdatedAt   time.Time `json:"ts"`
}

func (ots *OutdoorTemperatureSensor) GetTimestamp() time.Time {
  return ots.UpdatedAt // (2)
}

func (ots *OutdoorTemperatureSensor) GetValue() any {
  return map[string]any{
    "direction":   ots.Direction,
    "temperature": ots.Temperature,
    "humidity":    ots.Humidity,
  }
}

type IndoorTemperatureSetPoint struct {
  SetPoint  float64   `json:"sp"`
  UpdatedAt time.Time `json:"ts"`
}

func (sp *IndoorTemperatureSetPoint) GetTimestamp() time.Time {
  return sp.UpdatedAt
}

func (sp *IndoorTemperatureSetPoint) GetValue() any {
  return sp.SetPoint
}

type ClimateSensors struct {
  Outdoor map[string]OutdoorTemperatureSensor `json:"outdoor,omitempty"`
  Indoor  map[string]IndoorTemperatureSensor  `json:"indoor,omitempty"`
}
  1. When it is a map, it will check all values to determine if any value has changed; otherwise just return a plain value.

  2. This is the timestamp it will use to determine if the value is newer or older (or same).
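The map comparison in note (1) might look roughly like the following. It is a sketch of the idea only: `changed` is a hypothetical helper, and using reflect.DeepEqual for equality is an assumption rather than the library's documented behavior.

```go
package main

import (
	"fmt"
	"reflect"
)

// changed compares two results of GetValue. When both are map[string]any,
// every entry is compared; otherwise the two values are compared directly.
func changed(oldValue, newValue any) bool {
	oldMap, oldIsMap := oldValue.(map[string]any)
	newMap, newIsMap := newValue.(map[string]any)
	if oldIsMap && newIsMap {
		if len(oldMap) != len(newMap) {
			return true
		}
		for key, nv := range newMap {
			ov, ok := oldMap[key]
			if !ok || !reflect.DeepEqual(ov, nv) {
				return true
			}
		}
		return false
	}
	return !reflect.DeepEqual(oldValue, newValue)
}

func main() {
	before := map[string]any{"temperature": 21.5, "humidity": 40.0}
	after := map[string]any{"temperature": 22.0, "humidity": 40.0}
	fmt.Println(changed(before, before)) // false
	fmt.Println(changed(before, after))  // true
	fmt.Println(changed(21.5, 21.5))     // false
}
```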

Device Shadow Layout

The device shadow is similar to the IoT Core Device Shadow, with a few differences. It can split the Reported and Desired states into two different sort keys to allow for more data, better querying, and possibly better performance.

Loggers

There is a pluggable logger architecture that allows multiple loggers to participate in report diffs or desired acknowledgements/diffs. This makes it possible to e.g. output the changes or store added/changed values in Amazon Aurora DSQL, Timestream, or similar storage. Loggers may interact with "plain" elements such as a simple string, or with "managed" elements (those that implement the model.ValueAndTimestamp interface).

Loggers are very easy to create since they rely on only two functions to cover add, remove, changed, and not changed. Thus it is possible to check what has not changed as well!

Logger Interface
type MergeLogger interface {
  Managed(
    path string,
    operation MergeOperation, // (1)
    oldValue, newValue ValueAndTimestamp,
    oldTimeStamp, newTimeStamp time.Time)

  Plain(path string, operation MergeOperation, oldValue, newValue any) // (2)
}
  1. The MergeOperation specifies if it is an add, remove, change or not changed operation.

  2. The Plain method is used for plain values that do not implement the ValueAndTimestamp interface, such as a string.
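A custom logger only needs the two methods above. The sketch below declares local copies of the interfaces so that it is self-contained; the `MergeOperation` constants and the `CountingLogger` type are assumptions made for illustration and may not match the names used in the library.

```go
package main

import (
	"fmt"
	"time"
)

// ValueAndTimestamp and MergeLogger are local copies of the interfaces above.
type ValueAndTimestamp interface {
	GetTimestamp() time.Time
	GetValue() any
}

// MergeOperation and its constants are hypothetical names for this sketch.
type MergeOperation int

const (
	OpAdd MergeOperation = iota
	OpUpdate
	OpRemove
	OpNotChanged
)

type MergeLogger interface {
	Managed(path string, operation MergeOperation,
		oldValue, newValue ValueAndTimestamp,
		oldTimeStamp, newTimeStamp time.Time)
	Plain(path string, operation MergeOperation, oldValue, newValue any)
}

// CountingLogger tallies how often each operation occurs during a merge.
type CountingLogger struct {
	Counts map[MergeOperation]int
}

func NewCountingLogger() *CountingLogger {
	return &CountingLogger{Counts: map[MergeOperation]int{}}
}

func (l *CountingLogger) Managed(path string, operation MergeOperation,
	oldValue, newValue ValueAndTimestamp, oldTimeStamp, newTimeStamp time.Time) {
	l.Counts[operation]++
}

func (l *CountingLogger) Plain(path string, operation MergeOperation, oldValue, newValue any) {
	l.Counts[operation]++
}

// Compile-time check that CountingLogger satisfies MergeLogger.
var _ MergeLogger = (*CountingLogger)(nil)

func main() {
	l := NewCountingLogger()
	l.Plain("TimeZone", OpNotChanged, "UTC", "UTC")
	l.Managed("Sensors.temp", OpAdd, nil, nil, time.Time{}, time.Now())
	fmt.Println(l.Counts[OpAdd], l.Counts[OpNotChanged]) // 1 1
}
```

A logger that writes to a time-series store would follow the same shape and act only on add and update operations in Managed.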

Notifications

When a shadow is updated, a notification can be sent to listeners. This is done by the notification implementation.

Each target registration specifies the type of plugin (e.g. SQS) and attributes such as the queue name, topic name, etc.

In addition, the attributes specify what type of events to listen for:

* Report, Desired or Both
* Regexp for PK and SK combined with a '#' separator
* Old, New, Diff (or any combination of these)

The registrations are stored as JSON with the event lambda itself (for DynamoDB Streams).
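As a rough illustration of the key filter, a registration could match a regular expression against the partition key and sort key joined by '#'. Everything in this sketch is hypothetical: the `Registration` struct, its fields, and the example key values are not taken from the library.

```go
package main

import (
	"fmt"
	"regexp"
)

// Registration is a hypothetical, simplified target registration.
type Registration struct {
	Plugin  string         // e.g. "sqs"
	Target  string         // e.g. a queue or topic name
	Key     *regexp.Regexp // matched against PK and SK joined by '#'
	Report  bool           // listen for report events
	Desired bool           // listen for desired events
}

func newRegistration(plugin, target, keyPattern string, report, desired bool) Registration {
	return Registration{
		Plugin:  plugin,
		Target:  target,
		Key:     regexp.MustCompile(keyPattern),
		Report:  report,
		Desired: desired,
	}
}

// Matches reports whether an event for the given keys should reach the target.
func (r Registration) Matches(pk, sk string, isReport bool) bool {
	if (isReport && !r.Report) || (!isReport && !r.Desired) {
		return false
	}
	return r.Key.MatchString(pk + "#" + sk)
}

func main() {
	// The key values below are made up for the example.
	reg := newRegistration("sqs", "my-queue", `^device\d+#homeHub$`, true, false)
	fmt.Println(reg.Matches("device123", "homeHub", true))  // true
	fmt.Println(reg.Matches("device123", "homeHub", false)) // false
	fmt.Println(reg.Matches("gateway1", "homeHub", true))   // false
}
```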

Client SDK

Deviations

There are many deviations from the IoT Core Device Shadow. One of the most prominent is that the device shadow MODEL is a Go struct instead of plain JSON. This allows for a more type-safe way of handling the device shadow.

In this implementation, it is possible to control how the merge is done, i.e. whether the server or the client is master. When the client is master, the client can delete entries that are not present in the client model. When the server is master, deletion of entries is not allowed; only additions, updates, and no changes are supported.

Timestamps

The timestamps on the items in the device shadow are completely different from those in the IoT Core Device Shadow. The timestamps are RFC3339 timestamps (but since an interface is used, they may be anything). RFC3339 timestamps are useful when the time zone may differ between items.

The value and timestamp are bundled together and accessed via the ValueAndTimestamp interface. The underlying struct may be anything. Each item for which you want timestamps to be handled must implement this interface.

Example Model
type SensorValue struct {
  ValueAndTimestamp
  Timestamp time.Time `json:"timestamp"` // (1)
  Value any `json:"value"` // (2)
}

type Building struct {
  Controller Controller `json:"controller"`
}

type Controller struct {
  ID string `json:"id"`
  Serial string `json:"serial"`
  Brand string `json:"brand,omitempty"`
  Circuits map[int]Circuit `json:"circuits,omitempty"`
}

type Circuit struct {
   Sensors map[string]SensorValue `json:"sensors,omitempty"` // (3)
}
  1. This is the timestamp at which the sensor value was read. It is possible to have many different types as long as they implement the ValueAndTimestamp interface.

  2. The value may be anything. If it is a map[string]any, it will compare each entry in the map to determine if it has changed or not. In that way it is possible to present a set of values that this sensor value represents.

  3. Here all sensor values are stored as a map with the sensor name as the key and the value as the value. The value is a struct that implements the ValueAndTimestamp interface.

Creating or Updating the Device Shadow

When writing to the device shadow, for example with Report, the SDK reads the whole document and unmarshals it into the registered model, for example Building. It then iterates over all the fields and checks whether they implement the ValueAndTimestamp interface. If they do, it uses the interface to check whether the client model value is newer than the device shadow model value. If it is, the client model value is kept; if it is older, the device shadow model value is copied to the client model.

If any field is missing in the client model but present in the shadow model, it will be added to the client model. If any field is present in the client model but not in the shadow model, it will be kept (see Deleting an Element for the options).

When done, it writes the merged model back conditionally on the version and increments the version (atomically). This is done with an updated timestamp of time.Now().UTC().UnixNano(). If the client supplied a ClientToken string, it is added to the shadow as well.

On conflict, the client reads the shadow again, redoes the merge, and writes it back again. After n attempts it gives up and returns a conflict error.
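The read, merge, conditional write, and retry cycle can be sketched against a tiny in-memory store. This is an illustration only: `store`, `reportWithRetry`, `ErrConflict`, and the `beforeWrite` hook (used here to simulate a concurrent writer) are hypothetical, and the merge is reduced to overwriting keys.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConflict is a hypothetical error signalling a failed conditional write.
var ErrConflict = errors.New("conflict")

// store is a minimal stand-in for a versioned persistence.
type store struct {
	version int64
	data    map[string]float64
}

// read returns a copy of the stored data together with its version.
func (s *store) read() (map[string]float64, int64) {
	out := make(map[string]float64, len(s.data))
	for k, v := range s.data {
		out[k] = v
	}
	return out, s.version
}

// writeIfVersion stores data only if the stored version still equals expected.
func (s *store) writeIfVersion(data map[string]float64, expected int64) error {
	if s.version != expected {
		return ErrConflict
	}
	s.data = data
	s.version++
	return nil
}

// reportWithRetry reads, merges and conditionally writes, retrying on
// conflict until maxAttempts is exhausted.
func reportWithRetry(s *store, update map[string]float64, maxAttempts int, beforeWrite func()) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		current, version := s.read()
		for k, v := range update { // simplified merge: the client value wins
			current[k] = v
		}
		if beforeWrite != nil {
			beforeWrite()
		}
		err := s.writeIfVersion(current, version)
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrConflict) {
			return err
		}
	}
	return ErrConflict
}

func main() {
	s := &store{data: map[string]float64{}}
	interfered := false
	err := reportWithRetry(s, map[string]float64{"temp": 23.4}, 3, func() {
		if !interfered { // simulate one concurrent writer on the first attempt
			interfered = true
			_ = s.writeIfVersion(map[string]float64{"other": 1}, s.version)
		}
	})
	fmt.Println(err, s.version, s.data["temp"], s.data["other"]) // <nil> 2 23.4 1
}
```

Because the merge is redone on every attempt, the concurrent write to `other` survives alongside the reported `temp` value.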

Deleting an Element

When merging the structures, there are two modes: ClientIsMaster and ServerIsMaster.

In ClientIsMaster mode, it only checks elements that are timestamped and exist in both models. If the server model value is newer, it is copied to the client model. Otherwise the client model is kept as is.

In ServerIsMaster mode, the client cannot delete any property; only adding, updating, or keeping values is possible from the client model.

In both modes, all values that do not implement ValueAndTimestamp are used as is from the client model when writing the device shadow (i.e. always overwritten without any timestamp handling).
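The difference between the two modes can be sketched with flat maps of timestamped values. This is a simplified model of the behavior described here, not the library's merge: `Mode`, `Stamped`, `at`, and `merge` are hypothetical names, and nested structures and plain values are ignored.

```go
package main

import (
	"fmt"
	"time"
)

// Mode is a hypothetical stand-in for the two merge modes.
type Mode int

const (
	ServerIsMaster Mode = iota
	ClientIsMaster
)

// Stamped is a hypothetical timestamped value.
type Stamped struct {
	Value float64
	At    time.Time
}

// at is a small helper that builds a timestamp from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }

// merge keeps the newer value for keys present on both sides. Keys that exist
// only on the server are kept in ServerIsMaster mode and dropped (deleted)
// in ClientIsMaster mode.
func merge(client, server map[string]Stamped, mode Mode) map[string]Stamped {
	out := make(map[string]Stamped, len(client))
	for key, c := range client {
		if s, ok := server[key]; ok && s.At.After(c.At) {
			out[key] = s // the stored value is newer
			continue
		}
		out[key] = c
	}
	if mode == ServerIsMaster {
		for key, s := range server {
			if _, ok := client[key]; !ok {
				out[key] = s
			}
		}
	}
	return out
}

func main() {
	client := map[string]Stamped{"temp": {Value: 23.4, At: at(20)}}
	server := map[string]Stamped{
		"temp": {Value: 21.0, At: at(10)},
		"hum":  {Value: 40.0, At: at(10)},
	}
	fmt.Println(len(merge(client, server, ServerIsMaster))) // 2
	fmt.Println(len(merge(client, server, ClientIsMaster))) // 1
}
```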

Desired State

This denotes the desired state. When the client reports a state, it may also request that the SDK load the desired state and clear each desired value that is the same as the reported value.

When Reported and Desired are stored under two different sort keys, this must be done in a transaction. For example, in DynamoDB this is done using the transaction API.
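The acknowledge step itself is simple to sketch once both states are loaded. The example below uses flat maps of floats and a hypothetical `acknowledge` helper; the transactional write of both sort keys is out of scope here.

```go
package main

import (
	"fmt"
	"sort"
)

// acknowledge removes every desired entry whose value equals the reported
// value for the same key. It returns the remaining desired state and the
// sorted list of acknowledged keys.
func acknowledge(reported, desired map[string]float64) (remaining map[string]float64, acked []string) {
	remaining = make(map[string]float64, len(desired))
	for key, want := range desired {
		if got, ok := reported[key]; ok && got == want {
			acked = append(acked, key)
			continue
		}
		remaining[key] = want
	}
	sort.Strings(acked)
	return remaining, acked
}

func main() {
	reported := map[string]float64{"sp": 99.2, "temp": 23.4}
	desired := map[string]float64{"sp": 99.2, "mode": 1}

	remaining, acked := acknowledge(reported, desired)
	fmt.Println(remaining, acked) // map[mode:1] [sp]
}
```

In the separate-storage layout, the updated reported document and the reduced desired document would then be written together in one transaction.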

Development

Submodules

When a plugin needs an external dependency, it is REQUIRED that it be its own module, in order to keep the core framework free from dependencies other than the Go standard library and the test framework.

Add a Makefile for versioning: see the existing Makefile and copy the version target to allow versioning of the plugin. Add the module to this README under the Modules section so it is clear that it is a submodule that may be referenced in an external project (or this one).
