Skip to content

Commit

Permalink
rx/timing_parser: add compensation for the burst time jitter (#703)
Browse files Browse the repository at this point in the history
test with:
python3 python/example/rx_timing_parser.py --pipeline_fmt
YUV422RFC4175PG2BE10 --p_port 0000:af:00.1 --ptp --rx_burst_size 256

Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx authored Jan 15, 2024
1 parent 931bef7 commit 3b4ee9a
Show file tree
Hide file tree
Showing 16 changed files with 78 additions and 21 deletions.
1 change: 1 addition & 0 deletions app/sample/rx_st20p_timing_parser_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ int main(int argc, char** argv) {
ops_rx.output_fmt = ctx.output_fmt;
ops_rx.device = ST_PLUGIN_DEVICE_AUTO;
ops_rx.framebuff_cnt = app[i]->fb_cnt;
ops_rx.rx_burst_size = ctx.rx_burst_size;
ops_rx.flags = ST20P_RX_FLAG_BLOCK_GET;
ops_rx.flags |= ST20P_RX_FLAG_TIMING_PARSER_META;

Expand Down
5 changes: 5 additions & 0 deletions app/sample/sample_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum sample_args_cmd {
SAMPLE_ARG_RSS_MODE,
SAMPLE_ARG_NB_TX_DESC,
SAMPLE_ARG_NB_RX_DESC,
SAMPLE_ARG_RX_BURST_SZ,
SAMPLE_ARG_DHCP,

SAMPLE_ARG_TX_VIDEO_URL = 0x200,
Expand Down Expand Up @@ -102,6 +103,7 @@ static struct option sample_args_options[] = {
{"nb_tx_desc", required_argument, 0, SAMPLE_ARG_NB_TX_DESC},
{"nb_rx_desc", required_argument, 0, SAMPLE_ARG_NB_RX_DESC},
{"dhcp", no_argument, 0, SAMPLE_ARG_DHCP},
{"rx_burst_size", required_argument, 0, SAMPLE_ARG_RX_BURST_SZ},

{"tx_url", required_argument, 0, SAMPLE_ARG_TX_VIDEO_URL},
{"rx_url", required_argument, 0, SAMPLE_ARG_RX_VIDEO_URL},
Expand Down Expand Up @@ -265,6 +267,9 @@ static int _sample_parse_args(struct st_sample_context* ctx, int argc, char** ar
case SAMPLE_ARG_NB_RX_DESC:
p->nb_rx_desc = atoi(optarg);
break;
case SAMPLE_ARG_RX_BURST_SZ:
ctx->rx_burst_size = atoi(optarg);
break;
case SAMPLE_ARG_QUEUES_CNT:
for (int i = 0; i < MTL_PORT_MAX; i++) {
p->rx_queues_cnt[i] = atoi(optarg);
Expand Down
1 change: 1 addition & 0 deletions app/sample/sample_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct st_sample_context {
bool ext_frame;
bool hdr_split;
bool rx_dump;
uint16_t rx_burst_size;

char logo_url[ST_SAMPLE_URL_MAX_LEN];
uint32_t logo_width;
Expand Down
1 change: 1 addition & 0 deletions app/src/app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ struct st_app_context {
bool enable_timing_parser;
bool tx_display;
bool rx_display;
uint16_t rx_burst_size;

bool ptp_systime_sync;
int ptp_sync_cnt;
Expand Down
5 changes: 5 additions & 0 deletions app/src/args.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ enum st_args_cmd {
ST_ARG_PTP_UNICAST_ADDR,
ST_ARG_CNI_THREAD,
ST_ARG_RX_TIMING_PARSER,
ST_ARG_RX_BURST_SZ,
ST_ARG_USER_LCORES,
ST_ARG_SCH_DATA_QUOTA,
ST_ARG_SCH_SESSION_QUOTA,
Expand Down Expand Up @@ -189,6 +190,7 @@ static struct option st_app_args_options[] = {
{"ptp_unicast", no_argument, 0, ST_ARG_PTP_UNICAST_ADDR},
{"cni_thread", no_argument, 0, ST_ARG_CNI_THREAD},
{"rx_timing_parser", no_argument, 0, ST_ARG_RX_TIMING_PARSER},
{"rx_burst_size", required_argument, 0, ST_ARG_RX_BURST_SZ},
{"lcores", required_argument, 0, ST_ARG_USER_LCORES},
{"sch_data_quota", required_argument, 0, ST_ARG_SCH_DATA_QUOTA},
{"sch_session_quota", required_argument, 0, ST_ARG_SCH_SESSION_QUOTA},
Expand Down Expand Up @@ -574,6 +576,9 @@ int st_app_parse_args(struct st_app_context* ctx, struct mtl_init_params* p, int
ctx->enable_timing_parser = true;
p->flags |= MTL_FLAG_ENABLE_HW_TIMESTAMP;
break;
case ST_ARG_RX_BURST_SZ:
ctx->rx_burst_size = atoi(optarg);
break;
case ST_ARG_RX_MONO_POOL:
p->flags |= MTL_FLAG_RX_MONO_POOL;
break;
Expand Down
1 change: 1 addition & 0 deletions app/src/rx_st20p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ static int app_rx_st20p_init(struct st_app_context* ctx,
ops.port.payload_type = st20p ? st20p->base.payload_type : ST_APP_PAYLOAD_TYPE_VIDEO;
ops.device = st20p ? st20p->info.device : ST_PLUGIN_DEVICE_AUTO;
ops.flags |= ST20P_RX_FLAG_BLOCK_GET;
ops.rx_burst_size = ctx->rx_burst_size;
ops.framebuff_cnt = s->framebuff_cnt;
/* always try to enable DMA offload */
ops.flags |= ST20P_RX_FLAG_DMA_OFFLOAD;
Expand Down
1 change: 1 addition & 0 deletions app/src/rx_video_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ static int app_rx_video_init(struct st_app_context* ctx, st_json_video_session_t
ops.rtcp.nack_interval_us = 250;
}
if (ctx->enable_timing_parser) ops.flags |= ST20_RX_FLAG_TIMING_PARSER_STAT;
ops.rx_burst_size = ctx->rx_burst_size;

st_pthread_mutex_init(&s->st20_wake_mutex, NULL);
st_pthread_cond_init(&s->st20_wake_cond, NULL);
Expand Down
3 changes: 3 additions & 0 deletions include/st20_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,9 @@ struct st20_rx_ops {
/** Optional. Flags to control session behaviors. See ST20_RX_FLAG_* for possible value
*/
uint32_t flags;
/* Optional, the size for each mt_rxq_burst, leave to zero to let system select a
* default value */
uint16_t rx_burst_size;

/**
* Mandatory for ST20_TYPE_FRAME_LEVEL/ST20_TYPE_SLICE_LEVEL.
Expand Down
3 changes: 3 additions & 0 deletions include/st_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ struct st20p_rx_ops {
/** Optional. Flags to control session behaviors. See ST20P_RX_FLAG_* for possible value
*/
uint32_t flags;
/* Optional, the size for each mt_rxq_burst, leave to zero to let system select a
* default value */
uint16_t rx_burst_size;
/**
* Optional. Callback when frame available in the lib.
* And only non-block method can be used within this callback as it run from lcore
Expand Down
1 change: 1 addition & 0 deletions lib/src/st2110/pipeline/st20_pipeline_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ static int rx_st20p_create_transport(struct mtl_main_impl* impl, struct st20p_rx
ops_rx.ssrc = ops->port.ssrc;
ops_rx.type = ST20_TYPE_FRAME_LEVEL;
ops_rx.framebuff_cnt = ops->framebuff_cnt;
ops_rx.rx_burst_size = ops->rx_burst_size;
ops_rx.notify_frame_ready = rx_st20p_frame_ready;
ops_rx.notify_event = rx_st20p_notify_event;
if (ctx->derive) {
Expand Down
10 changes: 7 additions & 3 deletions lib/src/st2110/st_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ struct st_tx_muf_priv_data {
uint32_t idx; /* index of current frame */
};

/* info passing between dma and rx */
struct st_rx_muf_priv_data {
uint32_t offset;
uint32_t len;
Expand Down Expand Up @@ -510,8 +509,6 @@ struct st_rv_tp_slot {
/* Cinst, packet level check */
int64_t cinst_sum;
/* vrx, packet level check */
int32_t vrx_drained_prev;
int32_t vrx_prev;
int64_t vrx_sum;
/* Inter-packet time(ns), packet level check */
int64_t ipt_sum;
Expand Down Expand Up @@ -549,6 +546,7 @@ struct st_rx_video_tp {

/* for the status */
struct st_rv_tp_stat stat[MTL_SESSION_PORT_MAX];
uint32_t stat_untrusted_pkts;
};

struct st_rx_video_session_impl {
Expand All @@ -558,6 +556,8 @@ struct st_rx_video_session_impl {
struct st_rx_video_sessions_mgr* parent;
struct st_rx_session_priv priv[MTL_SESSION_PORT_MAX];
bool time_measure;
uint16_t rx_burst_size;
uint16_t cur_succ_burst_cnt;

struct st20_rx_ops ops;
char ops_name[ST_MAX_NAME_LEN];
Expand Down Expand Up @@ -707,6 +707,10 @@ struct st_rx_video_session_impl {
double stat_cpu_busy_score;
/* for tasklet session time measure */
struct mt_stat_u64 stat_time;
/* for rx burst */
int stat_burst_succ_cnt;
uint16_t stat_burst_pkts_max;
uint64_t stat_burst_pkts_sum;
};

struct st_rx_video_sessions_mgr {
Expand Down
14 changes: 6 additions & 8 deletions lib/src/st2110/st_rx_timing_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ void rv_tp_on_packet(struct st_rx_video_session_impl* s, enum mtl_session_port s
int pkt_idx) {
struct st_rx_video_tp* tp = s->tp;
uint64_t epoch_tmstamp;
double tvd, packet_delta_ns, trs = tp->trs;
double tvd, trs = tp->trs;

if (!slot->cur_epochs) { /* the first packet */
uint64_t epochs = (double)pkt_time / s->frame_time;
uint64_t epoch_tmstamp = (double)epochs * s->frame_time;

slot->cur_epochs = epochs;
slot->rtp_tmstamp = rtp_tmstamp;
slot->first_pkt_time = pkt_time;
slot->meta.fpt = pkt_time - epoch_tmstamp;
double first_pkt_time = (double)pkt_time - (trs * pkt_idx);
slot->first_pkt_time = first_pkt_time;
slot->meta.fpt = first_pkt_time - epoch_tmstamp;

uint64_t tmstamp64 = epochs * s->frame_time_sampling;
uint32_t tmstamp32 = tmstamp64;
Expand All @@ -40,16 +41,13 @@ void rv_tp_on_packet(struct st_rx_video_session_impl* s, enum mtl_session_port s

epoch_tmstamp = (uint64_t)(slot->cur_epochs * s->frame_time);
tvd = epoch_tmstamp + tp->pass.tr_offset;
double expect_time = tvd + trs * (pkt_idx + 1);

/* Calculate vrx */
packet_delta_ns = (double)pkt_time - tvd;
int32_t drained = (packet_delta_ns + trs) / trs;
int32_t vrx_cur = slot->vrx_prev + 1 - (drained - slot->vrx_drained_prev);
int32_t vrx_cur = (expect_time - pkt_time) / trs;
slot->vrx_sum += vrx_cur;
slot->meta.vrx_min = RTE_MIN(vrx_cur, slot->meta.vrx_min);
slot->meta.vrx_max = RTE_MAX(vrx_cur, slot->meta.vrx_max);
slot->vrx_prev = vrx_cur;
slot->vrx_drained_prev = drained;

/* Calculate C-inst */
int exp_cin_pkts = ((pkt_time - slot->first_pkt_time) / trs) * ST_TP_CINST_DRAIN_FACTOR;
Expand Down
49 changes: 41 additions & 8 deletions lib/src/st2110/st_rx_video_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ static int rv_start_pcapng(struct mtl_main_impl* impl, struct st_rx_video_sessio
static int rv_dma_dequeue(struct st_rx_video_session_impl* s) {
struct mtl_dma_lender_dev* dma_dev = s->dma_dev;

uint16_t nb_dq = mt_dma_completed(dma_dev, ST_RX_VIDEO_BURST_SIZE, NULL, NULL);
uint16_t nb_dq = mt_dma_completed(dma_dev, s->rx_burst_size, NULL, NULL);

if (nb_dq) {
dbg("%s(%d), nb_dq %u\n", __func__, s->idx, nb_dq);
Expand Down Expand Up @@ -1369,11 +1369,17 @@ static inline void rv_tp_pkt_handle(struct st_rx_video_session_impl* s,
struct rte_mbuf* mbuf, enum mtl_session_port s_port,
struct st_rx_video_slot_impl* slot, uint32_t tmstamp,
int pkt_idx) {
struct st_rx_video_tp* tp = s->tp;
if (s->cur_succ_burst_cnt > (tp->pass.cinst_max_narrow / 2)) {
/* untrusted result */
tp->stat_untrusted_pkts++;
return;
}
struct mtl_main_impl* impl = rv_get_impl(s);
enum mtl_port port = mt_port_logic2phy(s->port_maps, s_port);

uint64_t pkt_ns = mt_mbuf_time_stamp(impl, mbuf, port);
struct st_rv_tp_slot* tp_slot = &s->tp->slots[slot->idx][s_port];
struct st_rv_tp_slot* tp_slot = &tp->slots[slot->idx][s_port];
dbg("%s(%d,%d), tmstamp %u pkt_ns %" PRIu64 " pkt_idx %d\n", __func__, s->idx, s_port,
tmstamp, pkt_ns, pkt_idx);
rv_tp_on_packet(s, s_port, tp_slot, tmstamp, pkt_ns, pkt_idx);
Expand Down Expand Up @@ -2217,7 +2223,7 @@ static int rv_init_pkt_lcore(struct mtl_main_impl* impl,

snprintf(ring_name, 32, "%sM%dS%d_PKT", ST_RX_VIDEO_PREFIX, mgr_idx, idx);
flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */
count = ST_RX_VIDEO_BURST_SIZE * 4;
count = s->rx_burst_size;
ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags);
if (!ring) {
err("%s(%d,%d), ring create fail\n", __func__, mgr_idx, idx);
Expand Down Expand Up @@ -2642,7 +2648,7 @@ static int rv_handle_mbuf(void* priv, struct rte_mbuf** mbuf, uint16_t nb) {
}

static int rv_pkt_rx_tasklet(struct st_rx_video_session_impl* s) {
struct rte_mbuf* mbuf[ST_RX_VIDEO_BURST_SIZE];
struct rte_mbuf* mbuf[s->rx_burst_size];
uint16_t rv;
int num_port = s->ops.num_port;

Expand All @@ -2658,13 +2664,18 @@ static int rv_pkt_rx_tasklet(struct st_rx_video_session_impl* s) {
for (int s_port = 0; s_port < num_port; s_port++) {
if (!s->rxq[s_port]) continue;

rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], ST_RX_VIDEO_BURST_SIZE);
rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], s->rx_burst_size);
s->cur_succ_burst_cnt = rv;
if (rv) {
s->stat_burst_succ_cnt++;
s->stat_burst_pkts_sum += rv;
if (rv > s->stat_burst_pkts_max) s->stat_burst_pkts_max = rv;

rv_handle_mbuf(&s->priv[s_port], &mbuf[0], rv);
rte_pktmbuf_free_bulk(&mbuf[0], rv);
}

if (rv) done = false;
done = false;
}
}

/* submit if any */
Expand Down Expand Up @@ -2967,10 +2978,17 @@ static int rv_attach(struct mtl_main_impl* impl, struct st_rx_video_sessions_mgr
s->st20_dst_port[i] = (ops->udp_port[i]) ? (ops->udp_port[i]) : (10000 + idx * 2);
}

/* init trs */
/* init estimated trs */
int estimated_total_pkts = s->st20_frame_size / ST_VIDEO_BPM_SIZE;
s->trs = s->frame_time / estimated_total_pkts;

if (ops->rx_burst_size) {
s->rx_burst_size = ops->rx_burst_size;
info("%s(%d), user customized rx_burst_size %u\n", __func__, idx, s->rx_burst_size);
} else {
s->rx_burst_size = 128;
}

/* init simulated packet loss for test usage */
if (s->ops.flags & ST20_RX_FLAG_SIMULATE_PKT_LOSS) {
uint16_t burst_loss_max = 1;
Expand Down Expand Up @@ -3366,6 +3384,21 @@ static void rv_stat(struct st_rx_video_sessions_mgr* mgr,
s->stat_st22_boxes);
s->stat_st22_boxes = 0;
}
if (s->stat_burst_succ_cnt) {
notice("RX_VIDEO_SESSION(%d,%d): succ burst max %u, avg %f\n", m_idx, idx,
s->stat_burst_pkts_max,
(float)s->stat_burst_pkts_sum / s->stat_burst_succ_cnt);
s->stat_burst_pkts_max = 0;
s->stat_burst_succ_cnt = 0;
s->stat_burst_pkts_sum = 0;
}

struct st_rx_video_tp* tp = s->tp;
if (tp && tp->stat_untrusted_pkts) {
info("%s(%d), untrusted pkts time %u for timing parser\n", __func__, idx,
tp->stat_untrusted_pkts);
tp->stat_untrusted_pkts = 0;
}
if (s->enable_timing_parser_stat) rv_tp_stat(s);

if (s->time_measure) {
Expand Down
2 changes: 0 additions & 2 deletions lib/src/st2110/st_rx_video_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

#include "st_main.h"

#define ST_RX_VIDEO_BURST_SIZE (128)

#define ST_RX_VIDEO_DMA_MIN_SIZE (1024)

#define ST_RV_TP_TSC_SYNC_MS (100) /* sync tsc with ptp period(ms) */
Expand Down
1 change: 1 addition & 0 deletions python/example/misc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def parse_args(is_tx):
# nb_tx_desc & nb_rx_desc
parser.add_argument("--nb_tx_desc", type=int, default=0, help="nb_tx_desc")
parser.add_argument("--nb_rx_desc", type=int, default=0, help="nb_rx_desc")
parser.add_argument("--rx_burst_size", type=int, default=0, help="rx_burst_size")
# p_tx_ip
parser.add_argument(
"--p_tx_ip",
Expand Down
1 change: 1 addition & 0 deletions python/example/rx_timing_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def main():
rx_para.framebuff_cnt = 3
rx_para.transport_fmt = mtl.ST20_FMT_YUV_422_10BIT
rx_para.output_fmt = args.pipeline_fmt
rx_para.rx_burst_size = args.rx_burst_size
# rx port
rx_port = mtl.st_rx_port()
mtl.st_rxp_para_port_set(
Expand Down

0 comments on commit 3b4ee9a

Please sign in to comment.