Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Fix OPC-UA adapter #2114

Merged
merged 15 commits into from
Apr 1, 2024
34 changes: 34 additions & 0 deletions internal/groups/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ const (
groupList = groupPrefix + "list"
groupListMemberships = groupPrefix + "list_by_user"
groupRemove = groupPrefix + "remove"
groupAssign = groupPrefix + "assign"
groupUnassign = groupPrefix + "unassign"
)

var (
_ events.Event = (*assignEvent)(nil)
_ events.Event = (*unassignEvent)(nil)
_ events.Event = (*createGroupEvent)(nil)
_ events.Event = (*updateGroupEvent)(nil)
_ events.Event = (*changeStatusGroupEvent)(nil)
Expand All @@ -34,6 +38,36 @@ var (
_ events.Event = (*listGroupMembershipEvent)(nil)
)

type assignEvent struct {
memberIDs []string
groupID string
}

func (cge assignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupAssign,
"member_ids": cge.memberIDs,
"group_id": cge.groupID,
}

return val, nil
}

type unassignEvent struct {
memberIDs []string
groupID string
}

func (cge unassignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupUnassign,
"member_ids": cge.memberIDs,
"group_id": cge.groupID,
}

return val, nil
}

type createGroupEvent struct {
groups.Group
}
Expand Down
27 changes: 26 additions & 1 deletion internal/groups/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,35 @@ func (es eventStore) EnableGroup(ctx context.Context, token, id string) (groups.
}

func (es eventStore) Assign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error {
return es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...)
if err := es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil {
return err
}

event := assignEvent{
groupID: groupID,
memberIDs: memberIDs,
}

if err := es.Publish(ctx, event); err != nil {
return err
}

return nil
}

func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error {
if err := es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil {
return err
}

event := unassignEvent{
groupID: groupID,
memberIDs: memberIDs,
}

if err := es.Publish(ctx, event); err != nil {
return err
}
return es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...)
}

Expand Down
117 changes: 85 additions & 32 deletions opcua/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package opcua

import (
"context"
"encoding/base64"
"fmt"
"log/slog"
"regexp"
"strconv"

"github.com/absmach/magistrala/opcua/db"
)
Expand All @@ -33,27 +36,30 @@ type Service interface {
RemoveChannel(ctx context.Context, chanID string) error

// ConnectThing creates thingID:channelID route-map
ConnectThing(ctx context.Context, chanID, thingID string) error
ConnectThing(ctx context.Context, chanID string, thingIDs []string) error

// DisconnectThing removes thingID:channelID route-map
DisconnectThing(ctx context.Context, chanID, thingID string) error
DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error

// Browse browses available nodes for a given OPC-UA Server URI and NodeID
Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error)
Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error)
}

// Config OPC-UA Server.
type Config struct {
ServerURI string
NodeID string
Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""`
CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""`
CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
}

var _ Service = (*adapterService)(nil)
var (
_ Service = (*adapterService)(nil)
guidRegex = regexp.MustCompile(`^\{?[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\}?$`)
)

type adapterService struct {
subscriber Subscriber
Expand Down Expand Up @@ -102,46 +108,93 @@ func (as *adapterService) RemoveChannel(ctx context.Context, chanID string) erro
return as.channelsRM.Remove(ctx, chanID)
}

func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID string) error {
func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thingIDs []string) error {
serverURI, err := as.channelsRM.Get(ctx, chanID)
if err != nil {
return err
}

nodeID, err := as.thingsRM.Get(ctx, thingID)
if err != nil {
return err
}
for _, thingID := range thingIDs {
nodeID, err := as.thingsRM.Get(ctx, thingID)
if err != nil {
return err
}

as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI

c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Save(ctx, c, c); err != nil {
return err
}
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Save(ctx, c, c); err != nil {
return err
}

go func() {
if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil {
as.logger.Warn("subscription failed", slog.Any("error", err))
}
}()

go func() {
if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil {
as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
// Store subscription details
if err := db.Save(serverURI, nodeID); err != nil {
return err
}
}()
}

// Store subscription details
return db.Save(serverURI, nodeID)
return nil
}

func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) {
nodeID := fmt.Sprintf("%s;%s", namespace, identifier)

func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) {
idFormat := "s"
switch identifierType {
case "string":
break
case "numeric":
if _, err := strconv.Atoi(identifier); err != nil {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.Any("error", err),
}
as.logger.Warn("failed to parse numeric identifier", args...)
break
}
idFormat = "i"
case "guid":
if !guidRegex.MatchString(identifier) {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
}
as.logger.Warn("GUID identifier has invalid format", args...)
break
}
idFormat = "g"
case "opaque":
if _, err := base64.StdEncoding.DecodeString(identifier); err != nil {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.Any("error", err),
}
as.logger.Warn("opaque identifier has invalid base64 format", args...)
break
}
idFormat = "b"
}
nodeID := fmt.Sprintf("ns=%s;%s=%s", namespace, idFormat, identifier)
nodes, err := as.browser.Browse(serverURI, nodeID)
if err != nil {
return nil, err
}
return nodes, nil
}

func (as *adapterService) DisconnectThing(ctx context.Context, chanID, thingID string) error {
c := fmt.Sprintf("%s:%s", chanID, thingID)
return as.connectRM.Remove(ctx, c)
func (as *adapterService) DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error {
for _, thingID := range thingIDs {
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Remove(ctx, c); err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion opcua/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func browseEndpoint(svc opcua.Service) endpoint.Endpoint {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}

nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier)
nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier, req.IdentifierType)
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions opcua/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ func (lm loggingMiddleware) RemoveChannel(ctx context.Context, mgxChanID string)
return lm.svc.RemoveChannel(ctx, mgxChanID)
}

func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) {
func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", mgxChanID),
slog.String("thing_id", mgxThingID),
slog.Any("thing_ids", mgxThingIDs),
}
if err != nil {
args = append(args, slog.Any("error", err))
Expand All @@ -149,15 +149,15 @@ func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThin
lm.logger.Info("Connect thing to channel completed successfully", args...)
}(time.Now())

return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingID)
return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) {
func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", mgxChanID),
slog.String("thing_id", mgxThingID),
slog.Any("thing_ids", mgxThingIDs),
}
if err != nil {
args = append(args, slog.Any("error", err))
Expand All @@ -167,16 +167,17 @@ func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxT
lm.logger.Info("Disconnect thing from channel completed successfully", args...)
}(time.Now())

return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID)
return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) (nodes []opcua.BrowsedNode, err error) {
func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) (nodes []opcua.BrowsedNode, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("server_uri", serverURI),
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.String("identifier_type", identifierType),
}
if err != nil {
args = append(args, slog.Any("error", err))
Expand All @@ -186,5 +187,5 @@ func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, id
lm.logger.Info("Browse available nodes completed successfully", args...)
}(time.Now())

return lm.svc.Browse(ctx, serverURI, namespace, identifier)
return lm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType)
}
12 changes: 6 additions & 6 deletions opcua/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,29 @@ func (mm *metricsMiddleware) RemoveChannel(ctx context.Context, mgxChanID string
return mm.svc.RemoveChannel(ctx, mgxChanID)
}

func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) error {
func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing").Add(1)
mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingID)
return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) error {
func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_thing").Add(1)
mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID)
return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs)
}

func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]opcua.BrowsedNode, error) {
func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]opcua.BrowsedNode, error) {
defer func(begin time.Time) {
mm.counter.With("method", "browse").Add(1)
mm.latency.With("method", "browse").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Browse(ctx, serverURI, namespace, identifier)
return mm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType)
}
7 changes: 4 additions & 3 deletions opcua/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package api
import "github.com/absmach/magistrala/internal/apiutil"

type browseReq struct {
ServerURI string
Namespace string
Identifier string
ServerURI string
Namespace string
Identifier string
IdentifierType string
}

func (req *browseReq) validate() error {
Expand Down
Loading
Loading