diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 76987b3c7..a62b22f1d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -295,31 +295,34 @@ jobs: # ulimit -c unlimited -S # timeout --signal=SIGABRT 150m ./tst/producer_test --gtest_break_on_failure - # thread-sanitizer: - # runs-on: ubuntu-20.04 - # permissions: - # id-token: write - # contents: read - # env: - # CC: clang - # CXX: clang++ - # AWS_KVS_LOG_LEVEL: 2 - # steps: - # - name: Clone repository - # uses: actions/checkout@v3 - # - name: Configure AWS Credentials - # uses: aws-actions/configure-aws-credentials@v1-node16 - # with: - # role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }} - # role-session-name: ${{ secrets.AWS_ROLE_SESSION_NAME }} - # aws-region: ${{ secrets.AWS_REGION }} - # - name: Build repository - # run: | - # mkdir build && cd build - # cmake .. -DBUILD_TEST=TRUE -DTHREAD_SANITIZER=TRUE -DBUILD_COMMON_LWS=TRUE - # make - # ulimit -c unlimited -S - # timeout --signal=SIGABRT 150m ./tst/producer_test --gtest_break_on_failure + #thread-sanitizer: + # runs-on: ubuntu-20.04 + # permissions: + # id-token: write + # contents: read + # env: + # CC: clang + # CXX: clang++ + # AWS_KVS_LOG_LEVEL: 2 + # steps: + # - name: Clone repository + # uses: actions/checkout@v3 + # - name: Configure AWS Credentials + # uses: aws-actions/configure-aws-credentials@v1-node16 + # with: + # role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }} + # role-session-name: ${{ secrets.AWS_ROLE_SESSION_NAME }} + # aws-region: ${{ secrets.AWS_REGION }} + # - name: Build repository + # run: | + # mkdir build && cd build + # cmake .. -DBUILD_TEST=TRUE -DTHREAD_SANITIZER=TRUE -DBUILD_COMMON_LWS=TRUE + # make + # - name: Run tests + # run: | + # cd build + # ulimit -c unlimited -S + # timeout --signal=SIGABRT 150m ./tst/producer_test --gtest_break_on_failure ubuntu-gcc: runs-on: ubuntu-20.04 diff --git a/CMake/Dependencies/libcurl-CMakeLists.txt b/CMake/Dependencies/libcurl-CMakeLists.txt index f0cb9f85e..74e44b520 100644 --- a/CMake/Dependencies/libcurl-CMakeLists.txt +++ b/CMake/Dependencies/libcurl-CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.6.3) -project(libcurl-download NONE) +project(libcurl-download LANGUAGES C) find_program(MAKE_EXE NAMES make) diff --git a/CMake/Dependencies/libgtest-CMakeLists.txt b/CMake/Dependencies/libgtest-CMakeLists.txt index 916087e76..14e4a09cb 100644 --- a/CMake/Dependencies/libgtest-CMakeLists.txt +++ b/CMake/Dependencies/libgtest-CMakeLists.txt @@ -3,7 +3,6 @@ cmake_minimum_required(VERSION 3.6.3) project(libgtest-download NONE) include(ExternalProject) - ExternalProject_Add(libgtest-download GIT_REPOSITORY https://github.com/google/googletest.git GIT_TAG release-1.12.1 diff --git a/CMake/Dependencies/libkvspic-CMakeLists.txt b/CMake/Dependencies/libkvspic-CMakeLists.txt index c8ebe188d..4e52d8e09 100644 --- a/CMake/Dependencies/libkvspic-CMakeLists.txt +++ b/CMake/Dependencies/libkvspic-CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.6.3) -project(libkvspic-download NONE) +project(libkvspic-download LANGUAGES C) include(ExternalProject) diff --git a/CMake/Dependencies/libmbedtls-CMakeLists.txt b/CMake/Dependencies/libmbedtls-CMakeLists.txt index 4a0d5f318..edb782c68 100644 --- a/CMake/Dependencies/libmbedtls-CMakeLists.txt +++ b/CMake/Dependencies/libmbedtls-CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.6.3) -project(libmbedtls-download NONE) +project(libmbedtls-download LANGUAGES C) include(ExternalProject) diff --git a/CMake/Dependencies/libwebsockets-CMakeLists.txt b/CMake/Dependencies/libwebsockets-CMakeLists.txt index 3992337ef..3ada49e4b 100644 --- a/CMake/Dependencies/libwebsockets-CMakeLists.txt +++ b/CMake/Dependencies/libwebsockets-CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.6.3) -project(libwebsocket-download NONE) +project(libwebsocket-download LANGUAGES C) include(ExternalProject) diff --git a/src/source/Common/AwsV4Signer.c b/src/source/Common/AwsV4Signer.c index 3eddaf44c..19b5831ad 100644 --- a/src/source/Common/AwsV4Signer.c +++ b/src/source/Common/AwsV4Signer.c @@ -659,7 +659,7 @@ STATUS generateSignatureDateTime(UINT64 currentTime, PCHAR pDateTimeStr) // Convert to time_t timeT = (time_t) (currentTime / HUNDREDS_OF_NANOS_IN_A_SECOND); - retSize = STRFTIME(pDateTimeStr, SIGNATURE_DATE_TIME_STRING_LEN, DATE_TIME_STRING_FORMAT, GMTIME(&timeT)); + retSize = STRFTIME(pDateTimeStr, SIGNATURE_DATE_TIME_STRING_LEN, DATE_TIME_STRING_FORMAT, GMTIME_THREAD_SAFE(&timeT)); CHK(retSize > 0, STATUS_BUFFER_TOO_SMALL); pDateTimeStr[retSize] = '\0'; diff --git a/src/source/CurlApiCallbacks.c b/src/source/CurlApiCallbacks.c index 073dc5b22..c045e0e1e 100644 --- a/src/source/CurlApiCallbacks.c +++ b/src/source/CurlApiCallbacks.c @@ -1005,15 +1005,17 @@ STATUS createStreamCurl(UINT64 customData, PCHAR deviceName, PCHAR streamName, P CleanUp: + if (startLocked) { + // Release the lock to let the awaiting handler thread to continue + pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); + } + if (STATUS_FAILED(retStatus)) { if (IS_VALID_TID_VALUE(threadId)) { THREAD_CANCEL(threadId); } freeCurlRequest(&pCurlRequest); - } else if (startLocked) { - // Release the lock to let the awaiting handler thread to continue - pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); } if (shutdownLocked) { @@ -1237,15 +1239,17 @@ STATUS describeStreamCurl(UINT64 customData, PCHAR streamName, PServiceCallConte CleanUp: + if (startLocked) { + // Release the lock to let the awaiting handler thread to continue + pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); + } + if (STATUS_FAILED(retStatus)) { if (IS_VALID_TID_VALUE(threadId)) { THREAD_CANCEL(threadId); } freeCurlRequest(&pCurlRequest); - } else if (startLocked) { - // Release the lock to let the awaiting handler thread to continue - pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); } if (shutdownLocked) { @@ -1548,15 +1552,18 @@ STATUS getStreamingEndpointCurl(UINT64 customData, PCHAR streamName, PCHAR apiNa CleanUp: + if (startLocked) { + // Release the lock to let the awaiting handler thread to continue. + // This needs to be done before freeCurlRequest because there we will free the startLock mutex + pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); + } + if (STATUS_FAILED(retStatus)) { if (IS_VALID_TID_VALUE(threadId)) { THREAD_CANCEL(threadId); } freeCurlRequest(&pCurlRequest); - } else if (startLocked) { - // Release the lock to let the awaiting handler thread to continue - pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); } if (shutdownLocked) { @@ -1877,15 +1884,17 @@ STATUS tagResourceCurl(UINT64 customData, PCHAR streamArn, UINT32 tagCount, PTag CleanUp: + if (startLocked) { + // Release the lock to let the awaiting handler thread to continue + pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); + } + if (STATUS_FAILED(retStatus)) { if (IS_VALID_TID_VALUE(threadId)) { THREAD_CANCEL(threadId); } freeCurlRequest(&pCurlRequest); - } else if (startLocked) { - // Release the lock to let the awaiting handler thread to continue - pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock); } if (shutdownLocked) { diff --git a/src/source/Response.c b/src/source/Response.c index 9b0f8397e..b96c05108 100644 --- a/src/source/Response.c +++ b/src/source/Response.c @@ -31,7 +31,7 @@ STATUS createCurlResponse(PCurlRequest pCurlRequest, PCurlResponse* ppCurlRespon // init putMedia related members pCurlResponse->endOfStream = FALSE; - pCurlResponse->paused = TRUE; + ATOMIC_STORE_BOOL(&pCurlResponse->paused, TRUE); pCurlResponse->debugDumpFile = FALSE; pCurlResponse->debugDumpFilePath[0] = '\0'; @@ -456,8 +456,8 @@ STATUS notifyDataAvailable(PCurlResponse pCurlResponse, UINT64 durationAvailable DLOGV("[%s] Note data received: duration(100ns): %" PRIu64 " bytes %" PRIu64 " for stream handle %" PRIu64, pCurlResponse->pCurlRequest->streamName, durationAvailable, sizeAvailable, pCurlResponse->pCurlRequest->uploadHandle); - if (pCurlResponse->paused && pCurlResponse->pCurl != NULL) { - pCurlResponse->paused = FALSE; + if (ATOMIC_LOAD_BOOL(&pCurlResponse->paused) && pCurlResponse->pCurl != NULL) { + ATOMIC_STORE_BOOL(&pCurlResponse->paused, FALSE); // frequent pause unpause causes curl segfault in offline scenario THREAD_SLEEP(10 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); // un-pause curl @@ -634,7 +634,7 @@ SIZE_T postReadCallback(PCHAR pBuffer, SIZE_T size, SIZE_T numItems, PVOID custo pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks; uploadHandle = pCurlResponse->pCurlRequest->uploadHandle; - if (pCurlResponse->paused) { + if (ATOMIC_LOAD_BOOL(&pCurlResponse->paused)) { bytesWritten = CURL_READFUNC_PAUSE; CHK(FALSE, retStatus); } @@ -721,7 +721,7 @@ SIZE_T postReadCallback(PCHAR pBuffer, SIZE_T size, SIZE_T numItems, PVOID custo } } } else if (bytesWritten == CURL_READFUNC_PAUSE) { - pCurlResponse->paused = TRUE; + ATOMIC_STORE_BOOL(&pCurlResponse->paused, TRUE); } // Since curl is about to terminate gracefully, set flag to prevent shutdown thread from timing it out. diff --git a/src/source/Response.h b/src/source/Response.h index 670b268e4..5f625f4a9 100644 --- a/src/source/Response.h +++ b/src/source/Response.h @@ -62,7 +62,7 @@ struct __CurlResponse { BOOL endOfStream; // Whether curl is paused - volatile BOOL paused; + volatile ATOMIC_BOOL paused; // Whether to dump streaming session into mkv file BOOL debugDumpFile; diff --git a/tst/ProducerClientBasicTest.cpp b/tst/ProducerClientBasicTest.cpp index 22a97a0e6..85f5f0681 100644 --- a/tst/ProducerClientBasicTest.cpp +++ b/tst/ProducerClientBasicTest.cpp @@ -10,8 +10,8 @@ class ProducerClientBasicTest : public ProducerClientTestBase { mStreamsCreated = CVAR_CREATE(); MEMSET(mClients, 0x00, SIZEOF(mClients)); MEMSET(mClientCallbacks, 0x00, SIZEOF(mClientCallbacks)); - mActiveStreamCount = 0; - mActiveClientCount = 0; + ATOMIC_STORE(&mActiveStreamCount, 0); + ATOMIC_STORE(&mActiveClientCount, 0); } VOID deinitialize() @@ -35,8 +35,8 @@ class ProducerClientBasicTest : public ProducerClientTestBase { CVAR mStreamsCreated; CLIENT_HANDLE mClients[TEST_STREAM_COUNT]; PClientCallbacks mClientCallbacks[TEST_STREAM_COUNT]; - volatile UINT32 mActiveStreamCount; - volatile UINT32 mActiveClientCount; + volatile SIZE_T mActiveStreamCount; + volatile SIZE_T mActiveClientCount; }; extern ProducerClientTestBase* gProducerClientTestBase; @@ -137,7 +137,8 @@ PVOID ProducerClientBasicTest::staticCreateProducerClientRoutine(PVOID arg) EXPECT_EQ(STATUS_SUCCESS, retStatus = createKinesisVideoStreamSync(pTest->mClients[index], &streamInfo, &pTest->mStreams[index])); - if (++pTest->mActiveStreamCount == TEST_STREAM_COUNT) { + ATOMIC_INCREMENT(&pTest->mActiveStreamCount); + if (ATOMIC_LOAD(&pTest->mActiveStreamCount) == TEST_STREAM_COUNT) { CVAR_SIGNAL(pTest->mStreamsCreated); } @@ -196,7 +197,8 @@ PVOID ProducerClientBasicTest::staticCreateProducerRoutine(PVOID arg) retStatus = createKinesisVideoStreamSync(pTest->mClientHandle, &streamInfo, &pTest->mStreams[index]); - if (++pTest->mActiveStreamCount == TEST_STREAM_COUNT) { + ATOMIC_INCREMENT(&pTest->mActiveStreamCount); + if (ATOMIC_LOAD(&pTest->mActiveStreamCount) == TEST_STREAM_COUNT) { CVAR_SIGNAL(pTest->mStreamsCreated); } @@ -223,10 +225,10 @@ PVOID ProducerClientBasicTest::staticProducerClientStartRoutine(PVOID arg) EXPECT_EQ(STATUS_SUCCESS, kinesisVideoStreamGetStreamInfo(streamHandle, &pStreamInfo)); // Set an indicator that the producer is not stopped - pTest->mProducerStopped = FALSE; + ATOMIC_STORE_BOOL(&pTest->mProducerStopped, FALSE); // Increment the active stream/producer count - pTest->mActiveClientCount++; + ATOMIC_INCREMENT(&pTest->mActiveClientCount); // Loop until cancelled frame.version = FRAME_CURRENT_VERSION; @@ -245,7 +247,7 @@ PVOID ProducerClientBasicTest::staticProducerClientStartRoutine(PVOID arg) EXPECT_EQ(STATUS_SUCCESS, kinesisVideoStreamFormatChanged(streamHandle, cpdSize, cpd, DEFAULT_VIDEO_TRACK_ID)); - while (!pTest->mStopProducer) { + while (!ATOMIC_LOAD_BOOL(&pTest->mStopProducer)) { // Produce frames timestamp = GETTIME(); @@ -319,8 +321,9 @@ PVOID ProducerClientBasicTest::staticProducerClientStartRoutine(PVOID arg) pTest->mStreams[streamIndex] = INVALID_STREAM_HANDLE_VALUE; // Indicate that the producer routine had stopped - if (--pTest->mActiveClientCount == 0) { - pTest->mProducerStopped = true; + ATOMIC_DECREMENT(&pTest->mActiveClientCount); + if (ATOMIC_LOAD(&pTest->mActiveClientCount) == 0) { + ATOMIC_STORE_BOOL(&pTest->mProducerStopped, TRUE); } return NULL; @@ -356,7 +359,7 @@ PVOID ProducerClientTestBase::basicProducerRoutine(STREAM_HANDLE streamHandle, S EXPECT_EQ(STATUS_SUCCESS, kinesisVideoStreamGetStreamInfo(streamHandle, &pStreamInfo)); // Set an indicator that the producer is not stopped - mProducerStopped = FALSE; + ATOMIC_STORE_BOOL(&mProducerStopped, FALSE); // Loop until cancelled frame.version = FRAME_CURRENT_VERSION; @@ -375,7 +378,7 @@ PVOID ProducerClientTestBase::basicProducerRoutine(STREAM_HANDLE streamHandle, S EXPECT_EQ(STATUS_SUCCESS, kinesisVideoStreamFormatChanged(streamHandle, cpdSize, cpd, DEFAULT_VIDEO_TRACK_ID)); - while (!mStopProducer) { + while (!ATOMIC_LOAD_BOOL(&mStopProducer)) { // Produce frames if (IS_OFFLINE_STREAMING_MODE(streamingType)) { timestamp += frame.duration; @@ -459,7 +462,7 @@ EXPECT_TRUE(kinesis_video_stream->putFrame(eofr)); EXPECT_EQ(STATUS_SUCCESS, stopKinesisVideoStreamSync(streamHandle)) << "Timed out awaiting for the stream stop notification"; // Indicate that the producer routine had stopped - mProducerStopped = true; + ATOMIC_STORE_BOOL(&mProducerStopped, TRUE); return NULL; } @@ -489,7 +492,7 @@ TEST_F(ProducerClientBasicTest, create_produce_stream) for (UINT32 iter = 0; iter < 10; iter++) { THREAD_SLEEP(10 * HUNDREDS_OF_NANOS_IN_A_SECOND); DLOGD("Stopping the streams"); - mStopProducer = TRUE; + ATOMIC_STORE_BOOL(&mStopProducer, TRUE); DLOGD("Waiting for the streams to finish and close..."); THREAD_SLEEP(10 * HUNDREDS_OF_NANOS_IN_A_SECOND); @@ -501,7 +504,7 @@ TEST_F(ProducerClientBasicTest, create_produce_stream) } DLOGD("Starting the streams again"); - mStopProducer = FALSE; + ATOMIC_STORE_BOOL(&mStopProducer, FALSE); // Create new streams for (UINT32 i = 0; i < TEST_STREAM_COUNT; i++) { @@ -521,7 +524,7 @@ TEST_F(ProducerClientBasicTest, create_produce_stream) THREAD_SLEEP(2*TEST_EXECUTION_DURATION); // Indicate the cancel for the threads - mStopProducer = TRUE; + ATOMIC_STORE_BOOL(&mStopProducer, TRUE); // Join the thread and wait to exit. // NOTE: This is not a right way of doing it as for the multiple stream scenario @@ -531,9 +534,9 @@ TEST_F(ProducerClientBasicTest, create_produce_stream) UINT32 index = 0; do { THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); - } while (index++ < 300 && !mProducerStopped); + } while (index++ < 300 && !ATOMIC_LOAD_BOOL(&mProducerStopped)); - EXPECT_TRUE(mProducerStopped) << "Producer thread failed to stop cleanly"; + EXPECT_TRUE(ATOMIC_LOAD_BOOL(&mProducerStopped)) << "Producer thread failed to stop cleanly"; // We will block for some time due to an incorrect implementation of the awaiting code // NOTE: The proper implementation should use synchronization primitives to await for the @@ -576,14 +579,14 @@ TEST_F(ProducerClientBasicTest, create_produce_stream_parallel) THREAD_SLEEP(2*TEST_EXECUTION_DURATION); // Indicate the cancel for the threads - mStopProducer = TRUE; + ATOMIC_STORE_BOOL(&mStopProducer, TRUE); UINT32 index = 0; do { THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); - } while (index++ < 300 && !mProducerStopped); + } while (index++ < 300 && !ATOMIC_LOAD_BOOL(&mProducerStopped)); - EXPECT_TRUE(mProducerStopped) << "Producer thread failed to stop cleanly"; + EXPECT_TRUE(ATOMIC_LOAD_BOOL(&mProducerStopped)) << "Producer thread failed to stop cleanly"; THREAD_SLEEP(10 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); @@ -622,14 +625,14 @@ TEST_F(ProducerClientBasicTest, create_produce_client_parallel) THREAD_SLEEP(2*TEST_EXECUTION_DURATION); // Indicate the cancel for the threads - mStopProducer = TRUE; + ATOMIC_STORE_BOOL(&mStopProducer, TRUE); UINT32 index = 0; do { THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); - } while (index++ < 300 && !mProducerStopped); + } while (index++ < 300 && !ATOMIC_LOAD_BOOL(&mProducerStopped)); - EXPECT_TRUE(mProducerStopped) << "Producer thread failed to stop cleanly"; + EXPECT_TRUE(ATOMIC_LOAD_BOOL(&mProducerStopped)) << "Producer thread failed to stop cleanly"; THREAD_SLEEP(10 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); @@ -662,7 +665,7 @@ TEST_F(ProducerClientBasicTest, cachingEndpointProvider_Returns_EndpointFromCach THREAD_SLEEP(TEST_STREAMING_TOKEN_DURATION * ITERATION_COUNT); // Indicate the cancel for the threads - mStopProducer = TRUE; + ATOMIC_STORE_BOOL(&mStopProducer, TRUE); // Join the thread and wait to exit. // NOTE: This is not a right way of doing it as for the multiple stream scenario @@ -672,9 +675,9 @@ TEST_F(ProducerClientBasicTest, cachingEndpointProvider_Returns_EndpointFromCach UINT32 index = 0; do { THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND); - } while (index++ < 300 && !mProducerStopped); + } while (index++ < 300 && !ATOMIC_LOAD_BOOL(&mProducerStopped)); - EXPECT_TRUE(mProducerStopped) << "Producer thread failed to stop cleanly"; + EXPECT_TRUE(ATOMIC_LOAD_BOOL(&mProducerStopped)) << "Producer thread failed to stop cleanly"; // Expect the number of calls EXPECT_EQ(((ITERATION_COUNT + 1) * TEST_STREAM_COUNT), mPutStreamFnCount); diff --git a/tst/ProducerTestFixture.cpp b/tst/ProducerTestFixture.cpp index 3309a98c8..902ca1d65 100644 --- a/tst/ProducerTestFixture.cpp +++ b/tst/ProducerTestFixture.cpp @@ -181,9 +181,7 @@ ProducerClientTestBase::ProducerClientTestBase() : mAccessKeyIdSet(FALSE), mCaCertPath(NULL), mProducerThread(INVALID_TID_VALUE), - mProducerStopped(FALSE), mStartProducer(FALSE), - mStopProducer(FALSE), mAccessKey(NULL), mSecretKey(NULL), mSessionToken(NULL), @@ -245,6 +243,8 @@ ProducerClientTestBase::ProducerClientTestBase() : } SET_LOGGER_LOG_LEVEL(this->loggerLogLevel); } + ATOMIC_STORE_BOOL(&mStopProducer, FALSE); + ATOMIC_STORE_BOOL(&mProducerStopped, FALSE); // Store the function pointers gTotalProducerClientMemoryUsage = 0; @@ -542,7 +542,7 @@ STATUS ProducerClientTestBase::createTestStream(UINT32 index, STREAMING_TYPE str VOID ProducerClientTestBase::freeStreams(BOOL sync) { - mProducerStopped = TRUE; + ATOMIC_STORE_BOOL(&mProducerStopped, TRUE); for (UINT32 i = 0; i < TEST_STREAM_COUNT; i++) { DLOGD("Freeing stream index %u with handle value %" PRIu64 " %s", i, mStreams[i], sync ? "synchronously" : "asynchronously"); @@ -568,6 +568,7 @@ STATUS ProducerClientTestBase::curlEasyPerformHookFunc(PCurlResponse pCurlRespon // Get the test object ProducerClientTestBase* pTest = (ProducerClientTestBase*) pCurlResponse->pCurlRequest->pCurlApiCallbacks->hookCustomData; + MUTEX_LOCK(pTest->mTestCallbackLock); DLOGV("Curl perform hook for %s", pCurlResponse->pCurlRequest->requestInfo.url); @@ -630,6 +631,7 @@ STATUS ProducerClientTestBase::curlEasyPerformHookFunc(PCurlResponse pCurlRespon pTest->mPutMediaCallResult = SERVICE_CALL_RESULT_OK; } } + MUTEX_UNLOCK(pTest->mTestCallbackLock); return retStatus; } @@ -647,6 +649,7 @@ STATUS ProducerClientTestBase::curlWriteCallbackHookFunc(PCurlResponse pCurlResp // Get the test object ProducerClientTestBase* pTest = (ProducerClientTestBase*) pCurlResponse->pCurlRequest->pCurlApiCallbacks->hookCustomData; + MUTEX_LOCK(pTest->mTestCallbackLock); pTest->mWriteCallbackFnCount++; @@ -654,6 +657,7 @@ STATUS ProducerClientTestBase::curlWriteCallbackHookFunc(PCurlResponse pCurlResp *ppRetBuffer = pTest->mWriteBuffer; *pRetDataSize = pTest->mWriteDataSize; } + MUTEX_UNLOCK(pTest->mTestCallbackLock); return pTest->mWriteStatus; } @@ -678,6 +682,7 @@ STATUS ProducerClientTestBase::curlReadCallbackHookFunc(PCurlResponse pCurlRespo // Get the test object ProducerClientTestBase* pTest = (ProducerClientTestBase*) pCurlResponse->pCurlRequest->pCurlApiCallbacks->hookCustomData; + MUTEX_LOCK(pTest->mTestCallbackLock); pTest->mReadCallbackFnCount++; @@ -686,16 +691,17 @@ STATUS ProducerClientTestBase::curlReadCallbackHookFunc(PCurlResponse pCurlRespo } else { pTest->mReadStatus = status; } + MUTEX_UNLOCK(pTest->mTestCallbackLock); return pTest->mReadStatus; } STATUS ProducerClientTestBase::testFreeApiCallbackFunc(PUINT64 customData) { - ProducerClientTestBase* pTestBase = (ProducerClientTestBase*) *customData; - - pTestBase->mFreeApiCallbacksFnCount++; - + ProducerClientTestBase* pTest = (ProducerClientTestBase*) *customData; + MUTEX_LOCK(pTest->mTestCallbackLock); + pTest->mFreeApiCallbacksFnCount++; + MUTEX_UNLOCK(pTest->mTestCallbackLock); return STATUS_SUCCESS; } diff --git a/tst/ProducerTestFixture.h b/tst/ProducerTestFixture.h index 2754a7e29..ce6910d22 100644 --- a/tst/ProducerTestFixture.h +++ b/tst/ProducerTestFixture.h @@ -272,8 +272,8 @@ class ProducerClientTestBase : public ::testing::Test { STREAM_HANDLE mStreams[TEST_MAX_STREAM_COUNT]; volatile bool mStartProducer; - volatile bool mStopProducer; - volatile bool mProducerStopped; + volatile ATOMIC_BOOL mStopProducer; + volatile ATOMIC_BOOL mProducerStopped; // Test callbacks ApiCallbacks mApiCallbacks;