From 156ca1d1e2b82a1b9a1af937fdaa0dd47c73f2b9 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 14 Mar 2024 16:19:24 +0300 Subject: [PATCH 01/15] Fix opcua adapter Signed-off-by: WashingtonKK --- internal/groups/events/events.go | 38 +++++++++++++++++++++++++ internal/groups/events/streams.go | 32 ++++++++++++++++++++- opcua/adapter.go | 2 +- opcua/events/streams.go | 47 ++++++++++++++++++------------- 4 files changed, 98 insertions(+), 21 deletions(-) diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index 880c7eddf3..8b823bf0f5 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -21,9 +21,11 @@ const ( groupList = groupPrefix + "list" groupListMemberships = groupPrefix + "list_by_user" groupRemove = groupPrefix + "remove" + groupAssign = groupPrefix + "assign" ) var ( + _ events.Event = (*assignEvent)(nil) _ events.Event = (*createGroupEvent)(nil) _ events.Event = (*updateGroupEvent)(nil) _ events.Event = (*changeStatusGroupEvent)(nil) @@ -34,6 +36,42 @@ var ( _ events.Event = (*listGroupMembershipEvent)(nil) ) +type assignEvent struct { + operation string + memberID []string + groupID string + createdAt time.Time +} + +func (cge assignEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupAssign, + "member_id": cge.memberID, + "group_id": cge.groupID, + "created_at": cge.createdAt, + } + + return val, nil +} + +type unassignEvent struct { + operation string + memberID []string + groupID string + createdAt time.Time +} + +func (cge unassignEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupAssign, + "member_id": cge.memberID, + "group_id": cge.groupID, + "created_at": cge.createdAt, + } + + return val, nil +} + type createGroupEvent struct { groups.Group } diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 0f5bc23110..253574b772 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -5,6 +5,7 @@ package events import ( "context" + "time" "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" @@ -140,10 +141,39 @@ func (es eventStore) EnableGroup(ctx context.Context, token, id string) (groups. } func (es eventStore) Assign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error { - return es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...) + if err := es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil { + return err + } + + event := assignEvent{ + operation: "group.assign", + groupID: groupID, + memberID: memberIDs, + createdAt: time.Now(), + } + + if err := es.Publish(ctx, event); err != nil { + return err + } + + return nil } func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error { + if err := es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil { + return err + } + + event := unassignEvent{ + operation: "group.unassign", + groupID: groupID, + memberID: memberIDs, + createdAt: time.Now(), + } + + if err := es.Publish(ctx, event); err != nil { + return err + } return es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...) } diff --git a/opcua/adapter.go b/opcua/adapter.go index b07801dfbd..53e205e1f4 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -132,7 +132,7 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID stri } func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) { - nodeID := fmt.Sprintf("%s;%s", namespace, identifier) + nodeID := fmt.Sprintf("ns=%s;s=%s", namespace, identifier) nodes, err := as.browser.Browse(serverURI, nodeID) if err != nil { diff --git a/opcua/events/streams.go b/opcua/events/streams.go index 91d1750546..e381ba78cb 100644 --- a/opcua/events/streams.go +++ b/opcua/events/streams.go @@ -5,6 +5,7 @@ package events import ( "context" + "encoding/base64" "encoding/json" "errors" @@ -13,21 +14,21 @@ import ( ) const ( - keyType = "opcua" - keyNodeID = "node_id" - keyServerURI = "server_uri" + keyType = "opcua" + keyNodeID = "node_id" + keyServerURI = "server_uri" + channelPrefix = "group." + thingPrefix = "thing." - thingPrefix = "thing." - thingCreate = thingPrefix + "create" - thingUpdate = thingPrefix + "update" - thingRemove = thingPrefix + "remove" - thingConnect = thingPrefix + "connect" - thingDisconnect = thingPrefix + "disconnect" + thingCreate = thingPrefix + "create" + thingUpdate = thingPrefix + "update" + thingRemove = thingPrefix + "remove" - channelPrefix = "group." - channelCreate = channelPrefix + "create" - channelUpdate = channelPrefix + "update" - channelRemove = channelPrefix + "remove" + channelCreate = channelPrefix + "create" + channelUpdate = channelPrefix + "update" + channelRemove = channelPrefix + "remove" + channelConnect = channelPrefix + "assign" + channelDisconnect = channelPrefix + "unassign" ) var ( @@ -92,10 +93,10 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { case channelRemove: rce := decodeRemoveChannel(msg) err = es.svc.RemoveChannel(ctx, rce.id) - case thingConnect: + case channelConnect: rce := decodeConnectThing(msg) err = es.svc.ConnectThing(ctx, rce.chanID, rce.thingID) - case thingDisconnect: + case channelDisconnect: rce := decodeDisconnectThing(msg) err = es.svc.DisconnectThing(ctx, rce.chanID, rce.thingID) } @@ -108,8 +109,12 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) { strmeta := read(event, "metadata", "{}") + meta, err := base64.StdEncoding.DecodeString(strmeta) + if err != nil { + return createThingEvent{}, err + } var metadata map[string]interface{} - if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil { + if err := json.Unmarshal(meta, &metadata); err != nil { return createThingEvent{}, err } @@ -144,8 +149,12 @@ func decodeRemoveThing(event map[string]interface{}) removeThingEvent { func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, error) { strmeta := read(event, "metadata", "{}") + meta, err := base64.StdEncoding.DecodeString(strmeta) + if err != nil { + return createChannelEvent{}, err + } var metadata map[string]interface{} - if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil { + if err := json.Unmarshal([]byte(meta), &metadata); err != nil { return createChannelEvent{}, err } @@ -180,8 +189,8 @@ func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent { func decodeConnectThing(event map[string]interface{}) connectThingEvent { return connectThingEvent{ - chanID: read(event, "chan_id", ""), - thingID: read(event, "thing_id", ""), + chanID: read(event, "group_id", ""), + thingID: read(event, "member_id", ""), } } From 60e66bf9c8226b160a374b8eb9d3ec7f8faae6b0 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 14 Mar 2024 16:48:58 +0300 Subject: [PATCH 02/15] Add compile time check: Signed-off-by: WashingtonKK --- internal/groups/events/events.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index 8b823bf0f5..d30b462c66 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -26,6 +26,7 @@ const ( var ( _ events.Event = (*assignEvent)(nil) + _ events.Event = (*unassignEvent)(nil) _ events.Event = (*createGroupEvent)(nil) _ events.Event = (*updateGroupEvent)(nil) _ events.Event = (*changeStatusGroupEvent)(nil) From e89fb6ef5d8d08a39fe2e2e23daaa6ad6224e916 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 14 Mar 2024 16:58:29 +0300 Subject: [PATCH 03/15] Send single member id Signed-off-by: WashingtonKK --- internal/groups/events/events.go | 4 ++-- internal/groups/events/streams.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index d30b462c66..97dc52b01a 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -39,7 +39,7 @@ var ( type assignEvent struct { operation string - memberID []string + memberID string groupID string createdAt time.Time } @@ -57,7 +57,7 @@ func (cge assignEvent) Encode() (map[string]interface{}, error) { type unassignEvent struct { operation string - memberID []string + memberID string groupID string createdAt time.Time } diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 253574b772..1834a6ce10 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -148,7 +148,7 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe event := assignEvent{ operation: "group.assign", groupID: groupID, - memberID: memberIDs, + memberID: memberIDs[0], createdAt: time.Now(), } @@ -167,7 +167,7 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem event := unassignEvent{ operation: "group.unassign", groupID: groupID, - memberID: memberIDs, + memberID: memberIDs[0], createdAt: time.Now(), } From 2d87f185e9b70735e1a5c23f4c8ee00ab6511da7 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 14 Mar 2024 19:05:04 +0300 Subject: [PATCH 04/15] feat: accept multiple nodeID formats via env Signed-off-by: WashingtonKK --- docker/.env | 1 + .../addons/opcua-adapter/docker-compose.yml | 1 + opcua/adapter.go | 37 +++++++++++++++---- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/docker/.env b/docker/.env index 2161902502..19958d2459 100644 --- a/docker/.env +++ b/docker/.env @@ -413,6 +413,7 @@ MG_OPCUA_ADAPTER_HTTP_SERVER_CERT= MG_OPCUA_ADAPTER_HTTP_SERVER_KEY= MG_OPCUA_ADAPTER_ROUTE_MAP_URL=redis://opcua-redis:${MG_REDIS_TCP_PORT}/0 MG_OPCUA_ADAPTER_INSTANCE_ID= +MG_OPCUA_ADAPTER_NODE_ID_FORMAT=string ### Cassandra MG_CASSANDRA_CLUSTER=magistrala-cassandra diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index a2088c8c25..02c1abdc2b 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -43,6 +43,7 @@ services: MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO} MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY} MG_OPCUA_ADAPTER_INSTANCE_ID: ${MG_OPCUA_ADAPTER_INSTANCE_ID} + MG_OPCUA_ADAPTER_NODE_ID_FORMAT: ${MG_OPCUA_ADAPTER_NODE_ID_FORMAT} ports: - ${MG_OPCUA_ADAPTER_HTTP_PORT}:${MG_OPCUA_ADAPTER_HTTP_PORT} networks: diff --git a/opcua/adapter.go b/opcua/adapter.go index 53e205e1f4..10ad62cd06 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "log/slog" + "strconv" "github.com/absmach/magistrala/opcua/db" ) @@ -44,13 +45,14 @@ type Service interface { // Config OPC-UA Server. type Config struct { - ServerURI string - NodeID string - Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"` - Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""` - Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""` - CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""` - KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""` + ServerURI string + NodeID string + Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"` + Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""` + Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""` + CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""` + KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""` + NodeIDFormat string `env:"MG_OPCUA_ADAPTER_NODE_ID_FORMAT" envDefault:"string"` } var _ Service = (*adapterService)(nil) @@ -132,7 +134,26 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID stri } func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) { - nodeID := fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + var nodeID string + + switch as.cfg.NodeIDFormat { + case "string": + nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + case "numeric": + numericIdentifier, err := strconv.Atoi(identifier) // Convert identifier to int + if err != nil { + nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + as.logger.Warn(fmt.Sprintf("failed to parse numeric nodeID format: %s, defaulting to string", err)) + break + } + nodeID = fmt.Sprintf("ns=%s;i=%d", namespace, numericIdentifier) + case "guid": + nodeID = fmt.Sprintf("ns=%s;g=%s", namespace, identifier) + case "opaque": + nodeID = fmt.Sprintf("ns=%s;b=%s", namespace, identifier) + default: + nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + } nodes, err := as.browser.Browse(serverURI, nodeID) if err != nil { From bdc7b7071d86d1ee61b2eaab85d52b311d7bef07 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 14 Mar 2024 19:10:24 +0300 Subject: [PATCH 05/15] Fix ci Signed-off-by: WashingtonKK --- opcua/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opcua/adapter.go b/opcua/adapter.go index 10ad62cd06..c2cd213b34 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -52,7 +52,7 @@ type Config struct { Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""` CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""` KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""` - NodeIDFormat string `env:"MG_OPCUA_ADAPTER_NODE_ID_FORMAT" envDefault:"string"` + NodeIDFormat string `env:"MG_OPCUA_ADAPTER_NODE_ID_FORMAT" envDefault:"string"` } var _ Service = (*adapterService)(nil) From 8dbc8bcc7eef183f7322f465d56fee426e24f47f Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 14 Mar 2024 19:59:35 +0300 Subject: [PATCH 06/15] Specify identifier type in browse request Signed-off-by: WashingtonKK --- docker/.env | 1 - .../addons/opcua-adapter/docker-compose.yml | 1 - opcua/adapter.go | 21 +++++++++---------- opcua/api/endpoint.go | 2 +- opcua/api/logging.go | 4 ++-- opcua/api/metrics.go | 4 ++-- opcua/api/requests.go | 7 ++++--- 7 files changed, 19 insertions(+), 21 deletions(-) diff --git a/docker/.env b/docker/.env index 19958d2459..2161902502 100644 --- a/docker/.env +++ b/docker/.env @@ -413,7 +413,6 @@ MG_OPCUA_ADAPTER_HTTP_SERVER_CERT= MG_OPCUA_ADAPTER_HTTP_SERVER_KEY= MG_OPCUA_ADAPTER_ROUTE_MAP_URL=redis://opcua-redis:${MG_REDIS_TCP_PORT}/0 MG_OPCUA_ADAPTER_INSTANCE_ID= -MG_OPCUA_ADAPTER_NODE_ID_FORMAT=string ### Cassandra MG_CASSANDRA_CLUSTER=magistrala-cassandra diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index 02c1abdc2b..a2088c8c25 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -43,7 +43,6 @@ services: MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO} MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY} MG_OPCUA_ADAPTER_INSTANCE_ID: ${MG_OPCUA_ADAPTER_INSTANCE_ID} - MG_OPCUA_ADAPTER_NODE_ID_FORMAT: ${MG_OPCUA_ADAPTER_NODE_ID_FORMAT} ports: - ${MG_OPCUA_ADAPTER_HTTP_PORT}:${MG_OPCUA_ADAPTER_HTTP_PORT} networks: diff --git a/opcua/adapter.go b/opcua/adapter.go index c2cd213b34..ba333c8741 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -40,19 +40,18 @@ type Service interface { DisconnectThing(ctx context.Context, chanID, thingID string) error // Browse browses available nodes for a given OPC-UA Server URI and NodeID - Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) + Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) } // Config OPC-UA Server. type Config struct { - ServerURI string - NodeID string - Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"` - Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""` - Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""` - CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""` - KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""` - NodeIDFormat string `env:"MG_OPCUA_ADAPTER_NODE_ID_FORMAT" envDefault:"string"` + ServerURI string + NodeID string + Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"` + Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""` + Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""` + CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""` + KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""` } var _ Service = (*adapterService)(nil) @@ -133,10 +132,10 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID stri return db.Save(serverURI, nodeID) } -func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) { +func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) { var nodeID string - switch as.cfg.NodeIDFormat { + switch identifierType { case "string": nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) case "numeric": diff --git a/opcua/api/endpoint.go b/opcua/api/endpoint.go index b0dbc4d4d4..b71149ce91 100644 --- a/opcua/api/endpoint.go +++ b/opcua/api/endpoint.go @@ -20,7 +20,7 @@ func browseEndpoint(svc opcua.Service) endpoint.Endpoint { return nil, errors.Wrap(apiutil.ErrValidation, err) } - nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier) + nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier, req.IdentifierType) if err != nil { return nil, err } diff --git a/opcua/api/logging.go b/opcua/api/logging.go index e58e97abba..279e5b9cb8 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -170,7 +170,7 @@ func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxT return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID) } -func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) (nodes []opcua.BrowsedNode, err error) { +func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) (nodes []opcua.BrowsedNode, err error) { defer func(begin time.Time) { args := []any{ slog.String("duration", time.Since(begin).String()), @@ -186,5 +186,5 @@ func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, id lm.logger.Info("Browse available nodes completed successfully", args...) }(time.Now()) - return lm.svc.Browse(ctx, serverURI, namespace, identifier) + return lm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType) } diff --git a/opcua/api/metrics.go b/opcua/api/metrics.go index 22cdb7718c..e0fa02ceca 100644 --- a/opcua/api/metrics.go +++ b/opcua/api/metrics.go @@ -102,11 +102,11 @@ func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgx return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID) } -func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]opcua.BrowsedNode, error) { +func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]opcua.BrowsedNode, error) { defer func(begin time.Time) { mm.counter.With("method", "browse").Add(1) mm.latency.With("method", "browse").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.Browse(ctx, serverURI, namespace, identifier) + return mm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType) } diff --git a/opcua/api/requests.go b/opcua/api/requests.go index 7288f8a144..73197c69c8 100644 --- a/opcua/api/requests.go +++ b/opcua/api/requests.go @@ -6,9 +6,10 @@ package api import "github.com/absmach/magistrala/internal/apiutil" type browseReq struct { - ServerURI string - Namespace string - Identifier string + ServerURI string + Namespace string + Identifier string + IdentifierType string } func (req *browseReq) validate() error { From 27043bf94fff88f11e3a5f796f822cd3c7e93a8b Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Fri, 15 Mar 2024 00:05:03 +0300 Subject: [PATCH 07/15] Parse identifierType on request Signed-off-by: WashingtonKK --- opcua/api/transport.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/opcua/api/transport.go b/opcua/api/transport.go index ec9ece461b..cb24cdc844 100644 --- a/opcua/api/transport.go +++ b/opcua/api/transport.go @@ -19,12 +19,13 @@ import ( ) const ( - contentType = "application/json" - serverParam = "server" - namespaceParam = "namespace" - identifierParam = "identifier" - defNamespace = "ns=0" // Standard root namespace - defIdentifier = "i=84" // Standard root identifier + contentType = "application/json" + serverParam = "server" + namespaceParam = "namespace" + identifierParam = "identifier" + identifierTypeParam = "identifierType" + defNamespace = "ns=0" // Standard root namespace + defIdentifier = "i=84" // Standard root identifier ) // MakeHandler returns a HTTP handler for API endpoints. @@ -64,15 +65,21 @@ func decodeBrowse(_ context.Context, r *http.Request) (interface{}, error) { return nil, errors.Wrap(apiutil.ErrValidation, err) } + iType, err := apiutil.ReadStringQuery(r, identifierTypeParam, "") + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + if n == "" || i == "" { n = defNamespace i = defIdentifier } req := browseReq{ - ServerURI: s, - Namespace: n, - Identifier: i, + ServerURI: s, + Namespace: n, + Identifier: i, + IdentifierType: iType, } return req, nil From f9bb275e5fa975f60313a0e072a849305508a285 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Mon, 18 Mar 2024 12:33:36 +0300 Subject: [PATCH 08/15] Change unassign event name Signed-off-by: WashingtonKK --- internal/groups/events/events.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index 97dc52b01a..98b13be205 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -22,6 +22,7 @@ const ( groupListMemberships = groupPrefix + "list_by_user" groupRemove = groupPrefix + "remove" groupAssign = groupPrefix + "assign" + groupUnassign = groupPrefix + "unassign" ) var ( @@ -64,7 +65,7 @@ type unassignEvent struct { func (cge unassignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ - "operation": groupAssign, + "operation": groupUnassign, "member_id": cge.memberID, "group_id": cge.groupID, "created_at": cge.createdAt, From 623e15a89af3b06c78557a7ac54ed1c1c8943430 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Tue, 19 Mar 2024 13:28:08 +0300 Subject: [PATCH 09/15] Enable multiple member handling Signed-off-by: WashingtonKK --- internal/groups/events/events.go | 20 +++++------ internal/groups/events/streams.go | 7 ++-- opcua/adapter.go | 55 ++++++++++++++++++------------- opcua/api/logging.go | 12 +++---- opcua/api/metrics.go | 8 ++--- opcua/events/events.go | 4 +-- opcua/events/streams.go | 30 +++++++++++++---- 7 files changed, 79 insertions(+), 57 deletions(-) diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index 98b13be205..477a56bf99 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -40,17 +40,15 @@ var ( type assignEvent struct { operation string - memberID string + memberIDs []string groupID string - createdAt time.Time } func (cge assignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ - "operation": groupAssign, - "member_id": cge.memberID, - "group_id": cge.groupID, - "created_at": cge.createdAt, + "operation": groupAssign, + "member_id": cge.memberIDs, + "group_id": cge.groupID, } return val, nil @@ -58,17 +56,15 @@ func (cge assignEvent) Encode() (map[string]interface{}, error) { type unassignEvent struct { operation string - memberID string + memberIDs []string groupID string - createdAt time.Time } func (cge unassignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ - "operation": groupUnassign, - "member_id": cge.memberID, - "group_id": cge.groupID, - "created_at": cge.createdAt, + "operation": groupUnassign, + "member_id": cge.memberIDs, + "group_id": cge.groupID, } return val, nil diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 1834a6ce10..aa77de1bf0 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -5,7 +5,6 @@ package events import ( "context" - "time" "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" @@ -148,8 +147,7 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe event := assignEvent{ operation: "group.assign", groupID: groupID, - memberID: memberIDs[0], - createdAt: time.Now(), + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil { @@ -167,8 +165,7 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem event := unassignEvent{ operation: "group.unassign", groupID: groupID, - memberID: memberIDs[0], - createdAt: time.Now(), + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil { diff --git a/opcua/adapter.go b/opcua/adapter.go index ba333c8741..fc2c9a6564 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -34,10 +34,10 @@ type Service interface { RemoveChannel(ctx context.Context, chanID string) error // ConnectThing creates thingID:channelID route-map - ConnectThing(ctx context.Context, chanID, thingID string) error + ConnectThing(ctx context.Context, chanID string, thingIDs []string) error // DisconnectThing removes thingID:channelID route-map - DisconnectThing(ctx context.Context, chanID, thingID string) error + DisconnectThing(ctx context.Context, chanID string, thingID []string) error // Browse browses available nodes for a given OPC-UA Server URI and NodeID Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) @@ -103,33 +103,39 @@ func (as *adapterService) RemoveChannel(ctx context.Context, chanID string) erro return as.channelsRM.Remove(ctx, chanID) } -func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID string) error { +func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thingIDs []string) error { serverURI, err := as.channelsRM.Get(ctx, chanID) if err != nil { return err } - nodeID, err := as.thingsRM.Get(ctx, thingID) - if err != nil { - return err - } + for _, t := range thingIDs { + nodeID, err := as.thingsRM.Get(ctx, t) + if err != nil { + return err + } - as.cfg.NodeID = nodeID - as.cfg.ServerURI = serverURI + as.cfg.NodeID = nodeID + as.cfg.ServerURI = serverURI - c := fmt.Sprintf("%s:%s", chanID, thingID) - if err := as.connectRM.Save(ctx, c, c); err != nil { - return err - } + c := fmt.Sprintf("%s:%s", chanID, t) + if err := as.connectRM.Save(ctx, c, c); err != nil { + return err + } + + go func() { + if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil { + as.logger.Warn(fmt.Sprintf("subscription failed: %s", err)) + } + }() - go func() { - if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil { - as.logger.Warn(fmt.Sprintf("subscription failed: %s", err)) + // Store subscription details + if err := db.Save(serverURI, nodeID); err != nil { + return err } - }() + } - // Store subscription details - return db.Save(serverURI, nodeID) + return nil } func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) { @@ -161,7 +167,12 @@ func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, iden return nodes, nil } -func (as *adapterService) DisconnectThing(ctx context.Context, chanID, thingID string) error { - c := fmt.Sprintf("%s:%s", chanID, thingID) - return as.connectRM.Remove(ctx, c) +func (as *adapterService) DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error { + for _, t := range thingIDs { + c := fmt.Sprintf("%s:%s", chanID, t) + if err := as.connectRM.Remove(ctx, c); err != nil { + return err + } + } + return nil } diff --git a/opcua/api/logging.go b/opcua/api/logging.go index 279e5b9cb8..8a134549c8 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -134,12 +134,12 @@ func (lm loggingMiddleware) RemoveChannel(ctx context.Context, mgxChanID string) return lm.svc.RemoveChannel(ctx, mgxChanID) } -func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) { +func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) { defer func(begin time.Time) { args := []any{ slog.String("duration", time.Since(begin).String()), slog.String("channel_id", mgxChanID), - slog.String("thing_id", mgxThingID), + slog.Any("thing_id", mgxThingIDs), } if err != nil { args = append(args, slog.Any("error", err)) @@ -149,15 +149,15 @@ func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThin lm.logger.Info("Connect thing to channel completed successfully", args...) }(time.Now()) - return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingID) + return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs) } -func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) { +func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) { defer func(begin time.Time) { args := []any{ slog.String("duration", time.Since(begin).String()), slog.String("channel_id", mgxChanID), - slog.String("thing_id", mgxThingID), + slog.Any("thing_id", mgxThingIDs), } if err != nil { args = append(args, slog.Any("error", err)) @@ -167,7 +167,7 @@ func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxT lm.logger.Info("Disconnect thing from channel completed successfully", args...) }(time.Now()) - return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID) + return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs) } func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) (nodes []opcua.BrowsedNode, err error) { diff --git a/opcua/api/metrics.go b/opcua/api/metrics.go index e0fa02ceca..18d072f8db 100644 --- a/opcua/api/metrics.go +++ b/opcua/api/metrics.go @@ -84,22 +84,22 @@ func (mm *metricsMiddleware) RemoveChannel(ctx context.Context, mgxChanID string return mm.svc.RemoveChannel(ctx, mgxChanID) } -func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) error { +func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error { defer func(begin time.Time) { mm.counter.With("method", "connect_thing").Add(1) mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingID) + return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs) } -func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) error { +func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error { defer func(begin time.Time) { mm.counter.With("method", "disconnect_thing").Add(1) mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID) + return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs) } func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]opcua.BrowsedNode, error) { diff --git a/opcua/events/events.go b/opcua/events/events.go index 30cea2f4b7..d778420b53 100644 --- a/opcua/events/events.go +++ b/opcua/events/events.go @@ -13,8 +13,8 @@ type removeThingEvent struct { } type connectThingEvent struct { - chanID string - thingID string + chanID string + thingIDs []string } type createChannelEvent struct { diff --git a/opcua/events/streams.go b/opcua/events/streams.go index e381ba78cb..3488e35838 100644 --- a/opcua/events/streams.go +++ b/opcua/events/streams.go @@ -95,10 +95,10 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { err = es.svc.RemoveChannel(ctx, rce.id) case channelConnect: rce := decodeConnectThing(msg) - err = es.svc.ConnectThing(ctx, rce.chanID, rce.thingID) + err = es.svc.ConnectThing(ctx, rce.chanID, rce.thingIDs) case channelDisconnect: rce := decodeDisconnectThing(msg) - err = es.svc.DisconnectThing(ctx, rce.chanID, rce.thingID) + err = es.svc.DisconnectThing(ctx, rce.chanID, rce.thingIDs) } if err != nil && err != errMetadataType { return err @@ -109,6 +109,8 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) { strmeta := read(event, "metadata", "{}") + + // Metadata is base64 encoded since it is marshalled as []byte. meta, err := base64.StdEncoding.DecodeString(strmeta) if err != nil { return createThingEvent{}, err @@ -189,15 +191,15 @@ func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent { func decodeConnectThing(event map[string]interface{}) connectThingEvent { return connectThingEvent{ - chanID: read(event, "group_id", ""), - thingID: read(event, "member_id", ""), + chanID: read(event, "group_id", ""), + thingIDs: readMemberIDs(event, "member_id"), } } func decodeDisconnectThing(event map[string]interface{}) connectThingEvent { return connectThingEvent{ - chanID: read(event, "chan_id", ""), - thingID: read(event, "thing_id", ""), + chanID: read(event, "chan_id", ""), + thingIDs: readMemberIDs(event, "member_id"), } } @@ -209,3 +211,19 @@ func read(event map[string]interface{}, key, def string) string { return val } + +func readMemberIDs(event map[string]interface{}, key string) []string { + var memberIDs []string + val, ok := event[key].([]interface{}) + if !ok { + return memberIDs + } + + for _, v := range val { + if str, ok := v.(string); ok { + memberIDs = append(memberIDs, str) + } + } + + return memberIDs +} From 81a5956d961559cbdb12b1b1b84799f37052911e Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Wed, 20 Mar 2024 17:15:20 +0300 Subject: [PATCH 10/15] Fix events stream Signed-off-by: WashingtonKK --- internal/groups/events/events.go | 14 ++++++-------- internal/groups/events/streams.go | 2 -- opcua/adapter.go | 9 +++++++-- opcua/events/streams.go | 4 ++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index 477a56bf99..d15a2a1caa 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -39,32 +39,30 @@ var ( ) type assignEvent struct { - operation string memberIDs []string groupID string } func (cge assignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ - "operation": groupAssign, - "member_id": cge.memberIDs, - "group_id": cge.groupID, + "operation": groupAssign, + "member_ids": cge.memberIDs, + "group_id": cge.groupID, } return val, nil } type unassignEvent struct { - operation string memberIDs []string groupID string } func (cge unassignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ - "operation": groupUnassign, - "member_id": cge.memberIDs, - "group_id": cge.groupID, + "operation": groupUnassign, + "member_ids": cge.memberIDs, + "group_id": cge.groupID, } return val, nil diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index aa77de1bf0..10610d6686 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -145,7 +145,6 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe } event := assignEvent{ - operation: "group.assign", groupID: groupID, memberIDs: memberIDs, } @@ -163,7 +162,6 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem } event := unassignEvent{ - operation: "group.unassign", groupID: groupID, memberIDs: memberIDs, } diff --git a/opcua/adapter.go b/opcua/adapter.go index fc2c9a6564..3061481c56 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -125,7 +125,7 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thing go func() { if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil { - as.logger.Warn(fmt.Sprintf("subscription failed: %s", err)) + as.logger.Warn("subscription failed", slog.String("error", err.Error())) } }() @@ -148,7 +148,12 @@ func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, iden numericIdentifier, err := strconv.Atoi(identifier) // Convert identifier to int if err != nil { nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) - as.logger.Warn(fmt.Sprintf("failed to parse numeric nodeID format: %s, defaulting to string", err)) + args := []any{ + slog.String("namespace", namespace), + slog.String("identifier", identifier), + slog.String("error", err.Error()), + } + as.logger.Warn("failed to parse numeric identifier", args...) break } nodeID = fmt.Sprintf("ns=%s;i=%d", namespace, numericIdentifier) diff --git a/opcua/events/streams.go b/opcua/events/streams.go index 3488e35838..de8ca50605 100644 --- a/opcua/events/streams.go +++ b/opcua/events/streams.go @@ -192,14 +192,14 @@ func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent { func decodeConnectThing(event map[string]interface{}) connectThingEvent { return connectThingEvent{ chanID: read(event, "group_id", ""), - thingIDs: readMemberIDs(event, "member_id"), + thingIDs: readMemberIDs(event, "member_ids"), } } func decodeDisconnectThing(event map[string]interface{}) connectThingEvent { return connectThingEvent{ chanID: read(event, "chan_id", ""), - thingIDs: readMemberIDs(event, "member_id"), + thingIDs: readMemberIDs(event, "member_ids"), } } From 7012b3bb8323426efe6348aa04632dfcd83203dc Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Wed, 20 Mar 2024 17:42:42 +0300 Subject: [PATCH 11/15] Fix thingID naming Signed-off-by: WashingtonKK --- opcua/adapter.go | 12 ++++++------ opcua/api/logging.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/opcua/adapter.go b/opcua/adapter.go index 3061481c56..e7cf5e592b 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -37,7 +37,7 @@ type Service interface { ConnectThing(ctx context.Context, chanID string, thingIDs []string) error // DisconnectThing removes thingID:channelID route-map - DisconnectThing(ctx context.Context, chanID string, thingID []string) error + DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error // Browse browses available nodes for a given OPC-UA Server URI and NodeID Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) @@ -109,8 +109,8 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thing return err } - for _, t := range thingIDs { - nodeID, err := as.thingsRM.Get(ctx, t) + for _, thingID := range thingIDs { + nodeID, err := as.thingsRM.Get(ctx, thingID) if err != nil { return err } @@ -118,7 +118,7 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thing as.cfg.NodeID = nodeID as.cfg.ServerURI = serverURI - c := fmt.Sprintf("%s:%s", chanID, t) + c := fmt.Sprintf("%s:%s", chanID, thingID) if err := as.connectRM.Save(ctx, c, c); err != nil { return err } @@ -173,8 +173,8 @@ func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, iden } func (as *adapterService) DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error { - for _, t := range thingIDs { - c := fmt.Sprintf("%s:%s", chanID, t) + for _, thingID := range thingIDs { + c := fmt.Sprintf("%s:%s", chanID, thingID) if err := as.connectRM.Remove(ctx, c); err != nil { return err } diff --git a/opcua/api/logging.go b/opcua/api/logging.go index 8a134549c8..8abb13d874 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -139,7 +139,7 @@ func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID string, args := []any{ slog.String("duration", time.Since(begin).String()), slog.String("channel_id", mgxChanID), - slog.Any("thing_id", mgxThingIDs), + slog.Any("thing_ids", mgxThingIDs), } if err != nil { args = append(args, slog.Any("error", err)) @@ -157,7 +157,7 @@ func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID strin args := []any{ slog.String("duration", time.Since(begin).String()), slog.String("channel_id", mgxChanID), - slog.Any("thing_id", mgxThingIDs), + slog.Any("thing_ids", mgxThingIDs), } if err != nil { args = append(args, slog.Any("error", err)) From 5c90596f4481bf9f76600a25353131898f72fe4d Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Wed, 20 Mar 2024 17:58:20 +0300 Subject: [PATCH 12/15] Enhance adapter logging Signed-off-by: WashingtonKK --- opcua/adapter.go | 4 ++-- opcua/api/logging.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/opcua/adapter.go b/opcua/adapter.go index e7cf5e592b..2fa1b5cedc 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -125,7 +125,7 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thing go func() { if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil { - as.logger.Warn("subscription failed", slog.String("error", err.Error())) + as.logger.Warn("subscription failed", slog.Any("error", err)) } }() @@ -151,7 +151,7 @@ func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, iden args := []any{ slog.String("namespace", namespace), slog.String("identifier", identifier), - slog.String("error", err.Error()), + slog.Any("error", err), } as.logger.Warn("failed to parse numeric identifier", args...) break diff --git a/opcua/api/logging.go b/opcua/api/logging.go index 8abb13d874..c104ac2041 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -177,6 +177,7 @@ func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, id slog.String("server_uri", serverURI), slog.String("namespace", namespace), slog.String("identifier", identifier), + slog.String("identifierType", identifierType), } if err != nil { args = append(args, slog.Any("error", err)) From 430b25b067b90d226169f230957226183c4fd26f Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Fri, 22 Mar 2024 16:20:03 +0300 Subject: [PATCH 13/15] fix: remove unnecessary type conversion Signed-off-by: WashingtonKK --- opcua/api/logging.go | 2 +- opcua/events/streams.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opcua/api/logging.go b/opcua/api/logging.go index c104ac2041..fc728bc908 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -177,7 +177,7 @@ func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, id slog.String("server_uri", serverURI), slog.String("namespace", namespace), slog.String("identifier", identifier), - slog.String("identifierType", identifierType), + slog.String("identifier_type", identifierType), } if err != nil { args = append(args, slog.Any("error", err)) diff --git a/opcua/events/streams.go b/opcua/events/streams.go index de8ca50605..ccee65e733 100644 --- a/opcua/events/streams.go +++ b/opcua/events/streams.go @@ -156,7 +156,7 @@ func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, erro return createChannelEvent{}, err } var metadata map[string]interface{} - if err := json.Unmarshal([]byte(meta), &metadata); err != nil { + if err := json.Unmarshal(meta, &metadata); err != nil { return createChannelEvent{}, err } From 5f575f6700a36a1f3aa913cafe04861d395016f6 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 28 Mar 2024 13:19:04 +0300 Subject: [PATCH 14/15] feat: add validation to node id formats Signed-off-by: WashingtonKK --- opcua/adapter.go | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/opcua/adapter.go b/opcua/adapter.go index 2fa1b5cedc..7e6573c7b2 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -5,8 +5,10 @@ package opcua import ( "context" + "encoding/base64" "fmt" "log/slog" + "regexp" "strconv" "github.com/absmach/magistrala/opcua/db" @@ -55,6 +57,7 @@ type Config struct { } var _ Service = (*adapterService)(nil) +var guidRegex = regexp.MustCompile(`^\{?[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\}?$`) type adapterService struct { subscriber Subscriber @@ -139,15 +142,12 @@ func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thing } func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) { - var nodeID string - + idFormat := "s" switch identifierType { case "string": - nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + break case "numeric": - numericIdentifier, err := strconv.Atoi(identifier) // Convert identifier to int - if err != nil { - nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + if _, err := strconv.Atoi(identifier); err != nil { args := []any{ slog.String("namespace", namespace), slog.String("identifier", identifier), @@ -156,15 +156,30 @@ func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, iden as.logger.Warn("failed to parse numeric identifier", args...) break } - nodeID = fmt.Sprintf("ns=%s;i=%d", namespace, numericIdentifier) + idFormat = "i" case "guid": - nodeID = fmt.Sprintf("ns=%s;g=%s", namespace, identifier) + if !guidRegex.MatchString(identifier) { + args := []any{ + slog.String("namespace", namespace), + slog.String("identifier", identifier), + } + as.logger.Warn("GUID identifier has invalid format", args...) + break + } + idFormat = "g" case "opaque": - nodeID = fmt.Sprintf("ns=%s;b=%s", namespace, identifier) - default: - nodeID = fmt.Sprintf("ns=%s;s=%s", namespace, identifier) + if _, err := base64.StdEncoding.DecodeString(identifier); err != nil { + args := []any{ + slog.String("namespace", namespace), + slog.String("identifier", identifier), + slog.Any("error", err), + } + as.logger.Warn("opaque identifier has invalid base64 format", args...) + break + } + idFormat = "b" } - + nodeID := fmt.Sprintf("ns=%s;%s=%s", namespace, idFormat, identifier) nodes, err := as.browser.Browse(serverURI, nodeID) if err != nil { return nil, err From c9c1da200c5070b40d9935b7ac2c2f9d2e243c32 Mon Sep 17 00:00:00 2001 From: WashingtonKK Date: Thu, 28 Mar 2024 13:20:13 +0300 Subject: [PATCH 15/15] fix: ci Signed-off-by: WashingtonKK --- opcua/adapter.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/opcua/adapter.go b/opcua/adapter.go index 7e6573c7b2..0b0a8a149d 100644 --- a/opcua/adapter.go +++ b/opcua/adapter.go @@ -56,8 +56,10 @@ type Config struct { KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""` } -var _ Service = (*adapterService)(nil) -var guidRegex = regexp.MustCompile(`^\{?[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\}?$`) +var ( + _ Service = (*adapterService)(nil) + guidRegex = regexp.MustCompile(`^\{?[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\}?$`) +) type adapterService struct { subscriber Subscriber