diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0e83346e..ca1a699d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -269,6 +269,9 @@ if(BUILD_GSTREAMER_PLUGIN)
   add_executable(kvs_gstreamer_file_uploader_sample samples/kvs_gstreamer_file_uploader_sample.cpp)
   target_link_libraries(kvs_gstreamer_file_uploader_sample ${GST_APP_LIBRARIES})
 
+  add_executable(gstreamer_sandbox samples/gstreamer_sandbox.cpp)
+  target_link_libraries(gstreamer_sandbox ${GST_APP_LIBRARIES} KinesisVideoProducer)
+
   install(
     TARGETS gstkvssink
     ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
diff --git a/samples/gstreamer_sandbox.cpp b/samples/gstreamer_sandbox.cpp
new file mode 100644
index 00000000..133f9e4f
--- /dev/null
+++ b/samples/gstreamer_sandbox.cpp
@@ -0,0 +1,146 @@
+#include <gst/gst.h>
+#include <string.h>
+#include <chrono>
+#include <Logger.h>
+#include "KinesisVideoProducer.h"
+#include <vector>
+#include <stdlib.h>
+#include <mutex>
+#include <unistd.h>
+#include "gstreamer/gstkvssink.h"
+#include <thread>
+#include "include.h"
+
+using namespace log4cplus;
+
+LOGGER_TAG("com.amazonaws.kinesis.video.gstreamer");
+
+
+
+typedef struct _CustomData
+{
+    gboolean is_live;
+    GstElement *pipeline;
+    GMainLoop *loop;
+} CustomData;
+
+static void cb_message(GstBus *bus, GstMessage *msg, CustomData *data)
+{
+
+    switch (GST_MESSAGE_TYPE(msg))
+    {
+    case GST_MESSAGE_ERROR:
+    {
+        GError *err;
+        gchar *debug;
+
+        gst_message_parse_error(msg, &err, &debug);
+        g_print("Error: %s\n", err->message);
+        g_error_free(err);
+        g_free(debug);
+
+        gst_element_set_state(data->pipeline, GST_STATE_READY);
+        g_main_loop_quit(data->loop);
+        break;
+    }
+    case GST_MESSAGE_EOS:
+        /* end-of-stream */
+        gst_element_set_state(data->pipeline, GST_STATE_READY);
+        g_main_loop_quit(data->loop);
+        break;
+    case GST_MESSAGE_BUFFERING:
+    {
+        gint percent = 0;
+
+        /* If the stream is live, we do not care about buffering. */
+        if (data->is_live)
+            break;
+
+        gst_message_parse_buffering(msg, &percent);
+        g_print("Buffering (%3d%%)\r", percent);
+        /* Wait until buffering is complete before start/resume playing */
+        if (percent < 100)
+            gst_element_set_state(data->pipeline, GST_STATE_PAUSED);
+        else
+            gst_element_set_state(data->pipeline, GST_STATE_PLAYING);
+        break;
+    }
+    case GST_MESSAGE_CLOCK_LOST:
+        /* Get a new clock */
+        gst_element_set_state(data->pipeline, GST_STATE_PAUSED);
+        gst_element_set_state(data->pipeline, GST_STATE_PLAYING);
+        break;
+    default:
+        /* Unhandled message */
+        break;
+    }
+}
+
+void sleeper(CustomData *data) {
+    sleep(10);
+    LOG_DEBUG("Pausing...");
+    gst_element_set_state(gst_bin_get_by_name(GST_BIN(data->pipeline), "source"), GST_STATE_PAUSED);
+    // gst_element_set_state(data->pipeline, GST_STATE_PAUSED);
+    sleep(10);
+    LOG_DEBUG("Playing...");
+    gst_element_set_state(gst_bin_get_by_name(GST_BIN(data->pipeline), "source"), GST_STATE_PLAYING);
+    // gst_element_set_state(data->pipeline, GST_STATE_PLAYING);
+
+
+}
+
+int main(int argc, char *argv[])
+{
+    GstElement *pipeline;
+    GstBus *bus;
+    GstStateChangeReturn ret;
+    GMainLoop *main_loop;
+    CustomData data;
+
+
+    /* Initialize GStreamer */
+    gst_init(&argc, &argv);
+
+    /* Initialize our data structure */
+    memset(&data, 0, sizeof(data));
+
+    /* Build the pipeline */
+    pipeline = gst_parse_launch("autovideosrc name=source ! videoconvert ! video/x-raw,format=I420 ! x264enc bframes=0 bitrate=1000 tune=zerolatency ! video/x-h264,stream-format=avc,alignment=au,profile=baseline ! decodebin ! videoconvert ! clockoverlay ! autovideosink", NULL);
+    // pipeline = gst_parse_launch("autovideosrc name=source ! videoconvert ! video/x-raw,format=I420 ! x264enc bframes=0 bitrate=512 tune=zerolatency sync-lookahead=0 ! video/x-h264,stream-format=avc,alignment=au,profile=baseline ! \
+    kvssink stream-name=aTestStream storage-size=128", NULL);
+    bus = gst_element_get_bus(pipeline);
+
+    /* Start playing */
+    ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
+
+    if (ret == GST_STATE_CHANGE_FAILURE)
+    {
+        g_printerr("Unable to set the pipeline to the playing state.\n");
+        gst_object_unref(pipeline);
+        return -1;
+    }
+    else if (ret == GST_STATE_CHANGE_NO_PREROLL)
+    {
+        data.is_live = TRUE;
+    }
+
+    main_loop = g_main_loop_new(NULL, FALSE);
+    data.loop = main_loop;
+    data.pipeline = pipeline;
+
+    gst_bus_add_signal_watch(bus);
+    g_signal_connect(bus, "message", G_CALLBACK(cb_message), &data);
+
+    std::thread sleepThread(sleeper, &data);
+
+    g_main_loop_run (main_loop);
+
+    sleepThread.join();
+
+    /* Free resources */
+    g_main_loop_unref(main_loop);
+    gst_object_unref(bus);
+    gst_element_set_state(pipeline, GST_STATE_NULL);
+    gst_object_unref(pipeline);
+    return 0;
+}
\ No newline at end of file
diff --git a/samples/kvssink_gstreamer_sample.cpp b/samples/kvssink_gstreamer_sample.cpp
index 4c096384..5df7405c 100644
--- a/samples/kvssink_gstreamer_sample.cpp
+++ b/samples/kvssink_gstreamer_sample.cpp
@@ -129,6 +129,8 @@ typedef struct _CustomData {
     int max_runtime;
 
     GstElement* source_element;
+    GstElement* encoder_element;
+
 } CustomData;  // CustomData
 
 
@@ -263,7 +265,6 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
         return 1;
     }
     kvssink = gst_element_factory_make("kvssink", "kvssink");
-    //kvssink = gst_element_factory_make("autovideosink", "kvssink");
     if (!kvssink) {
         LOG_ERROR("Failed to create kvssink");
         return 1;
@@ -320,6 +321,18 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
         encoder = gst_element_factory_make("x264enc", "encoder");
 
 
+    GstElement* decodebin = gst_element_factory_make("decodebin", "decodebin");
+    if (!decodebin ) {
+        g_printerr("Not all elements could be created. decodebin\n");
+        return 1;
+    }
+
+    GstElement* autovideosink = gst_element_factory_make("autovideosink", "autovideosink");
+    if (!autovideosink ) {
+        g_printerr("Not all elements could be created. autovideosink\n");
+        return 1;
+    }
+
     if (!pipeline || !source || !clock_overlay || !source_filter || !encoder || !filter || !kvssink || !h264parse) {
         g_printerr("Not all elements could be created.\n");
@@ -350,12 +363,13 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
     gst_caps_unref(query_caps_raw);
 
     /* configure encoder */
-    g_object_set(G_OBJECT(encoder), "bframes", 0, "bitrate", 512, NULL);
-
+    g_object_set(G_OBJECT(encoder), "bframes", 0, "bitrate", 512, "key-int-max", 40, NULL);
+
     /* configure output filter */
     GstCaps *h264_caps = gst_caps_new_simple("video/x-h264",
                                              "stream-format", G_TYPE_STRING, "avc",
                                              "alignment", G_TYPE_STRING, "au",
+                                             "framerate", GST_TYPE_FRACTION, 20, 1,
                                              NULL);
     //gst_caps_set_simple(h264_caps, "profile", G_TYPE_STRING, "baseline", NULL);
     g_object_set(G_OBJECT(filter), "caps", h264_caps, NULL);
@@ -366,6 +380,17 @@ int gstreamer_live_source_init(int argc, char *argv[], CustomData *data, GstElem
     determine_credentials(kvssink, data);
 
     data->source_element = source;
+    data->encoder_element = encoder;
+
+    GstElement* queue = gst_element_factory_make("queue", "queue");
+
+    GstElement* matroskamux = gst_element_factory_make("matroskamux", "matroskamux");
+    GstElement* filesink = gst_element_factory_make("filesink", "filesink");
+    g_object_set(G_OBJECT(filesink), "location", "footage.mkv", NULL);
+
+
+
+
 
     /* build the pipeline */
     gst_bin_add_many(GST_BIN(pipeline), source, clock_overlay, video_convert, source_filter, encoder, filter,
@@ -484,34 +509,55 @@ int gstreamer_init(int argc, char *argv[], CustomData *data) {
     // while(data_global.stream_status != STATUS_KVS_GSTREAMER_SAMPLE_INTERRUPTED) {
         // count++;
         // LOG_DEBUG("Play/Pause count: " << count);
+    for (int i = 0; i < 5; i++) {
+        sleep(10);
 
         GstEvent* flush_start;
         GstEvent* flush_stop;
-        // GstEvent* eos;
+        GstEvent* eos;
+
+
+        // ./kvssink_gstreamer_sample aTestStream video-only devicesrc
+/*
+export GST_PLUGIN_PATH=`pwd`/build
+export LD_LIBRARY_PATH=`pwd`/open-source/local/lib
+*/
 
         LOG_DEBUG("Pausing...");
-        // sleep(2);
-        gst_element_set_state(data->source_element, GST_STATE_NULL);
-        flush_start = gst_event_new_flush_start();
-        gst_element_send_event(pipeline, flush_start);
-
-        // eos = gst_event_new_eos();
-        // gst_element_send_event(pipeline, eos);
-
-        sleep(10);
+        //sleep(2);
+        // gst_element_set_state(data->source_element, GST_STATE_PAUSED);
+        // gst_element_set_state(pipeline, GST_STATE_PAUSED);
+
+
+
+        eos = gst_event_new_eos();
+        gst_element_send_event(data->source_element, eos);
+
+
+// 9864, 9902, 9912
+        sleep(10);
 
         LOG_DEBUG("Playing...");
-        // sleep(2);
+        // // sleep(2);
+
+        // gst_element_set_state(data->encoder_element, GST_STATE_READY);
+        // gst_element_set_state(data->encoder_element, GST_STATE_PAUSED);
+
+        // gst_element_set_state(data->source_element, GST_STATE_PLAYING);
+        flush_start = gst_event_new_flush_start();
+        gst_element_send_event(pipeline, flush_start);
         flush_stop = gst_event_new_flush_stop(true);
        gst_element_send_event(pipeline, flush_stop);
-        gst_element_set_state(data->source_element, GST_STATE_PLAYING);
-
+
+
+        gst_element_set_state(pipeline, GST_STATE_PLAYING);
+    }
 
         // LOG_DEBUG("Pausing...");
         // flush_start = gst_event_new_flush_start();
diff --git a/src/gstreamer/gstkvssink.cpp b/src/gstreamer/gstkvssink.cpp
index 1ab436d3..a6d36c8b 100644
--- a/src/gstreamer/gstkvssink.cpp
+++ b/src/gstreamer/gstkvssink.cpp
@@ -1265,7 +1265,8 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads,
     FRAME_FLAGS kinesis_video_flags = FRAME_FLAG_NONE;
     GstMapInfo info;
- LOG_INFO("BUFFER RECEIVED"); + LOG_INFO(""); + LOG_INFO("******************************** BUFFER RECEIVED ********************************"); info.data = NULL; // eos reached @@ -1291,6 +1292,11 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, // goto CleanUp; // } + if (buf == NULL && track_data == NULL) { + LOG_INFO("INVALID BUFFER RECEIVED"); + } + + if (STATUS_FAILED(stream_status)) { // in offline case, we cant tell the pipeline to restream the file again in case of network outage. // therefore error out and let higher level application do the retry. @@ -1324,21 +1330,8 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, goto CleanUp; } - // In offline mode, if user specifies a file_start_time, the stream will be configured to use absolute - // timestamp. Therefore in here we add the file_start_time to frame pts to create absolute timestamp. - // If user did not specify file_start_time, file_start_time will be 0 and has no effect. - if (IS_OFFLINE_STREAMING_MODE(kvssink->streaming_type)) { - if(!data->use_original_pts) { - buf->dts = 0; // if offline mode, i.e. streaming a file, the dts from gstreamer is undefined. - buf->pts += data->pts_base; - } - else { - buf->pts = buf->dts; - } - } else if (!GST_BUFFER_DTS_IS_VALID(buf)) { - LOG_INFO("UPDATING TO NEW TS "); - buf->dts = data->last_dts + DEFAULT_FRAME_DURATION_MS * HUNDREDS_OF_NANOS_IN_A_MILLISECOND * DEFAULT_TIME_UNIT_IN_NANOS; - } + LOG_INFO("Starting pts: " << buf->pts); + data->last_dts = buf->dts; track_id = kvs_sink_track_data->track_id; @@ -1367,30 +1360,38 @@ gst_kvs_sink_handle_buffer (GstCollectPads * pads, } if (!IS_OFFLINE_STREAMING_MODE(kvssink->streaming_type)) { if (data->first_pts == GST_CLOCK_TIME_NONE) { + LOG_INFO("Updating first_pts to: " << buf->pts); data->first_pts = buf->pts; } if (data->producer_start_time == GST_CLOCK_TIME_NONE) { - data->producer_start_time = (uint64_t) chrono::duration_cast( + uint64_t nowTime = (uint64_t) chrono::duration_cast( systemCurrentTime().time_since_epoch()).count(); + LOG_INFO("Updating producer_start_time to: " << nowTime); + data->producer_start_time = nowTime ; } if(!data->use_original_pts) { + LOG_INFO("Incrementing buf->pts by: producer_start_time - first_pts"); + LOG_INFO("= " << data->producer_start_time << " - " << data->first_pts); + LOG_INFO("= " << (data->producer_start_time - data->first_pts)); buf->pts += data->producer_start_time - data->first_pts; + LOG_INFO("buf->pts is now: " << buf->pts); } else { buf->pts = buf->dts; } } - LOG_INFO("Puting frame..."); - LOG_INFO("With pts: " << buf->pts); - LOG_INFO("With dts: " << (buf->dts + kvssink->pause_time) / 1000000); + + LOG_INFO("With pts: " << (buf->pts - data->producer_start_time) / 1); //1000000 + LOG_INFO("With dts: " << buf->dts / 1); put_frame(kvssink->data, info.data, info.size, - std::chrono::nanoseconds( (uint64_t) chrono::duration_cast( - systemCurrentTime().time_since_epoch()).count()), - std::chrono::nanoseconds( (uint64_t) chrono::duration_cast( - systemCurrentTime().time_since_epoch()).count() + kvssink->pause_time), kinesis_video_flags, track_id, data->frame_count); + std::chrono::nanoseconds(buf->pts), + std::chrono::nanoseconds(buf->dts), kinesis_video_flags, track_id, data->frame_count); data->frame_count++; + LOG_INFO("*********************************************************************************"); + LOG_INFO(""); + } else { LOG_WARN("GStreamer buffer is invalid for " << kvssink->stream_name); @@ -1653,8 +1654,8 @@ gst_kvs_sink_change_state(GstElement *element, GstStateChange 
transition) { data->streamingStopped.store(false); try { init_track_data(kvssink); - kvssink->data->first_pts = GST_CLOCK_TIME_NONE; - kvssink->data->producer_start_time = GST_CLOCK_TIME_NONE; + // kvssink->data->first_pts = GST_CLOCK_TIME_NONE; + // kvssink->data->producer_start_time = GST_CLOCK_TIME_NONE; } catch (runtime_error &err) { oss << "Failed to init track data. Error: " << err.what(); err_msg = oss.str(); @@ -1668,11 +1669,13 @@ gst_kvs_sink_change_state(GstElement *element, GstStateChange transition) { gst_collect_pads_start (kvssink->collect); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + // kvssink->data->first_pts = GST_CLOCK_TIME_NONE; + // kvssink->data->producer_start_time = GST_CLOCK_TIME_NONE; // flush_stop = gst_event_new_flush_stop((gboolean)FALSE); // gst_element_send_event(element, flush_stop); // kvssink->data->first_pts = GST_CLOCK_TIME_NONE; // kvssink->data->producer_start_time = GST_CLOCK_TIME_NONE; - //data->streamingStopped.store(false); + // data->streamingStopped.store(false); // if (kvssink->pause_time != 0) { // kvssink->pause_time = (uint64_t) chrono::duration_cast( @@ -1709,13 +1712,15 @@ gst_kvs_sink_change_state(GstElement *element, GstStateChange transition) { // flush_start = gst_event_new_flush_start(); // gst_element_send_event(element, flush_start); - GSList *walk; - for (walk = kvssink->collect->data; walk; walk = g_slist_next (walk)) { - GstCollectData *c_data; - c_data = (GstCollectData *) walk->data; - gst_collect_pads_pop (kvssink->collect, c_data) ; - } - + + // GSList *walk; + // for (walk = kvssink->collect->data; walk; walk = g_slist_next (walk)) { + // GstCollectData *c_data; + // c_data = (GstCollectData *) walk->data; + // gst_collect_pads_pop (kvssink->collect, c_data); + // } + + //data->streamingStopped.store(true); //put_eofr_frame(data);