Skip to content

Commit

Permalink
rx/timing_parser: add compensation for the burst time jitter
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Du <frank.du@intel.com>
  • Loading branch information
frankdjx committed Jan 15, 2024
1 parent 931bef7 commit 731917a
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 14 deletions.
3 changes: 3 additions & 0 deletions include/st20_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,9 @@ struct st20_rx_ops {
/** Optional. Flags to control session behaviors. See ST20_RX_FLAG_* for possible value
*/
uint32_t flags;
/* Optional, the size for each mt_rxq_burst, leave to zero to let system select a
* default value */
uint16_t rx_burst_size;

/**
* Mandatory for ST20_TYPE_FRAME_LEVEL/ST20_TYPE_SLICE_LEVEL.
Expand Down
3 changes: 3 additions & 0 deletions include/st_pipeline_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ struct st20p_rx_ops {
/** Optional. Flags to control session behaviors. See ST20P_RX_FLAG_* for possible value
*/
uint32_t flags;
/* Optional, the size for each mt_rxq_burst, leave to zero to let system select a
* default value */
uint16_t rx_burst_size;
/**
* Optional. Callback when frame available in the lib.
* And only non-block method can be used within this callback as it run from lcore
Expand Down
10 changes: 10 additions & 0 deletions lib/src/mt_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,16 @@ static inline uint32_t st_rx_mbuf_get_len(struct rte_mbuf* mbuf) {
return priv->rx_priv.len;
}

static inline void st_rx_mbuf_set_ptp(struct rte_mbuf* mbuf, uint64_t time_stamp) {
struct mt_muf_priv_data* priv = rte_mbuf_to_priv(mbuf);
priv->rx_priv.ptp_time_stamp = time_stamp;
}

static inline uint64_t st_rx_mbuf_get_ptp(struct rte_mbuf* mbuf) {
struct mt_muf_priv_data* priv = rte_mbuf_to_priv(mbuf);
return priv->rx_priv.ptp_time_stamp;
}

#ifdef ST_PCAPNG_ENABLED
struct rte_mbuf* mt_pcapng_copy(struct mtl_main_impl* impl, enum mtl_port port,
struct mt_rxq_entry* rxq, const struct rte_mbuf* m,
Expand Down
1 change: 1 addition & 0 deletions lib/src/st2110/pipeline/st20_pipeline_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ static int rx_st20p_create_transport(struct mtl_main_impl* impl, struct st20p_rx
ops_rx.ssrc = ops->port.ssrc;
ops_rx.type = ST20_TYPE_FRAME_LEVEL;
ops_rx.framebuff_cnt = ops->framebuff_cnt;
ops_rx.rx_burst_size = ops->rx_burst_size;
ops_rx.notify_frame_ready = rx_st20p_frame_ready;
ops_rx.notify_event = rx_st20p_notify_event;
if (ctx->derive) {
Expand Down
10 changes: 8 additions & 2 deletions lib/src/st2110/st_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,11 @@ struct st_tx_muf_priv_data {
uint32_t idx; /* index of current frame */
};

/* info passing between dma and rx */
struct st_rx_muf_priv_data {
uint64_t ptp_time_stamp; /* ptp time stamp of current mbuf */
uint32_t offset;
uint32_t len;
uint32_t lender;
uint32_t padding;
};

/* the frame is malloc by rte malloc, not ext or head split */
Expand Down Expand Up @@ -549,6 +548,7 @@ struct st_rx_video_tp {

/* for the status */
struct st_rv_tp_stat stat[MTL_SESSION_PORT_MAX];
uint32_t stat_adjust;
};

struct st_rx_video_session_impl {
Expand All @@ -558,6 +558,7 @@ struct st_rx_video_session_impl {
struct st_rx_video_sessions_mgr* parent;
struct st_rx_session_priv priv[MTL_SESSION_PORT_MAX];
bool time_measure;
uint16_t rx_burst_size;

struct st20_rx_ops ops;
char ops_name[ST_MAX_NAME_LEN];
Expand Down Expand Up @@ -707,6 +708,11 @@ struct st_rx_video_session_impl {
double stat_cpu_busy_score;
/* for tasklet session time measure */
struct mt_stat_u64 stat_time;
/* for rx burst */
uint64_t last_burst_time[MTL_SESSION_PORT_MAX];
int stat_burst_succ_cnt;
uint16_t stat_burst_pkts_max;
uint64_t stat_burst_pkts_sum;
};

struct st_rx_video_sessions_mgr {
Expand Down
74 changes: 64 additions & 10 deletions lib/src/st2110/st_rx_video_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ static int rv_start_pcapng(struct mtl_main_impl* impl, struct st_rx_video_sessio
static int rv_dma_dequeue(struct st_rx_video_session_impl* s) {
struct mtl_dma_lender_dev* dma_dev = s->dma_dev;

uint16_t nb_dq = mt_dma_completed(dma_dev, ST_RX_VIDEO_BURST_SIZE, NULL, NULL);
uint16_t nb_dq = mt_dma_completed(dma_dev, s->rx_burst_size, NULL, NULL);

if (nb_dq) {
dbg("%s(%d), nb_dq %u\n", __func__, s->idx, nb_dq);
Expand Down Expand Up @@ -1369,10 +1369,7 @@ static inline void rv_tp_pkt_handle(struct st_rx_video_session_impl* s,
struct rte_mbuf* mbuf, enum mtl_session_port s_port,
struct st_rx_video_slot_impl* slot, uint32_t tmstamp,
int pkt_idx) {
struct mtl_main_impl* impl = rv_get_impl(s);
enum mtl_port port = mt_port_logic2phy(s->port_maps, s_port);

uint64_t pkt_ns = mt_mbuf_time_stamp(impl, mbuf, port);
uint64_t pkt_ns = st_rx_mbuf_get_ptp(mbuf);
struct st_rv_tp_slot* tp_slot = &s->tp->slots[slot->idx][s_port];
dbg("%s(%d,%d), tmstamp %u pkt_ns %" PRIu64 " pkt_idx %d\n", __func__, s->idx, s_port,
tmstamp, pkt_ns, pkt_idx);
Expand Down Expand Up @@ -2217,7 +2214,7 @@ static int rv_init_pkt_lcore(struct mtl_main_impl* impl,

snprintf(ring_name, 32, "%sM%dS%d_PKT", ST_RX_VIDEO_PREFIX, mgr_idx, idx);
flags = RING_F_SP_ENQ | RING_F_SC_DEQ; /* single-producer and single-consumer */
count = ST_RX_VIDEO_BURST_SIZE * 4;
count = s->rx_burst_size;
ring = rte_ring_create(ring_name, count, mt_socket_id(impl, port), flags);
if (!ring) {
err("%s(%d,%d), ring create fail\n", __func__, mgr_idx, idx);
Expand Down Expand Up @@ -2642,7 +2639,7 @@ static int rv_handle_mbuf(void* priv, struct rte_mbuf** mbuf, uint16_t nb) {
}

static int rv_pkt_rx_tasklet(struct st_rx_video_session_impl* s) {
struct rte_mbuf* mbuf[ST_RX_VIDEO_BURST_SIZE];
struct rte_mbuf* mbuf[s->rx_burst_size];
uint16_t rv;
int num_port = s->ops.num_port;

Expand All @@ -2658,13 +2655,44 @@ static int rv_pkt_rx_tasklet(struct st_rx_video_session_impl* s) {
for (int s_port = 0; s_port < num_port; s_port++) {
if (!s->rxq[s_port]) continue;

rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], ST_RX_VIDEO_BURST_SIZE);
rv = mt_rxq_burst(s->rxq[s_port], &mbuf[0], s->rx_burst_size);
uint64_t burst_time = mt_get_tsc(s->impl);
if (rv) {
s->stat_burst_succ_cnt++;
s->stat_burst_pkts_sum += rv;
if (rv > s->stat_burst_pkts_max) s->stat_burst_pkts_max = rv;

struct st_rx_video_tp* tp = s->tp;
if (s->enable_timing_parser && tp) {
/* compensation for the burst time jitter */
bool adjust = false;
uint64_t diff = burst_time - s->last_burst_time[s_port];
enum mtl_port port = mt_port_logic2phy(s->port_maps, s_port);

if (diff > (tp->pass.cinst_max_narrow * tp->trs)) adjust = true;

if (adjust) {
double pkt_ns = mt_mbuf_time_stamp(s->impl, mbuf[rv - 1], port);
pkt_ns -= s->tp->trs * (rv - 1); /* set to start */
for (uint16_t i = 0; i < rv; i++) {
st_rx_mbuf_set_ptp(mbuf[i], pkt_ns);
pkt_ns += s->tp->trs;
}
tp->stat_adjust++;
} else {
for (uint16_t i = 0; i < rv; i++) {
st_rx_mbuf_set_ptp(mbuf[i], mt_mbuf_time_stamp(s->impl, mbuf[i], port));
}
}
}

rv_handle_mbuf(&s->priv[s_port], &mbuf[0], rv);
rte_pktmbuf_free_bulk(&mbuf[0], rv);

done = false;
}

if (rv) done = false;
s->last_burst_time[s_port] = burst_time;
}

/* submit if any */
Expand Down Expand Up @@ -2967,10 +2995,21 @@ static int rv_attach(struct mtl_main_impl* impl, struct st_rx_video_sessions_mgr
s->st20_dst_port[i] = (ops->udp_port[i]) ? (ops->udp_port[i]) : (10000 + idx * 2);
}

/* init trs */
/* init estimated trs */
int estimated_total_pkts = s->st20_frame_size / ST_VIDEO_BPM_SIZE;
s->trs = s->frame_time / estimated_total_pkts;

if (s->trs < 2000) /* 2us per pkt */
s->rx_burst_size = 128 * 4;
else if (s->trs < 3000)
s->rx_burst_size = 128 * 2;
else
s->rx_burst_size = 128 * 1;
if (ops->rx_burst_size) {
s->rx_burst_size = ops->rx_burst_size;
info("%s(%d), user customized rx_burst_size %u\n", __func__, idx, s->rx_burst_size);
}

/* init simulated packet loss for test usage */
if (s->ops.flags & ST20_RX_FLAG_SIMULATE_PKT_LOSS) {
uint16_t burst_loss_max = 1;
Expand Down Expand Up @@ -3081,6 +3120,7 @@ static int rv_attach(struct mtl_main_impl* impl, struct st_rx_video_sessions_mgr
__func__, idx, ops->width, ops->height, st20_frame_fmt_name(ops->fmt),
ops->packing, ops->payload_type, ops->flags, s->frame_time / NS_PER_MS,
st_frame_rate(s->ops.fps));
info("%s(%d), rx_burst_size %u\n", __func__, idx, s->rx_burst_size);
return 0;
}

Expand Down Expand Up @@ -3366,6 +3406,20 @@ static void rv_stat(struct st_rx_video_sessions_mgr* mgr,
s->stat_st22_boxes);
s->stat_st22_boxes = 0;
}
if (s->stat_burst_succ_cnt) {
notice("RX_VIDEO_SESSION(%d,%d): succ burst max %u, avg %f\n", m_idx, idx,
s->stat_burst_pkts_max,
(float)s->stat_burst_pkts_sum / s->stat_burst_succ_cnt);
s->stat_burst_pkts_max = 0;
s->stat_burst_succ_cnt = 0;
s->stat_burst_pkts_sum = 0;
}

struct st_rx_video_tp* tp = s->tp;
if (tp && tp->stat_adjust) {
info("%s(%d), time adjust %u for timing parser\n", __func__, idx, tp->stat_adjust);
tp->stat_adjust = 0;
}
if (s->enable_timing_parser_stat) rv_tp_stat(s);

if (s->time_measure) {
Expand Down
2 changes: 0 additions & 2 deletions lib/src/st2110/st_rx_video_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

#include "st_main.h"

#define ST_RX_VIDEO_BURST_SIZE (128)

#define ST_RX_VIDEO_DMA_MIN_SIZE (1024)

#define ST_RV_TP_TSC_SYNC_MS (100) /* sync tsc with ptp period(ms) */
Expand Down
1 change: 1 addition & 0 deletions python/example/misc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def parse_args(is_tx):
# nb_tx_desc & nb_rx_desc
parser.add_argument("--nb_tx_desc", type=int, default=0, help="nb_tx_desc")
parser.add_argument("--nb_rx_desc", type=int, default=0, help="nb_rx_desc")
parser.add_argument("--rx_burst_size", type=int, default=0, help="rx_burst_size")
# p_tx_ip
parser.add_argument(
"--p_tx_ip",
Expand Down
1 change: 1 addition & 0 deletions python/example/rx_timing_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def main():
rx_para.framebuff_cnt = 3
rx_para.transport_fmt = mtl.ST20_FMT_YUV_422_10BIT
rx_para.output_fmt = args.pipeline_fmt
rx_para.rx_burst_size = args.rx_burst_size
# rx port
rx_port = mtl.st_rx_port()
mtl.st_rxp_para_port_set(
Expand Down

0 comments on commit 731917a

Please sign in to comment.