Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rx/timing_parser: add compensation for the burst time jitter #703

Merged
merged 4 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/sample/rx_st20p_timing_parser_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ int main(int argc, char** argv) {
ops_rx.output_fmt = ctx.output_fmt;
ops_rx.device = ST_PLUGIN_DEVICE_AUTO;
ops_rx.framebuff_cnt = app[i]->fb_cnt;
ops_rx.rx_burst_size = ctx.rx_burst_size;
ops_rx.flags = ST20P_RX_FLAG_BLOCK_GET;
ops_rx.flags |= ST20P_RX_FLAG_TIMING_PARSER_META;

Expand Down
5 changes: 5 additions & 0 deletions app/sample/sample_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum sample_args_cmd {
SAMPLE_ARG_RSS_MODE,
SAMPLE_ARG_NB_TX_DESC,
SAMPLE_ARG_NB_RX_DESC,
SAMPLE_ARG_RX_BURST_SZ,
SAMPLE_ARG_DHCP,

SAMPLE_ARG_TX_VIDEO_URL = 0x200,
Expand Down Expand Up @@ -102,6 +103,7 @@ static struct option sample_args_options[] = {
{"nb_tx_desc", required_argument, 0, SAMPLE_ARG_NB_TX_DESC},
{"nb_rx_desc", required_argument, 0, SAMPLE_ARG_NB_RX_DESC},
{"dhcp", no_argument, 0, SAMPLE_ARG_DHCP},
{"rx_burst_size", required_argument, 0, SAMPLE_ARG_RX_BURST_SZ},

{"tx_url", required_argument, 0, SAMPLE_ARG_TX_VIDEO_URL},
{"rx_url", required_argument, 0, SAMPLE_ARG_RX_VIDEO_URL},
Expand Down Expand Up @@ -265,6 +267,9 @@ static int _sample_parse_args(struct st_sample_context* ctx, int argc, char** ar
case SAMPLE_ARG_NB_RX_DESC:
p->nb_rx_desc = atoi(optarg);
break;
case SAMPLE_ARG_RX_BURST_SZ:
ctx->rx_burst_size = atoi(optarg);
break;
case SAMPLE_ARG_QUEUES_CNT:
for (int i = 0; i < MTL_PORT_MAX; i++) {
p->rx_queues_cnt[i] = atoi(optarg);
Expand Down
1 change: 1 addition & 0 deletions app/sample/sample_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct st_sample_context {
bool ext_frame;
bool hdr_split;
bool rx_dump;
uint16_t rx_burst_size;

char logo_url[ST_SAMPLE_URL_MAX_LEN];
uint32_t logo_width;
Expand Down
1 change: 1 addition & 0 deletions app/src/app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ struct st_app_context {
bool enable_timing_parser;
bool tx_display;
bool rx_display;
uint16_t rx_burst_size;

bool ptp_systime_sync;
int ptp_sync_cnt;
Expand Down
5 changes: 5 additions & 0 deletions app/src/args.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ enum st_args_cmd {
ST_ARG_PTP_UNICAST_ADDR,
ST_ARG_CNI_THREAD,
ST_ARG_RX_TIMING_PARSER,
ST_ARG_RX_BURST_SZ,
ST_ARG_USER_LCORES,
ST_ARG_SCH_DATA_QUOTA,
ST_ARG_SCH_SESSION_QUOTA,
Expand Down Expand Up @@ -189,6 +190,7 @@ static struct option st_app_args_options[] = {
{"ptp_unicast", no_argument, 0, ST_ARG_PTP_UNICAST_ADDR},
{"cni_thread", no_argument, 0, ST_ARG_CNI_THREAD},
{"rx_timing_parser", no_argument, 0, ST_ARG_RX_TIMING_PARSER},
{"rx_burst_size", required_argument, 0, ST_ARG_RX_BURST_SZ},
{"lcores", required_argument, 0, ST_ARG_USER_LCORES},
{"sch_data_quota", required_argument, 0, ST_ARG_SCH_DATA_QUOTA},
{"sch_session_quota", required_argument, 0, ST_ARG_SCH_SESSION_QUOTA},
Expand Down Expand Up @@ -574,6 +576,9 @@ int st_app_parse_args(struct st_app_context* ctx, struct mtl_init_params* p, int
ctx->enable_timing_parser = true;
p->flags |= MTL_FLAG_ENABLE_HW_TIMESTAMP;
break;
case ST_ARG_RX_BURST_SZ:
ctx->rx_burst_size = atoi(optarg);
break;
case ST_ARG_RX_MONO_POOL:
p->flags |= MTL_FLAG_RX_MONO_POOL;
break;
Expand Down
1 change: 1 addition & 0 deletions app/src/rx_st20p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ static int app_rx_st20p_init(struct st_app_context* ctx,
ops.port.payload_type = st20p ? st20p->base.payload_type : ST_APP_PAYLOAD_TYPE_VIDEO;
ops.device = st20p ? st20p->info.device : ST_PLUGIN_DEVICE_AUTO;
ops.flags |= ST20P_RX_FLAG_BLOCK_GET;
ops.rx_burst_size = ctx->rx_burst_size;
ops.framebuff_cnt = s->framebuff_cnt;
/* always try to enable DMA offload */
ops.flags |= ST20P_RX_FLAG_DMA_OFFLOAD;
Expand Down
1 change: 1 addition & 0 deletions app/src/rx_video_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ static int app_rx_video_init(struct st_app_context* ctx, st_json_video_session_t
ops.rtcp.nack_interval_us = 250;
}
if (ctx->enable_timing_parser) ops.flags |= ST20_RX_FLAG_TIMING_PARSER_STAT;
ops.rx_burst_size = ctx->rx_burst_size;

st_pthread_mutex_init(&s->st20_wake_mutex, NULL);
st_pthread_cond_init(&s->st20_wake_cond, NULL);
Expand Down
3 changes: 3 additions & 0 deletions include/st20_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,9 @@ struct st20_rx_ops {
/** Optional. Flags to control session behaviors. See ST20_RX_FLAG_* for possible value
*/
uint32_t flags;
/* Optional, the size for each mt_rxq_burst, leave to zero to let system select a
* default value */
uint16_t rx_burst_size;

/**
* Mandatory for ST20_TYPE_FRAME_LEVEL/ST20_TYPE_SLICE_LEVEL.
Expand Down
3 changes: 3 additions & 0 deletions include/st_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ struct st20p_rx_ops {
/** Optional. Flags to control session behaviors. See ST20P_RX_FLAG_* for possible value
*/
uint32_t flags;
/* Optional, the size for each mt_rxq_burst, leave to zero to let system select a
* default value */
uint16_t rx_burst_size;
/**
* Optional. Callback when frame available in the lib.
* And only non-block method can be used within this callback as it run from lcore
Expand Down
1 change: 1 addition & 0 deletions lib/src/st2110/pipeline/st20_pipeline_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ static int rx_st20p_create_transport(struct mtl_main_impl* impl, struct st20p_rx
ops_rx.ssrc = ops->port.ssrc;
ops_rx.type = ST20_TYPE_FRAME_LEVEL;
ops_rx.framebuff_cnt = ops->framebuff_cnt;
ops_rx.rx_burst_size = ops->rx_burst_size;
ops_rx.notify_frame_ready = rx_st20p_frame_ready;
ops_rx.notify_event = rx_st20p_notify_event;
if (ctx->derive) {
Expand Down
10 changes: 7 additions & 3 deletions lib/src/st2110/st_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ struct st_tx_muf_priv_data {
uint32_t idx; /* index of current frame */
};

/* info passing between dma and rx */
struct st_rx_muf_priv_data {
uint32_t offset;
uint32_t len;
Expand Down Expand Up @@ -510,8 +509,6 @@ struct st_rv_tp_slot {
/* Cinst, packet level check */
int64_t cinst_sum;
/* vrx, packet level check */
int32_t vrx_drained_prev;
int32_t vrx_prev;
int64_t vrx_sum;
/* Inter-packet time(ns), packet level check */
int64_t ipt_sum;
Expand Down Expand Up @@ -549,6 +546,7 @@ struct st_rx_video_tp {

/* for the status */
struct st_rv_tp_stat stat[MTL_SESSION_PORT_MAX];
uint32_t stat_untrusted_pkts;
};

struct st_rx_video_session_impl {
Expand All @@ -558,6 +556,8 @@ struct st_rx_video_session_impl {
struct st_rx_video_sessions_mgr* parent;
struct st_rx_session_priv priv[MTL_SESSION_PORT_MAX];
bool time_measure;
uint16_t rx_burst_size;
uint16_t cur_succ_burst_cnt;

struct st20_rx_ops ops;
char ops_name[ST_MAX_NAME_LEN];
Expand Down Expand Up @@ -707,6 +707,10 @@ struct st_rx_video_session_impl {
double stat_cpu_busy_score;
/* for tasklet session time measure */
struct mt_stat_u64 stat_time;
/* for rx burst */
int stat_burst_succ_cnt;
uint16_t stat_burst_pkts_max;
uint64_t stat_burst_pkts_sum;
};

struct st_rx_video_sessions_mgr {
Expand Down
14 changes: 6 additions & 8 deletions lib/src/st2110/st_rx_timing_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ void rv_tp_on_packet(struct st_rx_video_session_impl* s, enum mtl_session_port s
int pkt_idx) {
struct st_rx_video_tp* tp = s->tp;
uint64_t epoch_tmstamp;
double tvd, packet_delta_ns, trs = tp->trs;
double tvd, trs = tp->trs;

if (!slot->cur_epochs) { /* the first packet */
uint64_t epochs = (double)pkt_time / s->frame_time;
uint64_t epoch_tmstamp = (double)epochs * s->frame_time;

slot->cur_epochs = epochs;
slot->rtp_tmstamp = rtp_tmstamp;
slot->first_pkt_time = pkt_time;
slot->meta.fpt = pkt_time - epoch_tmstamp;
double first_pkt_time = (double)pkt_time - (trs * pkt_idx);
slot->first_pkt_time = first_pkt_time;
slot->meta.fpt = first_pkt_time - epoch_tmstamp;

uint64_t tmstamp64 = epochs * s->frame_time_sampling;
uint32_t tmstamp32 = tmstamp64;
Expand All @@ -40,16 +41,13 @@ void rv_tp_on_packet(struct st_rx_video_session_impl* s, enum mtl_session_port s

epoch_tmstamp = (uint64_t)(slot->cur_epochs * s->frame_time);
tvd = epoch_tmstamp + tp->pass.tr_offset;
double expect_time = tvd + trs * (pkt_idx + 1);

/* Calculate vrx */
packet_delta_ns = (double)pkt_time - tvd;
int32_t drained = (packet_delta_ns + trs) / trs;
int32_t vrx_cur = slot->vrx_prev + 1 - (drained - slot->vrx_drained_prev);
int32_t vrx_cur = (expect_time - pkt_time) / trs;
slot->vrx_sum += vrx_cur;
slot->meta.vrx_min = RTE_MIN(vrx_cur, slot->meta.vrx_min);
slot->meta.vrx_max = RTE_MAX(vrx_cur, slot->meta.vrx_max);
slot->vrx_prev = vrx_cur;
slot->vrx_drained_prev = drained;

/* Calculate C-inst */
int exp_cin_pkts = ((pkt_time - slot->first_pkt_time) / trs) * ST_TP_CINST_DRAIN_FACTOR;
Expand Down
49 changes: 41 additions & 8 deletions lib/src/st2110/st_rx_video_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ static int rv_start_pcapng(struct mtl_main_impl* impl, struct st_rx_video_sessio
static int rv_dma_dequeue(struct st_rx_video_session_impl* s) {
struct mtl_dma_lender_dev* dma_dev = s->dma_dev;

uint16_t nb_dq = mt_dma_completed(dma_dev, ST_RX_VIDEO_BURST_SIZE, NULL, NULL);
uint16_t nb_dq = mt_dma_completed(dma_dev, s->rx_burst_size, NULL, NULL);

if (nb_dq) {
dbg("%s(%d), nb_dq %u\n", __func__, s->idx, nb_dq);
Expand Down Expand Up @@ -1369,11 +1369,17 @@ static inline void rv_tp_pkt_handle(struct st_rx_video_session_impl* s,
struct rte_mbuf* mbuf, enum mtl_session_port s_port,
struct st_rx_video_slot_impl* slot, uint32_t tmstamp,
int pkt_idx) {
struct st_rx_video_tp* tp = s->tp;
if (s->cur_succ_burst_cnt > (tp->pass.cinst_max_narrow / 2)) {
/* untrusted result */
tp->stat_untrusted_pkts++;
return;
}
struct mtl_main_impl* impl = rv_get_impl(s);
enum mtl_port port = mt_port_logic2phy(s->port_maps, s_port);

uint64_t pkt_ns = mt_mbuf_time_stamp(impl, mbuf, port);
struct st_rv_tp_slot* tp_slot = &s->tp->slots[slot->idx][s_port];
struct st_rv_tp_slot* tp_slot = &tp->slots[slot->idx][s_port];
dbg("%s(%d,%d), tmstamp %u pkt_ns %" PRIu64 " pkt_idx %d\n", __func__, s->idx, s_port,
tmstamp, pkt_ns, pkt_idx);
rv_tp_on_packet(s, s_port, tp_slot, tmstamp, pkt_ns, pkt_idx);
Expand Down Expand Up @@ -2217,7 +2223,7 @@ static int rv_init_pkt_lcore(struct mtl_main_impl* impl,

snprintf(ring_name, 32, "%sM%dS%d_PKT", ST_RX_VIDEO_PREFIX, mgr_idx, idx);
flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */
count = ST_RX_VIDEO_BURST_SIZE * 4;
count = s->rx_burst_size;
ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags);
if (!ring) {
err("%s(%d,%d), ring create fail\n", __func__, mgr_idx, idx);
Expand Down Expand Up @@ -2642,7 +2648,7 @@ static int rv_handle_mbuf(void* priv, struct rte_mbuf** mbuf, uint16_t nb) {
}

static int rv_pkt_rx_tasklet(struct st_rx_video_session_impl* s) {
struct rte_mbuf* mbuf[ST_RX_VIDEO_BURST_SIZE];
struct rte_mbuf* mbuf[s->rx_burst_size];
uint16_t rv;
int num_port = s->ops.num_port;

Expand All @@ -2658,13 +2664,18 @@ static int rv_pkt_rx_tasklet(struct st_rx_video_session_impl* s) {
for (int s_port = 0; s_port < num_port; s_port++) {
if (!s->rxq[s_port]) continue;

rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], ST_RX_VIDEO_BURST_SIZE);
rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], s->rx_burst_size);
s->cur_succ_burst_cnt = rv;
if (rv) {
s->stat_burst_succ_cnt++;
s->stat_burst_pkts_sum += rv;
if (rv > s->stat_burst_pkts_max) s->stat_burst_pkts_max = rv;

rv_handle_mbuf(&s->priv[s_port], &mbuf[0], rv);
rte_pktmbuf_free_bulk(&mbuf[0], rv);
}

if (rv) done = false;
done = false;
}
}

/* submit if any */
Expand Down Expand Up @@ -2967,10 +2978,17 @@ static int rv_attach(struct mtl_main_impl* impl, struct st_rx_video_sessions_mgr
s->st20_dst_port[i] = (ops->udp_port[i]) ? (ops->udp_port[i]) : (10000 + idx * 2);
}

/* init trs */
/* init estimated trs */
int estimated_total_pkts = s->st20_frame_size / ST_VIDEO_BPM_SIZE;
s->trs = s->frame_time / estimated_total_pkts;

if (ops->rx_burst_size) {
s->rx_burst_size = ops->rx_burst_size;
info("%s(%d), user customized rx_burst_size %u\n", __func__, idx, s->rx_burst_size);
} else {
s->rx_burst_size = 128;
}

/* init simulated packet loss for test usage */
if (s->ops.flags & ST20_RX_FLAG_SIMULATE_PKT_LOSS) {
uint16_t burst_loss_max = 1;
Expand Down Expand Up @@ -3366,6 +3384,21 @@ static void rv_stat(struct st_rx_video_sessions_mgr* mgr,
s->stat_st22_boxes);
s->stat_st22_boxes = 0;
}
if (s->stat_burst_succ_cnt) {
notice("RX_VIDEO_SESSION(%d,%d): succ burst max %u, avg %f\n", m_idx, idx,
s->stat_burst_pkts_max,
(float)s->stat_burst_pkts_sum / s->stat_burst_succ_cnt);
s->stat_burst_pkts_max = 0;
s->stat_burst_succ_cnt = 0;
s->stat_burst_pkts_sum = 0;
}

struct st_rx_video_tp* tp = s->tp;
if (tp && tp->stat_untrusted_pkts) {
info("%s(%d), untrusted pkts time %u for timing parser\n", __func__, idx,
tp->stat_untrusted_pkts);
tp->stat_untrusted_pkts = 0;
}
if (s->enable_timing_parser_stat) rv_tp_stat(s);

if (s->time_measure) {
Expand Down
2 changes: 0 additions & 2 deletions lib/src/st2110/st_rx_video_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

#include "st_main.h"

#define ST_RX_VIDEO_BURST_SIZE (128)

#define ST_RX_VIDEO_DMA_MIN_SIZE (1024)

#define ST_RV_TP_TSC_SYNC_MS (100) /* sync tsc with ptp period(ms) */
Expand Down
1 change: 1 addition & 0 deletions python/example/misc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def parse_args(is_tx):
# nb_tx_desc & nb_rx_desc
parser.add_argument("--nb_tx_desc", type=int, default=0, help="nb_tx_desc")
parser.add_argument("--nb_rx_desc", type=int, default=0, help="nb_rx_desc")
parser.add_argument("--rx_burst_size", type=int, default=0, help="rx_burst_size")
# p_tx_ip
parser.add_argument(
"--p_tx_ip",
Expand Down
1 change: 1 addition & 0 deletions python/example/rx_timing_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def main():
rx_para.framebuff_cnt = 3
rx_para.transport_fmt = mtl.ST20_FMT_YUV_422_10BIT
rx_para.output_fmt = args.pipeline_fmt
rx_para.rx_burst_size = args.rx_burst_size
# rx port
rx_port = mtl.st_rx_port()
mtl.st_rxp_para_port_set(
Expand Down