Skip to content

Commit

Permalink
NOISSUE - Fix OPC-UA adapter (absmach#2114)
Browse files Browse the repository at this point in the history
Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
  • Loading branch information
WashingtonKK authored and nyagamunene committed Apr 5, 2024
1 parent 48d8fd4 commit 11c908c
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 85 deletions.
34 changes: 34 additions & 0 deletions internal/groups/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ const (
groupList = groupPrefix + "list"
groupListMemberships = groupPrefix + "list_by_user"
groupRemove = groupPrefix + "remove"
groupAssign = groupPrefix + "assign"
groupUnassign = groupPrefix + "unassign"
)

var (
_ events.Event = (*assignEvent)(nil)
_ events.Event = (*unassignEvent)(nil)
_ events.Event = (*createGroupEvent)(nil)
_ events.Event = (*updateGroupEvent)(nil)
_ events.Event = (*changeStatusGroupEvent)(nil)
Expand All @@ -34,6 +38,36 @@ var (
_ events.Event = (*listGroupMembershipEvent)(nil)
)

type assignEvent struct {
memberIDs []string
groupID string
}

func (cge assignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupAssign,
"member_ids": cge.memberIDs,
"group_id": cge.groupID,
}

return val, nil
}

type unassignEvent struct {
memberIDs []string
groupID string
}

func (cge unassignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupUnassign,
"member_ids": cge.memberIDs,
"group_id": cge.groupID,
}

return val, nil
}

type createGroupEvent struct {
groups.Group
}
Expand Down
27 changes: 26 additions & 1 deletion internal/groups/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,35 @@ func (es eventStore) EnableGroup(ctx context.Context, token, id string) (groups.
}

func (es eventStore) Assign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error {
return es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...)
if err := es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil {
return err
}

event := assignEvent{
groupID: groupID,
memberIDs: memberIDs,
}

if err := es.Publish(ctx, event); err != nil {
return err
}

return nil
}

func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error {
if err := es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil {
return err
}

event := unassignEvent{
groupID: groupID,
memberIDs: memberIDs,
}

if err := es.Publish(ctx, event); err != nil {
return err
}
return es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...)
}

Expand Down
117 changes: 85 additions & 32 deletions opcua/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package opcua

import (
"context"
"encoding/base64"
"fmt"
"log/slog"
"regexp"
"strconv"

"github.com/absmach/magistrala/opcua/db"
)
Expand All @@ -33,27 +36,30 @@ type Service interface {
RemoveChannel(ctx context.Context, chanID string) error

// ConnectThing creates thingID:channelID route-map
ConnectThing(ctx context.Context, chanID, thingID string) error
ConnectThing(ctx context.Context, chanID string, thingIDs []string) error

// DisconnectThing removes thingID:channelID route-map
DisconnectThing(ctx context.Context, chanID, thingID string) error
DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error

// Browse browses available nodes for a given OPC-UA Server URI and NodeID
Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error)
Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error)
}

// Config OPC-UA Server.
type Config struct {
ServerURI string
NodeID string
Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""`
CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""`
CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
}

var _ Service = (*adapterService)(nil)
var (
_ Service = (*adapterService)(nil)
guidRegex = regexp.MustCompile(`^\{?[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\}?$`)
)

type adapterService struct {
subscriber Subscriber
Expand Down Expand Up @@ -102,46 +108,93 @@ func (as *adapterService) RemoveChannel(ctx context.Context, chanID string) erro
return as.channelsRM.Remove(ctx, chanID)
}

func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID string) error {
func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thingIDs []string) error {
serverURI, err := as.channelsRM.Get(ctx, chanID)
if err != nil {
return err
}

nodeID, err := as.thingsRM.Get(ctx, thingID)
if err != nil {
return err
}
for _, thingID := range thingIDs {
nodeID, err := as.thingsRM.Get(ctx, thingID)
if err != nil {
return err
}

as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI

c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Save(ctx, c, c); err != nil {
return err
}
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Save(ctx, c, c); err != nil {
return err
}

go func() {
if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil {
as.logger.Warn("subscription failed", slog.Any("error", err))
}
}()

go func() {
if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil {
as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
// Store subscription details
if err := db.Save(serverURI, nodeID); err != nil {
return err
}
}()
}

// Store subscription details
return db.Save(serverURI, nodeID)
return nil
}

func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) {
nodeID := fmt.Sprintf("%s;%s", namespace, identifier)

func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) {
idFormat := "s"
switch identifierType {
case "string":
break
case "numeric":
if _, err := strconv.Atoi(identifier); err != nil {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.Any("error", err),
}
as.logger.Warn("failed to parse numeric identifier", args...)
break
}
idFormat = "i"
case "guid":
if !guidRegex.MatchString(identifier) {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
}
as.logger.Warn("GUID identifier has invalid format", args...)
break
}
idFormat = "g"
case "opaque":
if _, err := base64.StdEncoding.DecodeString(identifier); err != nil {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.Any("error", err),
}
as.logger.Warn("opaque identifier has invalid base64 format", args...)
break
}
idFormat = "b"
}
nodeID := fmt.Sprintf("ns=%s;%s=%s", namespace, idFormat, identifier)
nodes, err := as.browser.Browse(serverURI, nodeID)
if err != nil {
return nil, err
}
return nodes, nil
}

func (as *adapterService) DisconnectThing(ctx context.Context, chanID, thingID string) error {
c := fmt.Sprintf("%s:%s", chanID, thingID)
return as.connectRM.Remove(ctx, c)
func (as *adapterService) DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error {
for _, thingID := range thingIDs {
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Remove(ctx, c); err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion opcua/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func browseEndpoint(svc opcua.Service) endpoint.Endpoint {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}

nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier)
nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier, req.IdentifierType)
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions opcua/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ func (lm loggingMiddleware) RemoveChannel(ctx context.Context, mgxChanID string)
return lm.svc.RemoveChannel(ctx, mgxChanID)
}

func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) {
func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", mgxChanID),
slog.String("thing_id", mgxThingID),
slog.Any("thing_ids", mgxThingIDs),
}
if err != nil {
args = append(args, slog.Any("error", err))
Expand All @@ -149,15 +149,15 @@ func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThin
lm.logger.Info("Connect thing to channel completed successfully", args...)
}(time.Now())

return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingID)
return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) {
func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", mgxChanID),
slog.String("thing_id", mgxThingID),
slog.Any("thing_ids", mgxThingIDs),
}
if err != nil {
args = append(args, slog.Any("error", err))
Expand All @@ -167,16 +167,17 @@ func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxT
lm.logger.Info("Disconnect thing from channel completed successfully", args...)
}(time.Now())

return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID)
return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) (nodes []opcua.BrowsedNode, err error) {
func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) (nodes []opcua.BrowsedNode, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("server_uri", serverURI),
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.String("identifier_type", identifierType),
}
if err != nil {
args = append(args, slog.Any("error", err))
Expand All @@ -186,5 +187,5 @@ func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, id
lm.logger.Info("Browse available nodes completed successfully", args...)
}(time.Now())

return lm.svc.Browse(ctx, serverURI, namespace, identifier)
return lm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType)
}
12 changes: 6 additions & 6 deletions opcua/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,29 @@ func (mm *metricsMiddleware) RemoveChannel(ctx context.Context, mgxChanID string
return mm.svc.RemoveChannel(ctx, mgxChanID)
}

func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) error {
func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing").Add(1)
mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingID)
return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) error {
func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_thing").Add(1)
mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID)
return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]opcua.BrowsedNode, error) {
func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]opcua.BrowsedNode, error) {
defer func(begin time.Time) {
mm.counter.With("method", "browse").Add(1)
mm.latency.With("method", "browse").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Browse(ctx, serverURI, namespace, identifier)
return mm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType)
}
7 changes: 4 additions & 3 deletions opcua/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package api
import "github.com/absmach/magistrala/internal/apiutil"

type browseReq struct {
ServerURI string
Namespace string
Identifier string
ServerURI string
Namespace string
Identifier string
IdentifierType string
}

func (req *browseReq) validate() error {
Expand Down
Loading

0 comments on commit 11c908c

Please sign in to comment.