Skip to content

Commit

Permalink
Fix Blob payload support + Add buffer metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Konstantin Ilichev <konstantin.ilichev@intel.com>
  • Loading branch information
ko80 committed Feb 28, 2025
1 parent a3df86b commit b56981d
Show file tree
Hide file tree
Showing 24 changed files with 644 additions and 250 deletions.
20 changes: 20 additions & 0 deletions control-plane-agent/api/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ func TestProxyAPI_RegisterConnection(t *testing.T) {
ProxyId: "123",
Kind: "tx",
Config: &sdk.ConnectionConfig{
BufParts: &sdk.BufferPartitions{
Payload: &sdk.BufferPartition{},
Metadata: &sdk.BufferPartition{},
Sysdata: &sdk.BufferPartition{},
},
Conn: &sdk.ConnectionConfig_MultipointGroup{
MultipointGroup: &sdk.ConfigMultipointGroup{
Urn: "abc",
Expand All @@ -272,6 +277,11 @@ func TestProxyAPI_RegisterConnection(t *testing.T) {
ProxyId: "234",
Kind: "rx",
Config: &sdk.ConnectionConfig{
BufParts: &sdk.BufferPartitions{
Payload: &sdk.BufferPartition{},
Metadata: &sdk.BufferPartition{},
Sysdata: &sdk.BufferPartition{},
},
Conn: &sdk.ConnectionConfig_MultipointGroup{
MultipointGroup: &sdk.ConfigMultipointGroup{
Urn: "ABC",
Expand All @@ -295,6 +305,11 @@ func TestProxyAPI_RegisterConnection(t *testing.T) {
ProxyId: "345",
Kind: "tx",
Config: &sdk.ConnectionConfig{
BufParts: &sdk.BufferPartitions{
Payload: &sdk.BufferPartition{},
Metadata: &sdk.BufferPartition{},
Sysdata: &sdk.BufferPartition{},
},
Conn: &sdk.ConnectionConfig_St2110{
St2110: &sdk.ConfigST2110{
RemoteIpAddr: "192.168.96.10",
Expand All @@ -319,6 +334,11 @@ func TestProxyAPI_RegisterConnection(t *testing.T) {
ProxyId: "456",
Kind: "rx",
Config: &sdk.ConnectionConfig{
BufParts: &sdk.BufferPartitions{
Payload: &sdk.BufferPartition{},
Metadata: &sdk.BufferPartition{},
Sysdata: &sdk.BufferPartition{},
},
Conn: &sdk.ConnectionConfig_St2110{
St2110: &sdk.ConfigST2110{
RemoteIpAddr: "192.168.97.10",
Expand Down
1 change: 0 additions & 1 deletion control-plane-agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ require (
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect
)
18 changes: 16 additions & 2 deletions control-plane-agent/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI
github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
Expand All @@ -21,6 +27,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk=
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -32,8 +48,6 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 h1:F29+wU6Ee6qgu9TddPgooOdaqsxTMunOoj8KA5yuS5A=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1/go.mod h1:5KF+wpkbTSbGcR9zteSqZV6fqFOWBl4Yde8En8MryZA=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
44 changes: 44 additions & 0 deletions control-plane-agent/internal/model/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"control-plane-agent/api/proxy/proto/sdk"
)

type SDKBufferPartition struct {
Offset uint32 `json:"offset"`
Size uint32 `json:"size"`
}

type SDKBufferPartitions struct {
Payload SDKBufferPartition `json:"payload"`
Metadata SDKBufferPartition `json:"metadata"`
Sysdata SDKBufferPartition `json:"sysdata"`
}

type SDKConfigMultipointGroup struct {
URN string `json:"urn"`
}
Expand Down Expand Up @@ -54,6 +65,8 @@ type SDKConnectionConfig struct {
MaxMetadataSize uint32 `json:"maxMetadataSize"`
CalculatedPayloadSize uint32 `json:"calculatedPayloadSize"`

BufParts SDKBufferPartitions `json:"bufPartitions"`

Conn struct {
MultipointGroup *SDKConfigMultipointGroup `json:"multipointGroup,omitempty"`
ST2110 *SDKConfigST2110 `json:"st2110,omitempty"`
Expand Down Expand Up @@ -125,6 +138,22 @@ func (s *SDKConnectionConfig) AssignFromPb(cfg *sdk.ConnectionConfig) error {
s.MaxMetadataSize = cfg.MaxMetadataSize
s.CalculatedPayloadSize = cfg.CalculatedPayloadSize

if cfg.BufParts == nil ||
cfg.BufParts.Payload == nil ||
cfg.BufParts.Metadata == nil ||
cfg.BufParts.Sysdata == nil {
return errors.New("sdk buf parts cfg is nil")
}

s.BufParts.Payload.Offset = cfg.BufParts.Payload.Offset
s.BufParts.Payload.Size = cfg.BufParts.Payload.Size

s.BufParts.Metadata.Offset = cfg.BufParts.Metadata.Offset
s.BufParts.Metadata.Size = cfg.BufParts.Metadata.Size

s.BufParts.Sysdata.Offset = cfg.BufParts.Sysdata.Offset
s.BufParts.Sysdata.Size = cfg.BufParts.Sysdata.Size

switch conn := cfg.Conn.(type) {
case *sdk.ConnectionConfig_MultipointGroup:
s.Conn.MultipointGroup = &SDKConfigMultipointGroup{
Expand Down Expand Up @@ -178,6 +207,21 @@ func (s *SDKConnectionConfig) AssignToPb(cfg *sdk.ConnectionConfig) {
cfg.MaxMetadataSize = s.MaxMetadataSize
cfg.CalculatedPayloadSize = s.CalculatedPayloadSize

cfg.BufParts = &sdk.BufferPartitions{
Payload: &sdk.BufferPartition{
Offset: s.BufParts.Payload.Offset,
Size: s.BufParts.Payload.Size,
},
Metadata: &sdk.BufferPartition{
Offset: s.BufParts.Metadata.Offset,
Size: s.BufParts.Metadata.Size,
},
Sysdata: &sdk.BufferPartition{
Offset: s.BufParts.Sysdata.Offset,
Size: s.BufParts.Sysdata.Size,
},
}

switch {
case s.Conn.MultipointGroup != nil:
cfg.Conn = &sdk.ConnectionConfig_MultipointGroup{
Expand Down
49 changes: 49 additions & 0 deletions media-proxy/include/mesh/buf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#ifndef BUF_H
#define BUF_H

#include <cstdint>
#include <cstddef>

namespace mesh::connection {

/**
* Buffer partition definition structure
*/
class BufferPartition {
public:
uint32_t size;
uint32_t offset;
};

/**
* Buffer partitioning definition structure
*/
class BufferPartitions {
public:
BufferPartition payload;
BufferPartition metadata;
BufferPartition sysdata;

size_t total_size() const;
};

/**
* System data structure transmitted within every buffer
*/
class BufferSysData {
public:
int64_t timestamp_ms;
uint32_t seq;
uint32_t payload_len;
uint32_t metadata_len;
};

} // namespace mesh::connection

#endif // BUF_H
5 changes: 5 additions & 0 deletions media-proxy/include/mesh/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <atomic>
#include <cstddef>
#include "buf.h"
#include "concurrency.h"
#include "metrics.h"
#include "sdk.grpc.pb.h"
Expand Down Expand Up @@ -70,6 +71,7 @@ enum class Result {
error_general_failure,
error_context_cancelled,
error_conn_config_invalid,
error_buf_config_invalid,
error_payload_config_invalid,

error_already_initialized,
Expand Down Expand Up @@ -117,6 +119,7 @@ class Config {
public:
Result assign_from_pb(const sdk::ConnectionConfig& config);
void assign_to_pb(sdk::ConnectionConfig& config) const;
void copy_buf_parts_from(const Config& config);

sdk::ConnectionKind kind;

Expand All @@ -126,6 +129,8 @@ class Config {

uint32_t calculated_payload_size;

BufferPartitions buf_parts;

ConnectionType conn_type;

struct {
Expand Down
28 changes: 27 additions & 1 deletion media-proxy/include/mesh/st2110rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,44 @@ class ST2110Rx : public ST2110<FRAME, HANDLE, OPS> {

private:
void frame_thread() {
if (this->transfer_size > this->config.buf_parts.payload.size) {
log::error("ST2110Rx frame thread transfer size larger than buf payload size")
("transfer_size", this->transfer_size)
("payload.size", this->config.buf_parts.payload.size);
return;
}

auto buf_sz = this->config.buf_parts.total_size();
auto buf = new char[buf_sz];
if (buf == nullptr) {
log::error("ST2110Rx frame thread buf out of memory");
return;
}

auto sysdata = (BufferSysData *)(buf + this->config.buf_parts.sysdata.offset);
auto payload_ptr = (void *)(buf + this->config.buf_parts.payload.offset);

sysdata->timestamp_ms = 0;
sysdata->seq = 0;
sysdata->payload_len = this->transfer_size;
sysdata->metadata_len = 0;

while (!this->_ctx.cancelled()) {
// Get full buffer from MTL
FRAME *frame_ptr = this->get_frame(this->mtl_session);
if (frame_ptr) {
std::memcpy(payload_ptr, get_frame_data_ptr(frame_ptr), this->transfer_size);

// Forward buffer to emulated receiver
this->transmit(this->_ctx, get_frame_data_ptr(frame_ptr), this->transfer_size);
this->transmit(this->_ctx, buf, buf_sz);
// Return used buffer to MTL
this->put_frame(this->mtl_session, frame_ptr);
} else {
this->wait_frame_available();
}
}

delete[] buf;
}
};

Expand Down
17 changes: 13 additions & 4 deletions media-proxy/include/mesh/st2110tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ class ST2110Tx : public ST2110<FRAME, HANDLE, OPS> {
}

Result on_receive(context::Context& ctx, void *ptr, uint32_t sz, uint32_t& sent) override {
int copy_size = this->transfer_size > sz ? sz : this->transfer_size;

FRAME *frame;
for (;;) {
if (ctx.cancelled() || this->_ctx.cancelled())
Expand All @@ -60,12 +58,23 @@ class ST2110Tx : public ST2110<FRAME, HANDLE, OPS> {
this->wait_frame_available();
}

auto buf_total_size = this->config.buf_parts.total_size();
if (sz > buf_total_size)
sz = buf_total_size;

auto base_ptr = (char *)ptr;
auto sysdata = (BufferSysData *)(base_ptr + this->config.buf_parts.sysdata.offset);
auto payload_ptr = (void *)(base_ptr + this->config.buf_parts.payload.offset);

int copy_size = sysdata->payload_len > this->transfer_size ?
this->transfer_size : sysdata->payload_len;

// Copy data from emulated transmitter to MTL empty buffer
mtl_memcpy(get_frame_data_ptr(frame), ptr, copy_size);
mtl_memcpy(get_frame_data_ptr(frame), payload_ptr, copy_size);
// Return full buffer to MTL
this->put_frame(this->mtl_session, frame);

sent = this->transfer_size;
sent = sz; // TODO: Probably replace with copy_size?
return this->set_result(Result::success);
};
};
Expand Down
15 changes: 15 additions & 0 deletions media-proxy/src/mesh/buf.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#include "buf.h"

namespace mesh::connection {

size_t BufferPartitions::total_size() const {
return payload.size + metadata.size + sysdata.size;
}

} // namespace mesh::connection
Loading

0 comments on commit b56981d

Please sign in to comment.