From fe1ae1aa7b9f15d77b87394cfd81c3daf7486815 Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Mon, 31 Jul 2023 15:23:19 +0800 Subject: [PATCH 1/8] python: add default_batch_size option and meta support python object. --- .../python/modelbox_api/modelbox_api.cc | 41 +++++++++++++++++-- .../virtual/python/virtualdriver_python.cc | 5 +++ src/python/test/op/op_buffer/op_buffer.py | 8 +--- src/python/test/test_buffer.py | 10 ++--- 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/src/drivers/common/python/modelbox_api/modelbox_api.cc b/src/drivers/common/python/modelbox_api/modelbox_api.cc index b72d7aa0e..4cfe237bd 100644 --- a/src/drivers/common/python/modelbox_api/modelbox_api.cc +++ b/src/drivers/common/python/modelbox_api/modelbox_api.cc @@ -192,6 +192,12 @@ bool GetAttributes(void *value, std::size_t value_type, ret_data = *((py::object *)(value)); return true; } + + if (typeid(std::shared_ptr).hash_code() == value_type) { + ret_data = *(*((std::shared_ptr *)(value))); + return true; + } + return false; } @@ -233,6 +239,17 @@ void BufferSetAttributes(Buffer &buffer, const std::string &key, return; } + if (py::isinstance(obj)) { + auto *obj_ptr = new py::object(); + *obj_ptr = obj; + auto obj_shared = std::shared_ptr(obj_ptr, [](void *ptr) { + py::gil_scoped_acquire interpreter_guard{}; + delete static_cast(ptr); + }); + buffer.Set(key, obj_shared); + return; + } + throw std::invalid_argument("invalid data type " + py::str(obj).cast() + " for key " + key); @@ -738,7 +755,25 @@ void ModelboxPyApiSetUpBuffer(pybind11::module &m) { return buffer; }), py::keep_alive<1, 2>()) + .def(py::init([](const std::shared_ptr &device) { + auto buffer = std::make_shared(device); + return buffer; + }), + py::keep_alive<1, 2>()) .def(py::init()) + .def("build", [](std::shared_ptr &buffer, const std::string &str) { + StrToBuffer(buffer, str); + }) + .def("build", [](std::shared_ptr &buffer, const py::list &li) { + ListToBuffer(buffer, li); + }) + .def("build", [](std::shared_ptr &buffer, const py::buffer &buf) { + PyBufferToBuffer(buffer, buf); + }) + .def("as_bytes", [](Buffer &buffer) { + return py::bytes{(const char *)buffer.ConstData(), + buffer.GetBytes()}; + }) .def("as_object", [](Buffer &buffer) -> py::object { return BufferToPyObject(buffer); @@ -760,9 +795,8 @@ void ModelboxPyApiSetUpBuffer(pybind11::module &m) { buffer.CopyMeta(other_ptr); }) .def("set", - [](Buffer &buffer, const std::string &key, py::object &obj) { - BufferSetAttributes(buffer, key, obj); - }) + [](Buffer &buffer, const std::string &key, + py::object &obj) { BufferSetAttributes(buffer, key, obj); }) .def("get", BufferGetAttributes); ModelboxPyApiSetUpDataType(h); @@ -784,6 +818,7 @@ void ModelboxPyApiSetUpBufferList(pybind11::module &m) { }) .def("size", &modelbox::BufferList::Size) .def("get_bytes", &modelbox::BufferList::GetBytes) + .def("get_device", &modelbox::BufferList::GetDevice) .def( "push_back", [](BufferList &bl, Buffer &buffer) { diff --git a/src/drivers/virtual/python/virtualdriver_python.cc b/src/drivers/virtual/python/virtualdriver_python.cc index efe20016b..d6c708c8a 100644 --- a/src/drivers/virtual/python/virtualdriver_python.cc +++ b/src/drivers/virtual/python/virtualdriver_python.cc @@ -366,6 +366,11 @@ void VirtualPythonFlowUnitFactory::FillFlowUnitType( auto max_batch_size = config->GetInt32("base.max_batch_size", 1); flowunit_desc->SetMaxBatchSize(max_batch_size); } + + if (config->Contain("base.default_batch_size")) { + auto default_batch_size = config->GetInt32("base.default_batch_size", 1); + flowunit_desc->SetDefaultBatchSize(default_batch_size); + } } std::map> diff --git a/src/python/test/op/op_buffer/op_buffer.py b/src/python/test/op/op_buffer/op_buffer.py index eb2820938..d03a5faf2 100644 --- a/src/python/test/op/op_buffer/op_buffer.py +++ b/src/python/test/op/op_buffer/op_buffer.py @@ -68,13 +68,7 @@ def process(self, data_ctx): add_buffer.set("np_test", data_ctx.get_private("np_test")) - try: - add_buffer.set("map_test", {"test" : 1}) - except ValueError as err: - modelbox.info(str(err)) - else: - return modelbox.Status.StatusCode.STATUS_SHUTDOWN - + add_buffer.set("map_test", {"test" : 1}) out_bl.push_back(add_buffer) return modelbox.Status() diff --git a/src/python/test/test_buffer.py b/src/python/test/test_buffer.py index b64236a11..4502ac837 100644 --- a/src/python/test/test_buffer.py +++ b/src/python/test/test_buffer.py @@ -153,13 +153,9 @@ def test_flow_for_buffer(self): if not (np_set_test == np_get_test).all(): return modelbox.Status(modelbox.Status.StatusCode.STATUS_SHUTDOWN, "invalid np test") - try: - dict_test = buffer.get("map_test") - except ValueError as err: - modelbox.info(str(err)) - else: - flow.stop() - self.assertTrue(False) + dict_test = buffer.get("map_test") + if dict_test != {"test" : 1}: + return modelbox.Status(modelbox.Status.StatusCode.STATUS_SHUTDOWN, "invalid map test") flow.stop() From 5f8b8890dea11b3d1a501d92017fc59743367460 Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Mon, 31 Jul 2023 19:28:36 +0800 Subject: [PATCH 2/8] flowunit: open in parallel --- src/libmodelbox/engine/flowunit_group.cc | 29 +++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/libmodelbox/engine/flowunit_group.cc b/src/libmodelbox/engine/flowunit_group.cc index f4aa7e7f5..5995bcc4f 100644 --- a/src/libmodelbox/engine/flowunit_group.cc +++ b/src/libmodelbox/engine/flowunit_group.cc @@ -357,10 +357,10 @@ Status FlowUnitGroup::Init(const std::set &input_ports_name, Status FlowUnitGroup::Open(const CreateExternalDataFunc &create_func) { auto status = STATUS_OK; - for (auto &flowunit : flowunit_group_) { + auto open_func = [&](const std::shared_ptr &flowunit) -> modelbox::Status { if (!flowunit) { MBLOG_WARN << "flow unit is nullptr."; - continue; + return STATUS_INVALID; } auto flowunit_desc = flowunit->GetFlowUnitDesc(); @@ -380,11 +380,34 @@ Status FlowUnitGroup::Open(const CreateExternalDataFunc &create_func) { "', type '" + flowunit_desc->GetDriverDesc()->GetType() + "' failed."}; - break; + return status; } MBLOG_DEBUG << flowunit_desc->GetFlowUnitName() << ":" << flowunit_desc->GetFlowUnitAliasName() << " opened."; + + return STATUS_OK; + }; + + ThreadPool pool(std::thread::hardware_concurrency()); + pool.SetName(unit_name_ + "-Open"); + std::vector> result; + + for (auto &flowunit : flowunit_group_) { + auto ret = pool.Submit(open_func, flowunit); + result.push_back(std::move(ret)); + } + + for (auto &fut : result) { + const auto *msg = "open flowunit failed, please check log."; + if (!fut.valid()) { + return {STATUS_FAULT, msg}; + } + + auto ret = fut.get(); + if (!ret) { + return ret; + } } bool need_check_output = false; From e3e313ab4795f0f753da44c67364484b371b56c0 Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Mon, 7 Aug 2023 10:43:13 +0800 Subject: [PATCH 3/8] python: support run specific test case --- src/python/CMakeLists.txt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/python/CMakeLists.txt b/src/python/CMakeLists.txt index 792c41ce9..41b61cc43 100644 --- a/src/python/CMakeLists.txt +++ b/src/python/CMakeLists.txt @@ -123,6 +123,21 @@ if(NOT DISABLE_MODELBOX_TEST) COMMENT "Run python Test..." ) + # run single test case + # usage: + # cmake -DPYTHON_TEST_CASE=test_log.py .. + # make unittest-python-case + if (PYTHON_TEST_CASE) + message(STATUS "run test case for python: ${PYTHON_TEST_CASE}") + add_custom_target(unittest-python-case + COMMAND PYTHONPATH=${MODELBOX_PYTHON_BINARY_DIR} ${PYTHON_EXECUTABLE} ${MODELBOX_PYTHON_BINARY_DIR}/test/${PYTHON_TEST_CASE}; + DEPENDS modelbox-python + WORKING_DIRECTORY ${TEST_WORKING_DIR} + COMMENT "Run python Test..." + ) + add_dependencies(unittest-python-case all-drivers) + endif() + add_dependencies(unittest-python all-drivers) list(APPEND MODELBOX_UNIT_TEST_TARGETS modelbox-python) From e2a746443a6fe9d4bf5c863e26eb7bb4cbe49aeb Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Wed, 16 Aug 2023 10:57:51 +0800 Subject: [PATCH 4/8] server: make cpp-httplib use poll --- thirdparty/CMake/cpp_httplib_cmakelist.in | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/thirdparty/CMake/cpp_httplib_cmakelist.in b/thirdparty/CMake/cpp_httplib_cmakelist.in index 472d810d8..6a4ea67b7 100644 --- a/thirdparty/CMake/cpp_httplib_cmakelist.in +++ b/thirdparty/CMake/cpp_httplib_cmakelist.in @@ -22,8 +22,8 @@ file(GLOB CPP_HTTPLIB_SOURCE *.c *.cpp *.cc) add_library(cpp-httplib SHARED EXCLUDE_FROM_ALL ${CPP_HTTPLIB_SOURCE}) add_library(cpp-httplib-static STATIC EXCLUDE_FROM_ALL ${CPP_HTTPLIB_SOURCE}) set_property(TARGET cpp-httplib-static PROPERTY POSITION_INDEPENDENT_CODE ON) -set_target_properties(cpp-httplib PROPERTIES COMPILE_FLAGS "-DCPPHTTPLIB_OPENSSL_SUPPORT -DCPPHTTPLIB_HEADER_MAX_LENGTH=65535") -set_target_properties(cpp-httplib-static PROPERTIES COMPILE_FLAGS "-DCPPHTTPLIB_OPENSSL_SUPPORT -DCPPHTTPLIB_HEADER_MAX_LENGTH=65535") +set_target_properties(cpp-httplib PROPERTIES COMPILE_FLAGS "-DCPPHTTPLIB_USE_POLL -DCPPHTTPLIB_OPENSSL_SUPPORT -DCPPHTTPLIB_HEADER_MAX_LENGTH=65535") +set_target_properties(cpp-httplib-static PROPERTIES COMPILE_FLAGS "-DCPPHTTPLIB_USE_POLL -DCPPHTTPLIB_OPENSSL_SUPPORT -DCPPHTTPLIB_HEADER_MAX_LENGTH=65535") set(CPP_HTTPLIB_INCLUDE ${CMAKE_CURRENT_LIST_DIR} CACHE INTERNAL "") set(CPP_HTTPLIB_LIBRARIES cpp-httplib CACHE INTERNAL "") From b2c4afd898fe67d1740d0b40a84847ee3e182683 Mon Sep 17 00:00:00 2001 From: Jack <1214639475@qq.com> Date: Mon, 11 Sep 2023 19:56:59 +0800 Subject: [PATCH 5/8] max_executor_thread_num && flow-stream-feature --- .../base/include/modelbox/base/executor.h | 2 ++ .../engine/flowunit_data_executor.cc | 4 +++ src/libmodelbox/engine/flowunit_group.cc | 4 ++- src/libmodelbox/engine/flowunit_manager.cc | 14 ++++++++ src/libmodelbox/engine/node.cc | 36 ++++++++++++++----- src/libmodelbox/include/modelbox/flowunit.h | 2 ++ .../include/modelbox/flowunit_group.h | 2 ++ 7 files changed, 55 insertions(+), 9 deletions(-) diff --git a/src/libmodelbox/base/include/modelbox/base/executor.h b/src/libmodelbox/base/include/modelbox/base/executor.h index 84f27253f..b0eaaddea 100644 --- a/src/libmodelbox/base/include/modelbox/base/executor.h +++ b/src/libmodelbox/base/include/modelbox/base/executor.h @@ -33,6 +33,8 @@ class Executor { virtual ~Executor(); + void SetThreadCount(int thread_count); + template auto Run(func &&fun, int32_t priority, ts &&...params) -> std::future::type> { diff --git a/src/libmodelbox/engine/flowunit_data_executor.cc b/src/libmodelbox/engine/flowunit_data_executor.cc index e6633d3e4..be0fd6a4e 100644 --- a/src/libmodelbox/engine/flowunit_data_executor.cc +++ b/src/libmodelbox/engine/flowunit_data_executor.cc @@ -34,6 +34,10 @@ Executor::Executor(int thread_count) { Executor::~Executor() { thread_pool_ = nullptr; } +void Executor::SetThreadCount(int thread_count) { + thread_pool_->SetThreadSize(thread_count); +} + FlowUnitExecContext::FlowUnitExecContext( std::shared_ptr data_ctx) : data_ctx_(std::move(data_ctx)) {} diff --git a/src/libmodelbox/engine/flowunit_group.cc b/src/libmodelbox/engine/flowunit_group.cc index 5995bcc4f..0c300fad7 100644 --- a/src/libmodelbox/engine/flowunit_group.cc +++ b/src/libmodelbox/engine/flowunit_group.cc @@ -46,6 +46,8 @@ void FlowUnitGroup::InitTrace() { } } +uint32_t FlowUnitGroup::GetBatchSize() const { return batch_size_; } + std::shared_ptr FlowUnitGroup::StartTrace( FUExecContextList &exec_ctx_list) { std::call_once(trace_init_flag_, &FlowUnitGroup::InitTrace, this); @@ -388,7 +390,7 @@ Status FlowUnitGroup::Open(const CreateExternalDataFunc &create_func) { return STATUS_OK; }; - + ThreadPool pool(std::thread::hardware_concurrency()); pool.SetName(unit_name_ + "-Open"); std::vector> result; diff --git a/src/libmodelbox/engine/flowunit_manager.cc b/src/libmodelbox/engine/flowunit_manager.cc index a72786ef9..43016fd28 100644 --- a/src/libmodelbox/engine/flowunit_manager.cc +++ b/src/libmodelbox/engine/flowunit_manager.cc @@ -59,6 +59,14 @@ Status FlowUnitManager::Initialize( SetDeviceManager(std::move(device_mgr)); Status status; status = InitFlowUnitFactory(driver); + + if (config != nullptr) { + max_executor_thread_num_ = + config->GetUint32("graph.max_executor_thread_num", 0); + } else { + max_executor_thread_num_ = 0; + } + if (status != STATUS_SUCCESS) { return status; } @@ -407,6 +415,12 @@ std::shared_ptr FlowUnitManager::CreateSingleFlowUnit( return nullptr; } + if (max_executor_thread_num_ > 0) { + MBLOG_INFO << "find the parameter max_executor_thread_num in the config: " + << max_executor_thread_num_; + device->GetDeviceExecutor()->SetThreadCount(max_executor_thread_num_); + } + flowunit->SetBindDevice(device); std::vector &in_list = flowunit_desc->GetFlowUnitInput(); for (auto &in_item : in_list) { diff --git a/src/libmodelbox/engine/node.cc b/src/libmodelbox/engine/node.cc index 9d7b52d56..be0e024f5 100644 --- a/src/libmodelbox/engine/node.cc +++ b/src/libmodelbox/engine/node.cc @@ -763,23 +763,43 @@ void Node::CleanDataContext() { Status Node::Run(RunType type) { std::list> data_ctx_list; + size_t process_count = 0; auto ret = Recv(type, data_ctx_list); - if (!ret) { - return ret; - } - ret = Process(data_ctx_list); if (!ret) { return ret; } - if (!GetOutputNames().empty()) { - ret = Send(data_ctx_list); + std::list> process_ctx_list; + + auto output_names_is_empty = GetOutputNames().empty(); + + for (auto& ctx : data_ctx_list) { + // process data according to batch size + process_count++; + process_ctx_list.push_back(ctx); + + if (process_ctx_list.size() < flowunit_group_->GetBatchSize()) { + if (process_count < data_ctx_list.size()) { + continue; + } + } + + ret = Process(process_ctx_list); if (!ret) { return ret; } - } else { - SetLastError(data_ctx_list); + + if (!output_names_is_empty) { + ret = Send(process_ctx_list); + if (!ret) { + return ret; + } + } else { + SetLastError(process_ctx_list); + } + + process_ctx_list.clear(); } Clean(data_ctx_list); diff --git a/src/libmodelbox/include/modelbox/flowunit.h b/src/libmodelbox/include/modelbox/flowunit.h index bb8a8726f..123643e33 100644 --- a/src/libmodelbox/include/modelbox/flowunit.h +++ b/src/libmodelbox/include/modelbox/flowunit.h @@ -612,6 +612,8 @@ class FlowUnitManager { std::shared_ptr GetDeviceManager(); + int max_executor_thread_num_; + private: Status CheckParams(const std::string &unit_name, const std::string &unit_type, const std::string &unit_device_id); diff --git a/src/libmodelbox/include/modelbox/flowunit_group.h b/src/libmodelbox/include/modelbox/flowunit_group.h index 341dfbc29..1bf1ba2c2 100644 --- a/src/libmodelbox/include/modelbox/flowunit_group.h +++ b/src/libmodelbox/include/modelbox/flowunit_group.h @@ -64,6 +64,8 @@ class FlowUnitGroup { Status Close(); + uint32_t GetBatchSize() const; + private: std::weak_ptr node_; uint32_t batch_size_; From ee5159b868f1304fe0a64856a45011c5104358b1 Mon Sep 17 00:00:00 2001 From: JuntongMa Date: Fri, 22 Sep 2023 11:34:18 +0800 Subject: [PATCH 6/8] Fixed the problem when meta transfers an empty array, out-of-bounds array --- src/drivers/common/python/modelbox_api/modelbox_api.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/drivers/common/python/modelbox_api/modelbox_api.cc b/src/drivers/common/python/modelbox_api/modelbox_api.cc index 4cfe237bd..aea114b97 100644 --- a/src/drivers/common/python/modelbox_api/modelbox_api.cc +++ b/src/drivers/common/python/modelbox_api/modelbox_api.cc @@ -152,8 +152,14 @@ bool SetAttributes(DataType &context, const std::string &key, if (py::isinstance(obj)) { py::list obj_list_all = obj.cast(); + if (obj_list_all.empty()) { + return setup_data(List1DObjectFunc, obj_list_all, obj_list_all); + } if (py::isinstance(obj_list_all[0])) { py::list obj_list_1d = obj_list_all[0].cast(); + if (obj_list_1d.empty()) { + return setup_data(List2DObjectFunc, obj_list_1d, obj_list_all); + } if (setup_data(List2DObjectFunc, obj_list_1d[0], obj_list_all)) { return true; } From 5cf7b925acdce7d51d246e3cb3aaaafd2888db8b Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Tue, 31 Oct 2023 17:01:50 +0800 Subject: [PATCH 7/8] cmake: fix thirdparty link issue --- .github/workflows/unit-test-daily-on-device.yml | 2 +- .github/workflows/unit-test-pull-requests-on-device.yml | 2 +- thirdparty/CMake/pre-download.in | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/unit-test-daily-on-device.yml b/.github/workflows/unit-test-daily-on-device.yml index 9c027f829..c1a51b312 100644 --- a/.github/workflows/unit-test-daily-on-device.yml +++ b/.github/workflows/unit-test-daily-on-device.yml @@ -27,7 +27,7 @@ jobs: run: | mkdir build cd build - cmake .. -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DUSE_CN_MIRROR=yes -DCLANG_TIDY=on -DCLANG_TIDY_AS_ERROR=on + cmake .. -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DCLANG_TIDY=on -DCLANG_TIDY_AS_ERROR=on - name: Build working-directory: build diff --git a/.github/workflows/unit-test-pull-requests-on-device.yml b/.github/workflows/unit-test-pull-requests-on-device.yml index 1531cbdfe..396dee9f6 100644 --- a/.github/workflows/unit-test-pull-requests-on-device.yml +++ b/.github/workflows/unit-test-pull-requests-on-device.yml @@ -53,7 +53,7 @@ jobs: run: | mkdir build cd build - cmake .. -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DUSE_CN_MIRROR=yes -DWITH_WEBUI=${{github.event.inputs.WithWebUI}} -DCLANG_TIDY=${{github.event.inputs.WithClangTidy}} -DCLANG_TIDY_AS_ERROR=on + cmake .. -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DWITH_WEBUI=${{github.event.inputs.WithWebUI}} -DCLANG_TIDY=${{github.event.inputs.WithClangTidy}} -DCLANG_TIDY_AS_ERROR=on - name: Build working-directory: build diff --git a/thirdparty/CMake/pre-download.in b/thirdparty/CMake/pre-download.in index 5f374f8cf..dd13f8c63 100644 --- a/thirdparty/CMake/pre-download.in +++ b/thirdparty/CMake/pre-download.in @@ -39,13 +39,13 @@ if (NOT @USE_CN_MIRROR@) set(MODELBOX_WEBUI_DOWNLOAD_URL "https://github.com/modelbox-ai/modelbox-webui/archive/refs/tags/0.1.3.zip") set(EMOTION_DEMO_FILES_DOWNLOAD_URL "https://github.com/modelbox-ai/modelbox-binary/releases/download/BinaryArchive/emotion_demo_files.zip") else() - set(GOOGLETEST_DOWNLOAD_URL "https://gitcode.net/mirrors/google/googletest/-/archive/release-1.11.0/googletest-release-1.11.0.zip") + set(GOOGLETEST_DOWNLOAD_URL "https://ghproxy.com/github.com/google/googletest/archive/refs/tags/release-1.11.0.zip") set(HUAWEI_SECURE_C_DOWNLOAD_URL "https://gitee.com/openeuler/libboundscheck/repository/archive/master.zip") set(TINYLOG_DOWNLOAD_URL "https://ghproxy.com/github.com/pymumu/tinylog/archive/refs/tags/v1.8.zip") - set(PYBIND11_DOWNLOAD_URL "https://gitcode.net/mirrors/pybind/pybind11/-/archive/v2.10.4/pybind11-v2.10.4.zip") + set(PYBIND11_DOWNLOAD_URL "https://ghproxy.com/github.com/pybind/pybind11/archive/refs/tags/v2.10.4.zip") set(TOML11_DOWNLOAD_URL "https://ghproxy.com/github.com/ToruNiina/toml11/archive/refs/tags/v3.7.1.zip") - set(NLOHMANN_DOWNLOAD_URL "https://gitcode.net/mirrors/nlohmann/json/-/archive/v3.11.2/json-v3.11.2.zip") - set(CPP_HTTPLIB_DOWNLOAD_URL "https://gitcode.net/mirrors/yhirose/cpp-httplib/-/archive/v0.12.6/cpp-httplib-v0.12.6.zip") + set(NLOHMANN_DOWNLOAD_URL "https://ghproxy.com/github.com/nlohmann/json/releases/download/v3.11.2/include.zip") + set(CPP_HTTPLIB_DOWNLOAD_URL "https://ghproxy.com/github.com/yhirose/cpp-httplib/archive/refs/tags/v0.12.6.zip") set(APIGW_CPP_SDK_DOWNLOAD_URL "https://obs.cn-north-1.myhuaweicloud.com/apig-sdk/APIGW-cpp-sdk.zip") set(MODELBOX_WEBUI_DOWNLOAD_URL "https://gitee.com/modelbox/modelbox-webui/repository/archive/tags/0.1.3.zip") set(EMOTION_DEMO_FILES_DOWNLOAD_URL "https://gitee.com/modelbox/modelbox-binary/attach_files/1010735/download/emotion_demo_files.zip") From faa1e931a464fb3c30e6a9988cc4a3acd6aaa26d Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Fri, 8 Mar 2024 11:21:06 +0800 Subject: [PATCH 8/8] video_decoder: support reconnect with video format change. --- .../video_decoder/ascend_video_decode.cc | 19 +- .../video_decoder/video_decoder_flowunit.cc | 200 +++++++++++++++--- .../video_decoder/video_decoder_flowunit.h | 15 +- .../video_decoder/video_decoder_flowunit.cc | 114 ++++++++-- .../video_decoder/video_decoder_flowunit.h | 16 +- .../flowunit/video_demuxer/ffmpeg_reader.cc | 4 + .../video_demuxer/ffmpeg_video_demuxer.cc | 5 + .../video_demux_flowunit_retry_test.cc | 13 +- .../video_demuxer/video_demuxer_flowunit.cc | 21 ++ .../video_demuxer/video_demuxer_flowunit.h | 1 + .../video_encoder/ffmpeg_video_muxer.cc | 4 + .../video_encoder/video_encoder_flowunit.cc | 154 +++++++++++--- .../video_encoder/video_encoder_flowunit.h | 8 + .../video_decoder/nvcodec_video_decoder.cc | 2 +- .../video_decoder/nvcodec_video_decoder.h | 2 +- .../video_decoder/video_decoder_flowunit.cc | 122 +++++++++-- .../video_decoder/video_decoder_flowunit.h | 13 +- .../common/video_out/ffmpeg_video_muxer.cc | 5 + 18 files changed, 607 insertions(+), 111 deletions(-) diff --git a/src/drivers/devices/ascend/flowunit/video_decoder/ascend_video_decode.cc b/src/drivers/devices/ascend/flowunit/video_decoder/ascend_video_decode.cc index 519f85f31..f2e906704 100644 --- a/src/drivers/devices/ascend/flowunit/video_decoder/ascend_video_decode.cc +++ b/src/drivers/devices/ascend/flowunit/video_decoder/ascend_video_decode.cc @@ -205,6 +205,11 @@ void AscendVideoDecoder::Callback(acldvppStreamDesc *input, auto *ctx = (DvppVideoDecodeContext *)userData; auto queue = ctx->GetCacheQueue(); + if (queue == nullptr) { + acldvppFree(vdecOutBufferDev); + MBLOG_ERROR << "get cache queue failed."; + return; + } auto res = queue->Push(dvpp_frame); if (!res) { acldvppFree(vdecOutBufferDev); @@ -214,6 +219,7 @@ void AscendVideoDecoder::Callback(acldvppStreamDesc *input, modelbox::Status AscendVideoDecoder::Init( const std::shared_ptr &data_ctx) { + vdecChannelDesc_ = nullptr; aclError ret = aclrtSetDevice(device_id_); if (ret != ACL_ERROR_NONE) { auto errMsg = "acl set device " + std::to_string(device_id_) + @@ -304,22 +310,17 @@ modelbox::Status AscendVideoDecoder::Init( auto device_id = device_id_; vdecChannelDesc_.reset( - vdecChannelDescPtr, [device_id](aclvdecChannelDesc *p) { - auto ret = aclrtSetDevice(device_id); - if (ret != ACL_ERROR_NONE) { - MBLOG_ERROR << "Set device to " << device_id - << " failed, err: " << ret; - } - - ret = aclvdecDestroyChannel(p); + vdecChannelDescPtr, [this, device_id](aclvdecChannelDesc *p) { + auto ret = aclvdecDestroyChannel(p); if (ret != ACL_ERROR_NONE) { MBLOG_ERROR << "fail to destroy vdec channel, err: " << ret; } - ret = aclvdecDestroyChannelDesc(p); if (ret != ACL_ERROR_NONE) { MBLOG_ERROR << "fail to destroy vdec channel desc, err: " << ret; } + + this->thread_handler_ = nullptr; }); setup_result = true; diff --git a/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.cc b/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.cc index 4a56b794c..c7ad683af 100644 --- a/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.cc +++ b/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.cc @@ -147,8 +147,90 @@ modelbox::Status VideoDecodeFlowUnit::Open( return modelbox::STATUS_OK; } -modelbox::Status VideoDecodeFlowUnit::DataPre( - std::shared_ptr data_ctx) { +modelbox::Status VideoDecodeFlowUnit::ReopenDecoder( + std::shared_ptr &data_ctx, + const std::shared_ptr &flag_buffer) { + auto old_source_url = + std::static_pointer_cast(data_ctx->GetPrivate(SOURCE_URL_META)); + auto old_codec_id = + std::static_pointer_cast(data_ctx->GetPrivate(CODEC_ID_META)); + + if (old_source_url == nullptr || old_codec_id == nullptr) { + MBLOG_ERROR << "Reopen decoder failed, source url or codec id is null"; + return modelbox::STATUS_FAULT; + } + + std::string source_url; + AVCodecID codec_id; + if (flag_buffer->Get(SOURCE_URL_META, source_url) == false) { + return modelbox::STATUS_SUCCESS; + } + + if (flag_buffer->Get(CODEC_ID_META, codec_id) == false) { + return modelbox::STATUS_SUCCESS; + } + + if (source_url == *old_source_url && codec_id == *old_codec_id) { + return modelbox::STATUS_SUCCESS; + } + + MBLOG_WARN << "Reopen decoder, source url or codec id changed"; + auto ret = CloseDecoder(data_ctx); + if (ret != modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Close decoder failed"; + return modelbox::STATUS_FAULT; + } + + int32_t rate_num; + int32_t rate_den; + int32_t encode_type; + + auto res = flag_buffer->Get("rate_num", rate_num); + if (!res) { + return {modelbox::STATUS_FAULT, "get rate_num failed."}; + } + + res = flag_buffer->Get("rate_den", rate_den); + if (!res) { + return {modelbox::STATUS_FAULT, "get rate_den failed."}; + } + + auto in_meta = data_ctx->GetInputMeta(VIDEO_PACKET_INPUT); + auto profile_id = + std::static_pointer_cast(in_meta->GetMeta(PROFILE_META)); + if (profile_id == nullptr) { + return {modelbox::STATUS_FAULT, "get profile id failed."}; + } + + encode_type = GetDvppEncodeType(codec_id, *profile_id); + if (encode_type == -1) { + return {modelbox::STATUS_FAULT, "get dvpp encode type failed."}; + } + + return NewDecoder(data_ctx, source_url, codec_id, rate_num, rate_den, + encode_type); +} + +modelbox::Status VideoDecodeFlowUnit::CloseDecoder( + std::shared_ptr &data_ctx) { + auto instance_id = + std::static_pointer_cast(data_ctx->GetPrivate(INSTANCE_ID)); + if (instance_id != nullptr) { + RestoreInstanceId(*instance_id); + } + data_ctx->SetPrivate(DVPP_DECODER, nullptr); + data_ctx->SetPrivate(DVPP_DECODER_CTX, nullptr); + data_ctx->SetPrivate(FRAME_INDEX_CTX, nullptr); + data_ctx->SetPrivate(INSTANCE_ID, nullptr); + data_ctx->SetPrivate(SOURCE_URL_META, nullptr); + data_ctx->SetPrivate(CODEC_ID_META, nullptr); + return modelbox::STATUS_SUCCESS; +} + +modelbox::Status VideoDecodeFlowUnit::NewDecoder( + std::shared_ptr &data_ctx, + const std::string &source_url, AVCodecID codec_id, int32_t rate_num, + int32_t rate_den, int32_t encode_type) { int32_t instance_id = 0; instance_id = FindTheMinimumAvailableId(); modelbox::Status ret = modelbox::STATUS_SUCCESS; @@ -162,17 +244,6 @@ modelbox::Status VideoDecodeFlowUnit::DataPre( DeferCondAdd { RestoreInstanceId(instance_id); }; - int32_t rate_num; - int32_t rate_den; - int32_t encode_type; - auto res = GetDecoderParam(data_ctx, rate_num, rate_den, encode_type); - if (!res) { - auto errMsg = "get decoder param failed, detail: " + res.ToString(); - MBLOG_ERROR << errMsg; - ret = {modelbox::STATUS_FAULT, errMsg}; - return ret; - } - auto video_decoder = std::make_shared( instance_id, dev_id_, rate_num, rate_den, format_, encode_type); ret = video_decoder->Init(data_ctx); @@ -192,24 +263,67 @@ modelbox::Status VideoDecodeFlowUnit::DataPre( data_ctx->SetPrivate(DVPP_DECODER, video_decoder); data_ctx->SetPrivate(FRAME_INDEX_CTX, frame_index); data_ctx->SetPrivate(INSTANCE_ID, instance_id_ptr); - MBLOG_INFO << "acl video decode data pre success."; + data_ctx->SetPrivate(SOURCE_URL_META, std::make_shared(source_url)); + data_ctx->SetPrivate(CODEC_ID_META, std::make_shared(codec_id)); + MBLOG_INFO << "open video decode data success."; return ret; +} + +modelbox::Status VideoDecodeFlowUnit::DataPre( + std::shared_ptr data_ctx) { + auto input_packet = data_ctx->Input(VIDEO_PACKET_INPUT); + if (input_packet == nullptr) { + return {modelbox::STATUS_FAULT, "get input failed."}; + } + + int32_t rate_num; + int32_t rate_den; + int32_t encode_type; + + auto buffer = input_packet->At(0); + auto res = buffer->Get("rate_num", rate_num); + if (!res) { + return {modelbox::STATUS_FAULT, "get rate_num failed."}; + } + + res = buffer->Get("rate_den", rate_den); + if (!res) { + return {modelbox::STATUS_FAULT, "get rate_den failed."}; + } + + auto in_meta = data_ctx->GetInputMeta(VIDEO_PACKET_INPUT); + auto codec_id = + std::static_pointer_cast(in_meta->GetMeta(CODEC_META)); + if (codec_id == nullptr) { + return {modelbox::STATUS_FAULT, "get codec id failed."}; + } + + auto source_url = + std::static_pointer_cast(in_meta->GetMeta(SOURCE_URL_META)); + if (source_url == nullptr) { + MBLOG_ERROR << "Stream source url is null, init decoder failed"; + return modelbox::STATUS_FAULT; + } + + auto profile_id = + std::static_pointer_cast(in_meta->GetMeta(PROFILE_META)); + if (profile_id == nullptr) { + return {modelbox::STATUS_FAULT, "get profile id failed."}; + } + + encode_type = GetDvppEncodeType(*codec_id, *profile_id); + if (encode_type == -1) { + return {modelbox::STATUS_FAULT, "get dvpp encode type failed."}; + } + + return NewDecoder(data_ctx, *source_url, *codec_id, rate_num, rate_den, + encode_type); }; modelbox::Status VideoDecodeFlowUnit::DataPost( std::shared_ptr data_ctx) { - MBLOG_DEBUG << "videodecoder data post."; - // Destroy decoder first - data_ctx->SetPrivate(DVPP_DECODER, nullptr); - // Ctx must destroy after decoder destroy - data_ctx->SetPrivate(DVPP_DECODER_CTX, nullptr); - // Restore id - auto instance_id = - std::static_pointer_cast(data_ctx->GetPrivate(INSTANCE_ID)); - RestoreInstanceId(*instance_id); - - return modelbox::STATUS_SUCCESS; + return CloseDecoder(data_ctx); } modelbox::Status VideoDecodeFlowUnit::Close() { @@ -219,7 +333,9 @@ modelbox::Status VideoDecodeFlowUnit::Close() { modelbox::Status VideoDecodeFlowUnit::ReadData( const std::shared_ptr &data_ctx, - std::vector> &dvpp_packet_list) { + std::vector> &dvpp_packet_list, + std::shared_ptr &flag_buffer) { + auto reset_flag = false; auto video_packet_input = data_ctx->Input(VIDEO_PACKET_INPUT); if (video_packet_input == nullptr) { MBLOG_ERROR << "video packet input is null"; @@ -233,6 +349,14 @@ modelbox::Status VideoDecodeFlowUnit::ReadData( for (size_t i = 0; i < video_packet_input->Size(); ++i) { auto packet_buffer = video_packet_input->At(i); + + if (reset_flag == false) { + packet_buffer->Get("reset_flag", reset_flag); + if (reset_flag == true) { + flag_buffer = packet_buffer; + } + } + std::shared_ptr dvpp_packet; auto ret = ReadDvppStreamDesc(packet_buffer, dvpp_packet); if (ret != modelbox::STATUS_SUCCESS) { @@ -462,6 +586,8 @@ modelbox::Status VideoDecodeFlowUnit::WriteData( modelbox::Status VideoDecodeFlowUnit::Process( std::shared_ptr data_ctx) { + std::shared_ptr flag_buffer = nullptr; + auto acl_ret = aclrtSetDevice(dev_id_); if (acl_ret != ACL_SUCCESS) { MBLOG_ERROR << "set acl device to " << dev_id_ << " failed, err " @@ -473,7 +599,7 @@ modelbox::Status VideoDecodeFlowUnit::Process( data_ctx->GetPrivate(DVPP_DECODER_CTX)); auto video_decoder = std::static_pointer_cast( data_ctx->GetPrivate(DVPP_DECODER)); - if (video_decoder == nullptr) { + if (video_decoder == nullptr || video_decoder_ctx == nullptr) { MBLOG_ERROR << "Video decoder is not init"; return modelbox::STATUS_FAULT; } @@ -489,12 +615,30 @@ modelbox::Status VideoDecodeFlowUnit::Process( } std::vector> dvpp_packet_list; - ret = ReadData(data_ctx, dvpp_packet_list); + ret = ReadData(data_ctx, dvpp_packet_list, flag_buffer); if (ret != modelbox::STATUS_SUCCESS) { MBLOG_ERROR << "Read av_packet input failed, err code " + ret.ToString(); return modelbox::STATUS_FAULT; } + if (flag_buffer) { + video_decoder_ctx = nullptr; + video_decoder = nullptr; + if (ReopenDecoder(data_ctx, flag_buffer) != modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Reopen decoder failed"; + return modelbox::STATUS_FAULT; + } + + video_decoder_ctx = std::static_pointer_cast( + data_ctx->GetPrivate(DVPP_DECODER_CTX)); + video_decoder = std::static_pointer_cast( + data_ctx->GetPrivate(DVPP_DECODER)); + if (video_decoder == nullptr || video_decoder_ctx == nullptr) { + MBLOG_ERROR << "Video decoder is not init"; + return modelbox::STATUS_FAULT; + } + } + size_t err_num = 0; modelbox::Status decode_ret = modelbox::STATUS_SUCCESS; for (auto &dvpp_pkt : dvpp_packet_list) { diff --git a/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.h b/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.h index 3d90df385..6894f3326 100644 --- a/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.h +++ b/src/drivers/devices/ascend/flowunit/video_decoder/video_decoder_flowunit.h @@ -73,6 +73,8 @@ constexpr const char *DVPP_DECODER = "dvpp_decode"; constexpr const char *VIDEO_PACKET_INPUT = "in_video_packet"; constexpr const char *FRAME_INFO_OUTPUT = "out_video_frame"; constexpr const char *CODEC_META = "codec_meta"; +constexpr const char *SOURCE_URL_META = "source_url"; +constexpr const char *CODEC_ID_META = "codec_id"; constexpr const char *PROFILE_META = "profile_meta"; constexpr const char *DVPP_DECODER_CTX = "dvpp_decode_context"; constexpr const char *DVPP_DECODE_FLOWUNIT_DESC = @@ -121,7 +123,8 @@ class VideoDecodeFlowUnit : public modelbox::FlowUnit { int32_t &rate_den, int32_t &encode_type); modelbox::Status ReadData( const std::shared_ptr &data_ctx, - std::vector> &dvpp_packet_list); + std::vector> &dvpp_packet_list, + std::shared_ptr &flag_buffer); modelbox::Status ReadDvppStreamDesc( const std::shared_ptr &packet_buffer, std::shared_ptr &dvpp_packet); @@ -134,6 +137,16 @@ class VideoDecodeFlowUnit : public modelbox::FlowUnit { int32_t FindTheMinimumAvailableId(); void RestoreInstanceId(int32_t instance_id); + modelbox::Status CloseDecoder( + std::shared_ptr &data_ctx); + modelbox::Status NewDecoder(std::shared_ptr &data_ctx, + const std::string &source_url, AVCodecID codec_id, + int32_t rate_num, int32_t rate_den, + int32_t encode_type); + modelbox::Status ReopenDecoder( + std::shared_ptr &data_ctx, + const std::shared_ptr &flag_buffer); + uint32_t dest_width_{224}; uint32_t dest_height_{224}; // 1: YUV420 semi-planner(nv12), 2: YVU420 semi-planner(nv21) diff --git a/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.cc b/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.cc index 8ebe9930c..ac4e9fcd0 100644 --- a/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.cc +++ b/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.cc @@ -51,6 +51,7 @@ modelbox::Status VideoDecoderFlowUnit::Close() { return modelbox::STATUS_OK; } modelbox::Status VideoDecoderFlowUnit::Process( std::shared_ptr data_ctx) { + std::shared_ptr flag_buffer = nullptr; auto video_decoder = std::static_pointer_cast( data_ctx->GetPrivate(DECODER_CTX)); if (video_decoder == nullptr) { @@ -59,12 +60,26 @@ modelbox::Status VideoDecoderFlowUnit::Process( } std::vector> pkt_list; - auto ret = ReadData(data_ctx, pkt_list); + auto ret = ReadData(data_ctx, pkt_list, flag_buffer); if (ret != modelbox::STATUS_SUCCESS) { MBLOG_ERROR << "Read av_packet input failed"; return modelbox::STATUS_FAULT; } + if (flag_buffer) { + if (ReopenDecoder(data_ctx, flag_buffer) != modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Reopen decoder failed"; + return modelbox::STATUS_FAULT; + } + + video_decoder = std::static_pointer_cast( + data_ctx->GetPrivate(DECODER_CTX)); + if (video_decoder == nullptr) { + MBLOG_ERROR << "Video decoder is not init"; + return modelbox::STATUS_FAULT; + } + } + std::list> frame_list; modelbox::Status decode_ret = modelbox::STATUS_SUCCESS; for (auto &pkt : pkt_list) { @@ -90,7 +105,9 @@ modelbox::Status VideoDecoderFlowUnit::Process( modelbox::Status VideoDecoderFlowUnit::ReadData( const std::shared_ptr &data_ctx, - std::vector> &pkt_list) { + std::vector> &pkt_list, + std::shared_ptr &flag_buffer) { + bool reset_flag = false; auto video_packet_input = data_ctx->Input(VIDEO_PACKET_INPUT); if (video_packet_input == nullptr) { MBLOG_ERROR << "video packet input is null"; @@ -104,6 +121,14 @@ modelbox::Status VideoDecoderFlowUnit::ReadData( for (size_t i = 0; i < video_packet_input->Size(); ++i) { auto packet_buffer = video_packet_input->At(i); + + if (reset_flag == false) { + packet_buffer->Get("reset_flag", reset_flag); + if (reset_flag == true) { + flag_buffer = packet_buffer; + } + } + std::shared_ptr pkt; auto ret = ReadAVPacket(packet_buffer, pkt); if (ret != modelbox::STATUS_SUCCESS) { @@ -260,25 +285,59 @@ modelbox::Status VideoDecoderFlowUnit::WriteData( return modelbox::STATUS_SUCCESS; } -modelbox::Status VideoDecoderFlowUnit::DataPre( - std::shared_ptr data_ctx) { - auto in_meta = data_ctx->GetInputMeta(VIDEO_PACKET_INPUT); - auto codec_id = - std::static_pointer_cast(in_meta->GetMeta(CODEC_META)); - if (codec_id == nullptr) { - MBLOG_ERROR << "Stream codec id is null, init decoder failed"; +modelbox::Status VideoDecoderFlowUnit::ReopenDecoder( + std::shared_ptr &data_ctx, + const std::shared_ptr &flag_buffer) { + auto old_source_url = std::static_pointer_cast( + data_ctx->GetPrivate(SOURCE_URL_META)); + auto old_codec_id = + std::static_pointer_cast(data_ctx->GetPrivate(CODEC_ID_META)); + + if (old_source_url == nullptr || old_codec_id == nullptr) { + MBLOG_ERROR << "Reopen decoder failed, source url or codec id is null"; return modelbox::STATUS_FAULT; } - auto source_url = - std::static_pointer_cast(in_meta->GetMeta(SOURCE_URL_META)); - if (source_url == nullptr) { - MBLOG_ERROR << "Stream source url is null, init decoder failed"; + std::string source_url; + AVCodecID codec_id; + if (flag_buffer->Get(SOURCE_URL_META, source_url) == false) { + return modelbox::STATUS_SUCCESS; + } + + if (flag_buffer->Get(CODEC_ID_META, codec_id) == false) { + return modelbox::STATUS_SUCCESS; + } + + if (source_url == *old_source_url && codec_id == *old_codec_id) { + return modelbox::STATUS_SUCCESS; + } + + MBLOG_WARN << "Reopen decoder, source url or codec id changed"; + auto ret = CloseDecoder(data_ctx); + if (ret != modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Close decoder failed"; return modelbox::STATUS_FAULT; } + return NewDecoder(data_ctx, source_url, codec_id); +} + +modelbox::Status VideoDecoderFlowUnit::CloseDecoder( + std::shared_ptr &data_ctx) { + data_ctx->SetPrivate(DECODER_CTX, nullptr); + data_ctx->SetPrivate(CVT_CTX, nullptr); + data_ctx->SetPrivate(FRAME_INDEX_CTX, nullptr); + data_ctx->SetPrivate(SOURCE_URL_META, nullptr); + data_ctx->SetPrivate(CODEC_ID_META, nullptr); + data_ctx->SetOutputMeta(FRAME_INFO_OUTPUT, nullptr); + return modelbox::STATUS_SUCCESS; +} + +modelbox::Status VideoDecoderFlowUnit::NewDecoder( + std::shared_ptr &data_ctx, + const std::string &source_url, AVCodecID codec_id) { auto video_decoder = std::make_shared(); - auto ret = video_decoder->Init(*codec_id); + auto ret = video_decoder->Init(codec_id); if (ret != modelbox::STATUS_SUCCESS) { MBLOG_ERROR << "Video decoder init failed"; return modelbox::STATUS_FAULT; @@ -290,17 +349,40 @@ modelbox::Status VideoDecoderFlowUnit::DataPre( data_ctx->SetPrivate(DECODER_CTX, video_decoder); data_ctx->SetPrivate(CVT_CTX, color_cvt); data_ctx->SetPrivate(FRAME_INDEX_CTX, frame_index); + data_ctx->SetPrivate(SOURCE_URL_META, + std::make_shared(source_url)); + data_ctx->SetPrivate(CODEC_ID_META, std::make_shared(codec_id)); auto meta = std::make_shared(); - meta->SetMeta(SOURCE_URL_META, source_url); + meta->SetMeta(SOURCE_URL_META, std::make_shared(source_url)); data_ctx->SetOutputMeta(FRAME_INFO_OUTPUT, meta); MBLOG_INFO << "Video decoder init success"; MBLOG_INFO << "Video decoder output pix fmt " << out_pix_fmt_str_; return modelbox::STATUS_OK; } +modelbox::Status VideoDecoderFlowUnit::DataPre( + std::shared_ptr data_ctx) { + auto in_meta = data_ctx->GetInputMeta(VIDEO_PACKET_INPUT); + auto codec_id = + std::static_pointer_cast(in_meta->GetMeta(CODEC_META)); + if (codec_id == nullptr) { + MBLOG_ERROR << "Stream codec id is null, init decoder failed"; + return modelbox::STATUS_FAULT; + } + + auto source_url = + std::static_pointer_cast(in_meta->GetMeta(SOURCE_URL_META)); + if (source_url == nullptr) { + MBLOG_ERROR << "Stream source url is null, init decoder failed"; + return modelbox::STATUS_FAULT; + } + + return NewDecoder(data_ctx, *source_url, *codec_id); +} + modelbox::Status VideoDecoderFlowUnit::DataPost( std::shared_ptr data_ctx) { - return modelbox::STATUS_OK; + return CloseDecoder(data_ctx); } MODELBOX_FLOWUNIT(VideoDecoderFlowUnit, desc) { diff --git a/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.h b/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.h index e4a16d0d2..83c2250dc 100644 --- a/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.h +++ b/src/drivers/devices/cpu/flowunit/video_decoder/video_decoder_flowunit.h @@ -57,7 +57,8 @@ constexpr const char *FLOWUNIT_DESC = "\t\tField Name: shape, Type: vector\n" "\t\tField Name: type, Type: ModelBoxDataType::MODELBOX_UINT8\n" "\t@Constraint: The flowuint 'video_decoder' must be used pair " - "with 'video_demuxer. the output buffer meta fields 'pix_fmt' is 'brg_packed' or 'rgb_packed', 'layout' is 'hcw'."; + "with 'video_demuxer. the output buffer meta fields 'pix_fmt' is " + "'brg_packed' or 'rgb_packed', 'layout' is 'hcw'."; constexpr const char *CODEC_META = "codec_meta"; constexpr const char *DECODER_CTX = "decoder_ctx"; constexpr const char *CVT_CTX = "converter_ctx"; @@ -65,6 +66,7 @@ constexpr const char *FRAME_INDEX_CTX = "frame_index_ctx"; constexpr const char *VIDEO_PACKET_INPUT = "in_video_packet"; constexpr const char *FRAME_INFO_OUTPUT = "out_video_frame"; constexpr const char *SOURCE_URL_META = "source_url"; +constexpr const char *CODEC_ID_META = "codec_id"; constexpr const char *LAST_FRAME = "last_frame"; class VideoDecoderFlowUnit : public modelbox::FlowUnit { @@ -99,7 +101,8 @@ class VideoDecoderFlowUnit : public modelbox::FlowUnit { private: modelbox::Status ReadData( const std::shared_ptr &data_ctx, - std::vector> &pkt_list); + std::vector> &pkt_list, + std::shared_ptr &flag_buffer); modelbox::Status ReadAVPacket( const std::shared_ptr &packet_buffer, std::shared_ptr &pkt); @@ -109,6 +112,15 @@ class VideoDecoderFlowUnit : public modelbox::FlowUnit { std::list> &frame_list, bool eos); + modelbox::Status CloseDecoder( + std::shared_ptr &data_ctx); + modelbox::Status NewDecoder(std::shared_ptr &data_ctx, + const std::string &source_url, + AVCodecID codec_id); + modelbox::Status ReopenDecoder( + std::shared_ptr &data_ctx, + const std::shared_ptr &flag_buffer); + AVPixelFormat out_pix_fmt_{AV_PIX_FMT_NV12}; std::string out_pix_fmt_str_; }; diff --git a/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_reader.cc b/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_reader.cc index f8b3ed88c..3327ec0ab 100644 --- a/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_reader.cc +++ b/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_reader.cc @@ -42,6 +42,8 @@ modelbox::Status FfmpegReader::Open(const std::string &source_url) { #if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58, 9, 100) av_register_all(); #endif + format_ctx_ = nullptr; + auto ret = avformat_network_init(); if (ret < 0) { GET_FFMPEG_ERR(ret, err_str); @@ -60,6 +62,7 @@ modelbox::Status FfmpegReader::Open(const std::string &source_url) { AVFormatContext *ctx = nullptr; ctx = avformat_alloc_context(); if (ctx == nullptr) { + av_dict_free(&options); return {modelbox::STATUS_FAULT, "ctx is null"}; } ResetStartTime(); @@ -71,6 +74,7 @@ modelbox::Status FfmpegReader::Open(const std::string &source_url) { GET_FFMPEG_ERR(ret, err_str); MBLOG_ERROR << "avformat open input[" << format_source_url_ << "] failed, err " << err_str; + avformat_close_input(&ctx); return modelbox::STATUS_FAULT; } diff --git a/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_video_demuxer.cc b/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_video_demuxer.cc index aaad3feb5..853557b41 100644 --- a/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_video_demuxer.cc +++ b/src/drivers/devices/cpu/flowunit/video_demuxer/ffmpeg_video_demuxer.cc @@ -30,6 +30,11 @@ modelbox::Status FfmpegVideoDemuxer::Init(std::shared_ptr &reader, bool key_frame_only) { source_url_ = reader->GetSourceURL(); format_ctx_ = reader->GetCtx(); + + if (format_ctx_ == nullptr) { + return modelbox::STATUS_FAULT; + } + reader_ = reader; auto ret = SetupStreamInfo(); if (ret != modelbox::STATUS_SUCCESS) { diff --git a/src/drivers/devices/cpu/flowunit/video_demuxer/video_demux_flowunit_retry_test.cc b/src/drivers/devices/cpu/flowunit/video_demuxer/video_demux_flowunit_retry_test.cc index 2a7039355..e8d74ac95 100644 --- a/src/drivers/devices/cpu/flowunit/video_demuxer/video_demux_flowunit_retry_test.cc +++ b/src/drivers/devices/cpu/flowunit/video_demuxer/video_demux_flowunit_retry_test.cc @@ -63,7 +63,7 @@ modelbox::Status VideoDemuxerFlowUnitRetryTest::StartFlow( driver_flow_->BuildAndRun("VideoDecoder", toml_content, -1); std::string source_type = "url"; std::string data_source_cfg = R"({ - "url": "rtsp://0.0.0.1:554/sample_100kbit.mp4", + "url": "rtsp://192.168.59.29:10054/live/k14XeNAIR", "url_type": "stream" })"; flow_ = driver_flow_->GetFlow(); @@ -79,6 +79,7 @@ TEST_F(VideoDemuxerFlowUnitRetryTest, RtspInputTest) { std::string VideoDemuxerFlowUnitRetryTest::GetRtspTomlConfig() { const std::string test_lib_dir = TEST_DRIVER_DIR; + const std::string dest_url = "rtmp://192.168.59.29:10035/live/iEunZv0IR?sign=mPu7WDASRz"; std::string toml_content = R"( [log] @@ -95,12 +96,16 @@ std::string VideoDemuxerFlowUnitRetryTest::GetRtspTomlConfig() { data_source_parser[type=flowunit, flowunit=data_source_parser, device=cpu, deviceid=0, retry_interval_ms = 1000, obs_retry_interval_ms = 3000,url_retry_interval_ms = 1000, label="", plugin_dir=")" + test_lib_dir + R"("] videodemuxer[type=flowunit, flowunit=video_demuxer, device=cpu, deviceid=0, label=" | ", queue_size = 16] - videodecoder[type=flowunit, flowunit=video_decoder, device=cpu, deviceid=0, label=" | ", pix_fmt=nv12, queue_size = 16] - read_frame[type=flowunit, flowunit=read_frame, device=cpu, deviceid=0, label="", queue_size = 16] + videodecoder[type=flowunit, flowunit=video_decoder, device=cpu, deviceid=0, label=" | ", pix_fmt=rgb, queue_size = 16] + // videodecoder[type=flowunit, flowunit=video_decoder, device=cuda, deviceid=0, label=" | ", pix_fmt=rgb, queue_size = 16] + // videodecoder[type=flowunit, flowunit=video_decoder, device=ascend, deviceid=0, label=" | ", pix_fmt=nv12, queue_size = 16] + videoencoder[type=flowunit, flowunit=video_encoder, device=cpu, queue_size = 16, deviceid=0, default_dest_url=")" + + dest_url + R"( + ", format=flv, encoder=libx264 ] input -> data_source_parser:in_data data_source_parser:out_video_url -> videodemuxer:in_video_url videodemuxer:out_video_packet -> videodecoder:in_video_packet - videodecoder:out_video_frame -> read_frame:frame_info + videodecoder:out_video_frame -> videoencoder:in_video_frame }''' format = "graphviz" )"; diff --git a/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.cc b/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.cc index 9c058bc63..02723a154 100644 --- a/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.cc +++ b/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.cc @@ -69,6 +69,9 @@ modelbox::Status VideoDemuxerFlowUnit::Process( std::shared_ptr pkt; if (demuxer_worker != nullptr) { demux_status = demuxer_worker->ReadPacket(pkt); + if (demux_status == modelbox::STATUS_NODATA) { + is_retry_reset_ = true; + } } if (demux_status == modelbox::STATUS_OK) { @@ -126,6 +129,16 @@ modelbox::Status VideoDemuxerFlowUnit::WriteData( } auto packet_buffer = video_packet_output->At(0); + if (is_retry_reset_) { + bool is_reset = true; + auto codec_id = std::make_shared(video_demuxer->GetCodecID()); + auto source_url = + std::static_pointer_cast(data_ctx->GetPrivate(SOURCE_URL)); + packet_buffer->Set("reset_flag", is_reset); + packet_buffer->Set("source_url", *source_url); + packet_buffer->Set("codec_id", video_demuxer->GetCodecID()); + is_retry_reset_ = false; + } packet_buffer->Set("pts", pkt->pts); packet_buffer->Set("dts", pkt->dts); packet_buffer->Set("time_base", video_demuxer->GetTimeBase()); @@ -281,6 +294,14 @@ modelbox::Status VideoDemuxerFlowUnit::InitDemuxer( } video_demuxer->LogStreamInfo(); + int32_t width = 0; + int32_t height = 0; + video_demuxer->GetFrameMeta(&width, &height); + if (width == 0 || height == 0) { + MBLOG_ERROR << "video demuxer get frame meta failed"; + return modelbox::STATUS_FAULT; + } + auto codec_id = video_demuxer->GetCodecID(); auto profile_id = video_demuxer->GetProfileID(); // reset meta value diff --git a/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.h b/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.h index ce113e642..03b90a909 100644 --- a/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.h +++ b/src/drivers/devices/cpu/flowunit/video_demuxer/video_demuxer_flowunit.h @@ -108,6 +108,7 @@ class VideoDemuxerFlowUnit bool key_frame_only_{false}; size_t queue_size_{32}; + bool is_retry_reset_{false}; }; class DemuxerWorker { diff --git a/src/drivers/devices/cpu/flowunit/video_encoder/ffmpeg_video_muxer.cc b/src/drivers/devices/cpu/flowunit/video_encoder/ffmpeg_video_muxer.cc index 85f1d45ca..4c7c62410 100644 --- a/src/drivers/devices/cpu/flowunit/video_encoder/ffmpeg_video_muxer.cc +++ b/src/drivers/devices/cpu/flowunit/video_encoder/ffmpeg_video_muxer.cc @@ -69,6 +69,10 @@ modelbox::Status FfmpegVideoMuxer::Mux(const AVRational &time_base, auto ret = av_interleaved_write_frame(format_ctx_.get(), av_packet.get()); if (ret < 0) { + if (ret == AVERROR(EPIPE) || ret == AVERROR_EOF) { + MBLOG_ERROR << "remote end closed the connection"; + return modelbox::STATUS_NOSTREAM; + } GET_FFMPEG_ERR(ret, ffmpeg_err); MBLOG_ERROR << "av_write_frame failed, ret " << ffmpeg_err; return modelbox::STATUS_FAULT; diff --git a/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.cc b/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.cc index c836c8be3..e14607f3c 100644 --- a/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.cc +++ b/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.cc @@ -65,6 +65,58 @@ modelbox::Status VideoEncoderFlowUnit::Process( return modelbox::STATUS_FAULT; } + if (reopen_remote_ == true) { + + static time_t last_time = 0; + time_t now = time(nullptr); + + if (now - last_time < 5) { + return modelbox::STATUS_SUCCESS; + } + + muxer = nullptr; + encoder = nullptr; + color_cvt = nullptr; + + auto frame_buffer_list = data_ctx->Input(FRAME_INFO_INPUT); + auto buffer = frame_buffer_list->At(0); + + int32_t width = 0; + int32_t height = 0; + int32_t rate_num = 0; + int32_t rate_den = 0; + + buffer->Get("width", width); + buffer->Get("height", height); + buffer->Get("rate_num", rate_num); + buffer->Get("rate_den", rate_den); + + if (width == 0 || height == 0 || rate_num == 0 || rate_den == 0) { + MBLOG_ERROR << "buffer meta is invalid"; + return modelbox::STATUS_SUCCESS; + } + + CloseMuexer(data_ctx); + if (OpenMuxer(data_ctx, width, height, rate_num, rate_den, "") != + modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Open muxer failed"; + return modelbox::STATUS_FAULT; + } + + muxer = std::static_pointer_cast( + data_ctx->GetPrivate(MUXER_CTX)); + encoder = std::static_pointer_cast( + data_ctx->GetPrivate(ENCODER_CTX)); + color_cvt = std::static_pointer_cast( + data_ctx->GetPrivate(COLOR_CVT_CTX)); + if (muxer == nullptr || encoder == nullptr || color_cvt == nullptr) { + MBLOG_ERROR << "Open muxer failed"; + return modelbox::STATUS_FAULT; + } + + reopen_remote_ = false; + } + std::vector> av_packet_list; ret = EncodeFrame(encoder, av_frame_list, av_packet_list); if (ret != modelbox::STATUS_SUCCESS) { @@ -74,6 +126,12 @@ modelbox::Status VideoEncoderFlowUnit::Process( ret = MuxPacket(muxer, encoder->GetCtx()->time_base, av_packet_list); if (ret != modelbox::STATUS_SUCCESS) { + if (ret == modelbox::STATUS_NOSTREAM) { + MBLOG_WARN << "No stream to mux, retry."; + reopen_remote_ = true; + return modelbox::STATUS_SUCCESS; + } + MBLOG_ERROR << "Mux packet failed"; return modelbox::STATUS_FAULT; } @@ -216,43 +274,34 @@ modelbox::Status VideoEncoderFlowUnit::MuxPacket( return modelbox::STATUS_SUCCESS; } -modelbox::Status VideoEncoderFlowUnit::DataPre( - std::shared_ptr data_ctx) { - std::string dest_url; - auto ret = GetDestUrl(data_ctx, dest_url); - if (ret != modelbox::STATUS_SUCCESS) { - return modelbox::STATUS_FAULT; - } - - auto frame_buffer_list = data_ctx->Input(FRAME_INFO_INPUT); - if (frame_buffer_list == nullptr || frame_buffer_list->Size() == 0) { - MBLOG_ERROR << "Input [frame_info] is empty"; - return modelbox::STATUS_FAULT; - } - - auto frame_buffer = frame_buffer_list->At(0); - int32_t width = 0; - int32_t height = 0; - int32_t rate_num = 0; - int32_t rate_den = 0; - frame_buffer->Get("width", width); - frame_buffer->Get("height", height); - frame_buffer->Get("rate_num", rate_num); - frame_buffer->Get("rate_den", rate_den); - - if (width == 0 || height == 0) { - MBLOG_ERROR << "buffer meta is invalid"; - return modelbox::STATUS_INVALID; - } +modelbox::Status VideoEncoderFlowUnit::OpenMuxer( + const std::shared_ptr &data_ctx, int32_t width, + int32_t height, int32_t rate_num, int32_t rate_den, std::string dest_url) { + MBLOG_WARN << "OpenMuxer, width " << width << ", height " << height + << ", rate_num " << rate_num << ", rate_den " << rate_den + << ", dest_url " << dest_url; if (rate_num == 0 || rate_den == 0) { rate_num = 25; rate_den = 1; } + if (dest_url == "") { + auto dest_url_ptr = + std::static_pointer_cast(data_ctx->GetPrivate("dest_url")); + if (dest_url_ptr != nullptr) { + dest_url = *dest_url_ptr; + } + + if (dest_url == "") { + MBLOG_ERROR << "dest_url is empty"; + return modelbox::STATUS_FAULT; + } + } + auto encoder = std::make_shared(); - ret = encoder->Init(width, height, {rate_num, rate_den}, bit_rate_, - encoder_name_); + auto ret = encoder->Init(width, height, {rate_num, rate_den}, bit_rate_, + encoder_name_); if (ret != modelbox::STATUS_SUCCESS) { MBLOG_ERROR << "Init encoder failed"; return modelbox::STATUS_FAULT; @@ -280,6 +329,7 @@ modelbox::Status VideoEncoderFlowUnit::DataPre( data_ctx->SetPrivate(COLOR_CVT_CTX, color_cvt); auto frame_index_ptr = std::make_shared(0); data_ctx->SetPrivate(FRAME_INDEX_CTX, frame_index_ptr); + data_ctx->SetPrivate("dest_url", std::make_shared(dest_url)); MBLOG_INFO << "Video encoder init success" << ", width " << width << ", height " << height << ", rate " << rate_num << "/" << rate_den << ", format " << format_name_ @@ -288,6 +338,50 @@ modelbox::Status VideoEncoderFlowUnit::DataPre( return modelbox::STATUS_OK; } +modelbox::Status VideoEncoderFlowUnit::CloseMuexer( + const std::shared_ptr &data_ctx) { + data_ctx->SetPrivate(MUXER_CTX, nullptr); + data_ctx->SetPrivate(ENCODER_CTX, nullptr); + data_ctx->SetPrivate(COLOR_CVT_CTX, nullptr); + + auto frame_index_ptr = std::make_shared(0); + data_ctx->SetPrivate(FRAME_INDEX_CTX, frame_index_ptr); + + return modelbox::STATUS_OK; +} + +modelbox::Status VideoEncoderFlowUnit::DataPre( + std::shared_ptr data_ctx) { + std::string dest_url; + auto ret = GetDestUrl(data_ctx, dest_url); + if (ret != modelbox::STATUS_SUCCESS) { + return modelbox::STATUS_FAULT; + } + + auto frame_buffer_list = data_ctx->Input(FRAME_INFO_INPUT); + if (frame_buffer_list == nullptr || frame_buffer_list->Size() == 0) { + MBLOG_ERROR << "Input [frame_info] is empty"; + return modelbox::STATUS_FAULT; + } + + auto frame_buffer = frame_buffer_list->At(0); + int32_t width = 0; + int32_t height = 0; + int32_t rate_num = 0; + int32_t rate_den = 0; + frame_buffer->Get("width", width); + frame_buffer->Get("height", height); + frame_buffer->Get("rate_num", rate_num); + frame_buffer->Get("rate_den", rate_den); + + if (width == 0 || height == 0) { + MBLOG_ERROR << "buffer meta is invalid"; + return modelbox::STATUS_INVALID; + } + + return OpenMuxer(data_ctx, width, height, rate_num, rate_den, dest_url); +} + modelbox::Status VideoEncoderFlowUnit::GetDestUrl( std::shared_ptr &data_ctx, std::string &dest_url) { auto stream_meta = data_ctx->GetInputMeta(FRAME_INFO_INPUT); diff --git a/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.h b/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.h index 484debf4e..adbbbc176 100644 --- a/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.h +++ b/src/drivers/devices/cpu/flowunit/video_encoder/video_encoder_flowunit.h @@ -114,10 +114,18 @@ class VideoEncoderFlowUnit : public modelbox::FlowUnit { const AVRational &time_base, std::vector> &av_packet_list); + modelbox::Status OpenMuxer( + const std::shared_ptr &data_ctx, int32_t width, + int32_t height, int32_t rate_num, int32_t rate_den, std::string dest_url); + + modelbox::Status CloseMuexer( + const std::shared_ptr &data_ctx); + std::string default_dest_url_; std::string format_name_; std::string encoder_name_; uint64_t bit_rate_{0}; + bool reopen_remote_{false}; }; #endif // MODELBOX_FLOWUNIT_VIDEO_ENCODER_CPU_H_ diff --git a/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.cc b/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.cc index 636a9814d..174b6161a 100644 --- a/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.cc +++ b/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.cc @@ -119,7 +119,7 @@ NvcodecVideoDecoder::~NvcodecVideoDecoder() { modelbox::Status NvcodecVideoDecoder::Init(const std::string &device_id, AVCodecID codec_id, - std::string &file_url, + const std::string &file_url, bool skip_err_frame, bool no_delay) { gpu_id_ = std::stoi(device_id); MBLOG_INFO << "Init decode in gpu " << gpu_id_; diff --git a/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.h b/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.h index ebb90c03d..2a37a81ba 100644 --- a/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.h +++ b/src/drivers/devices/cuda/flowunit/video_decoder/nvcodec_video_decoder.h @@ -117,7 +117,7 @@ class NvcodecVideoDecoder { virtual ~NvcodecVideoDecoder(); modelbox::Status Init(const std::string &device_id, AVCodecID codec_id, - std::string &file_url, bool skip_err_frame, + const std::string &file_url, bool skip_err_frame, bool no_delay); modelbox::Status Decode( diff --git a/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.cc b/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.cc index 872dfa24f..3cf8447d9 100644 --- a/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.cc +++ b/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.cc @@ -44,6 +44,7 @@ modelbox::Status VideoDecoderFlowUnit::Close() { return modelbox::STATUS_OK; } modelbox::Status VideoDecoderFlowUnit::Process( std::shared_ptr data_ctx) { + std::shared_ptr flag_buffer = nullptr; auto video_decoder = std::static_pointer_cast( data_ctx->GetPrivate(DECODER_CTX)); if (video_decoder == nullptr) { @@ -52,12 +53,26 @@ modelbox::Status VideoDecoderFlowUnit::Process( } std::vector> pkt_list; - auto ret = ReadData(data_ctx, pkt_list); + auto ret = ReadData(data_ctx, pkt_list, flag_buffer); if (ret != modelbox::STATUS_SUCCESS) { MBLOG_ERROR << "Read av_packet input failed"; return modelbox::STATUS_FAULT; } + if (flag_buffer) { + if (ReopenDecoder(data_ctx, flag_buffer) != modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Reopen decoder failed"; + return modelbox::STATUS_FAULT; + } + + video_decoder = std::static_pointer_cast( + data_ctx->GetPrivate(DECODER_CTX)); + if (video_decoder == nullptr) { + MBLOG_ERROR << "Video decoder is not init"; + return modelbox::STATUS_FAULT; + } + } + std::vector> frame_list; modelbox::Status decode_ret = modelbox::STATUS_SUCCESS; @@ -97,7 +112,9 @@ modelbox::Status VideoDecoderFlowUnit::Process( modelbox::Status VideoDecoderFlowUnit::ReadData( const std::shared_ptr &data_ctx, - std::vector> &pkt_list) { + std::vector> &pkt_list, + std::shared_ptr &flag_buffer) { + bool reset_flag = false; auto video_packet_input = data_ctx->Input(VIDEO_PACKET_INPUT); if (video_packet_input == nullptr) { MBLOG_ERROR << "video packet input is null"; @@ -111,6 +128,14 @@ modelbox::Status VideoDecoderFlowUnit::ReadData( for (size_t i = 0; i < video_packet_input->Size(); ++i) { auto packet_buffer = video_packet_input->At(i); + + if (reset_flag == false) { + packet_buffer->Get("reset_flag", reset_flag); + if (reset_flag == true) { + flag_buffer = packet_buffer; + } + } + std::shared_ptr pkt; auto ret = ReadNvcodecPacket(packet_buffer, pkt); if (ret != modelbox::STATUS_SUCCESS) { @@ -246,29 +271,60 @@ modelbox::Status VideoDecoderFlowUnit::WriteData( return modelbox::STATUS_SUCCESS; } -modelbox::Status VideoDecoderFlowUnit::DataPre( - std::shared_ptr data_ctx) { - MBLOG_INFO << "Video Decode Start"; - auto in_meta = data_ctx->GetInputMeta(VIDEO_PACKET_INPUT); - auto codec_id = - std::static_pointer_cast(in_meta->GetMeta(CODEC_META)); - if (codec_id == nullptr) { - MBLOG_ERROR << "Stream codec id is null, init decoder failed"; +modelbox::Status VideoDecoderFlowUnit::ReopenDecoder( + std::shared_ptr &data_ctx, + const std::shared_ptr &flag_buffer) { + auto old_source_url = std::static_pointer_cast( + data_ctx->GetPrivate(SOURCE_URL_META)); + auto old_codec_id = + std::static_pointer_cast(data_ctx->GetPrivate(CODEC_ID_META)); + + if (old_source_url == nullptr || old_codec_id == nullptr) { + MBLOG_ERROR << "Reopen decoder failed, source url or codec id is null"; return modelbox::STATUS_FAULT; } - auto source_url = - std::static_pointer_cast(in_meta->GetMeta(SOURCE_URL_META)); - if (source_url == nullptr) { - MBLOG_ERROR << "Stream source url is null, init decoder failed"; + std::string source_url; + AVCodecID codec_id; + if (flag_buffer->Get(SOURCE_URL_META, source_url) == false) { + return modelbox::STATUS_SUCCESS; + } + + if (flag_buffer->Get(CODEC_ID_META, codec_id) == false) { + return modelbox::STATUS_SUCCESS; + } + + if (source_url == *old_source_url && codec_id == *old_codec_id) { + return modelbox::STATUS_SUCCESS; + } + + MBLOG_WARN << "Reopen decoder, source url or codec id changed"; + auto ret = CloseDecoder(data_ctx); + if (ret != modelbox::STATUS_SUCCESS) { + MBLOG_ERROR << "Close decoder failed"; return modelbox::STATUS_FAULT; } + return NewDecoder(data_ctx, source_url, codec_id); +} + +modelbox::Status VideoDecoderFlowUnit::CloseDecoder( + std::shared_ptr &data_ctx) { + data_ctx->SetPrivate(DECODER_CTX, nullptr); + data_ctx->SetPrivate(CVT_CTX, nullptr); + data_ctx->SetPrivate(FRAME_INDEX_CTX, nullptr); + data_ctx->SetOutputMeta(FRAME_INFO_OUTPUT, nullptr); + return modelbox::STATUS_SUCCESS; +} + +modelbox::Status VideoDecoderFlowUnit::NewDecoder( + std::shared_ptr &data_ctx, + const std::string &source_url, AVCodecID codec_id) { auto video_decoder = std::make_shared(); // when concurrency limit set, no delay must be true to avoid gpu cache auto no_delay = concurrency_limit_ != 0; - auto ret = video_decoder->Init(GetBindDevice()->GetDeviceID(), *codec_id, - *source_url, skip_err_frame_, no_delay); + auto ret = video_decoder->Init(GetBindDevice()->GetDeviceID(), codec_id, + source_url, skip_err_frame_, no_delay); if (ret != modelbox::STATUS_SUCCESS) { MBLOG_ERROR << "Video decoder init failed"; return modelbox::STATUS_FAULT; @@ -280,8 +336,11 @@ modelbox::Status VideoDecoderFlowUnit::DataPre( data_ctx->SetPrivate(DECODER_CTX, video_decoder); data_ctx->SetPrivate(CVT_CTX, color_cvt); data_ctx->SetPrivate(FRAME_INDEX_CTX, frame_index); + data_ctx->SetPrivate(SOURCE_URL_META, + std::make_shared(source_url)); + data_ctx->SetPrivate(CODEC_ID_META, std::make_shared(codec_id)); auto meta = std::make_shared(); - meta->SetMeta(SOURCE_URL_META, source_url); + meta->SetMeta(SOURCE_URL_META, std::make_shared(source_url)); data_ctx->SetOutputMeta(FRAME_INFO_OUTPUT, meta); MBLOG_INFO << "Video decoder init success"; MBLOG_INFO << "Video decoder output pix fmt " << out_pix_fmt_str_; @@ -289,9 +348,36 @@ modelbox::Status VideoDecoderFlowUnit::DataPre( return modelbox::STATUS_OK; } +modelbox::Status VideoDecoderFlowUnit::DataPre( + std::shared_ptr data_ctx) { + MBLOG_INFO << "Video Decode Start"; + auto in_meta = data_ctx->GetInputMeta(VIDEO_PACKET_INPUT); + auto codec_id = + std::static_pointer_cast(in_meta->GetMeta(CODEC_META)); + if (codec_id == nullptr) { + MBLOG_ERROR << "Stream codec id is null, init decoder failed"; + return modelbox::STATUS_FAULT; + } + + auto source_url = + std::static_pointer_cast(in_meta->GetMeta(SOURCE_URL_META)); + if (source_url == nullptr) { + MBLOG_ERROR << "Stream source url is null, init decoder failed"; + return modelbox::STATUS_FAULT; + } + + return NewDecoder(data_ctx, *source_url, *codec_id); +} + modelbox::Status VideoDecoderFlowUnit::DataPost( std::shared_ptr data_ctx) { - return modelbox::STATUS_OK; + data_ctx->SetPrivate(DECODER_CTX, nullptr); + data_ctx->SetPrivate(CVT_CTX, nullptr); + data_ctx->SetPrivate(FRAME_INDEX_CTX, nullptr); + data_ctx->SetPrivate(SOURCE_URL_META, nullptr); + data_ctx->SetPrivate(CODEC_ID_META, nullptr); + data_ctx->SetOutputMeta(FRAME_INFO_OUTPUT, nullptr); + return modelbox::STATUS_SUCCESS; } MODELBOX_FLOWUNIT(VideoDecoderFlowUnit, desc) { diff --git a/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.h b/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.h index 27df7733a..605b60ee8 100644 --- a/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.h +++ b/src/drivers/devices/cuda/flowunit/video_decoder/video_decoder_flowunit.h @@ -66,6 +66,7 @@ constexpr const char *FRAME_INDEX_CTX = "frame_index_ctx"; constexpr const char *VIDEO_PACKET_INPUT = "in_video_packet"; constexpr const char *FRAME_INFO_OUTPUT = "out_video_frame"; constexpr const char *SOURCE_URL_META = "source_url"; +constexpr const char *CODEC_ID_META = "codec_id"; constexpr const char *LAST_FRAME = "last_frame"; class VideoDecoderFlowUnit : public modelbox::FlowUnit { @@ -100,7 +101,8 @@ class VideoDecoderFlowUnit : public modelbox::FlowUnit { private: modelbox::Status ReadData( const std::shared_ptr &data_ctx, - std::vector> &pkt_list); + std::vector> &pkt_list, + std::shared_ptr &flag_buffer); modelbox::Status ReadNvcodecPacket( const std::shared_ptr &packet_buffer, std::shared_ptr &pkt); @@ -110,6 +112,15 @@ class VideoDecoderFlowUnit : public modelbox::FlowUnit { const std::string &file_url); modelbox::Status CreateCudaContext(CUcontext &cu_ctx, std::string &device_id); + modelbox::Status CloseDecoder( + std::shared_ptr &data_ctx); + modelbox::Status NewDecoder(std::shared_ptr &data_ctx, + const std::string &source_url, + AVCodecID codec_id); + modelbox::Status ReopenDecoder( + std::shared_ptr &data_ctx, + const std::shared_ptr &flag_buffer); + std::string out_pix_fmt_str_; bool skip_err_frame_{false}; std::string device_id_; diff --git a/src/drivers/devices/rockchip/common/video_out/ffmpeg_video_muxer.cc b/src/drivers/devices/rockchip/common/video_out/ffmpeg_video_muxer.cc index 4b9f4ceed..81160c7ac 100644 --- a/src/drivers/devices/rockchip/common/video_out/ffmpeg_video_muxer.cc +++ b/src/drivers/devices/rockchip/common/video_out/ffmpeg_video_muxer.cc @@ -84,6 +84,11 @@ modelbox::Status FfmpegVideoMuxer::Mux( auto ret = av_interleaved_write_frame(format_ctx_.get(), av_packet.get()); if (ret < 0) { + if (ret == AVERROR(EPIPE) || ret == AVERROR_EOF) { + MBLOG_ERROR << "remote end closed the connection"; + return modelbox::STATUS_NOSTREAM; + } + GET_FFMPEG_ERR(ret, ffmpeg_err); MBLOG_ERROR << "av_write_frame failed, ret " << ffmpeg_err; return {modelbox::STATUS_FAULT,