diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index fd6a1b212..0ae656e33 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -10,6 +10,9 @@ on: coverage: type: string default: "OFF" + dev_mode: + type: string + default: "OFF" lib_msg_delivery: type: string default: "OFF" @@ -40,6 +43,12 @@ on: type: string description: "Ubuntu version to use, e.g. '20.04'" default: "latest" + verbose_test_output: + type: string + default: "OFF" + verbose_make_output: + type: string + default: "ON" secrets: CODECOV_TOKEN: description: "Codecov repo token" @@ -83,6 +92,12 @@ jobs: if [[ "${{ inputs.coverage }}" == "ON" ]]; then flags="$flags -DNATS_COVERAGE=ON" fi + if [[ "${{ inputs.dev_mode }}" == "ON" ]]; then + flags="$flags -DDEV_MODE=ON" + fi + if [[ "${{ inputs.verbose_make_output }}" == "ON" ]]; then + flags="$flags -DCMAKE_VERBOSE_MAKEFILE=ON" + fi echo "flags=$flags" >> $GITHUB_OUTPUT - id: nats-vars @@ -158,11 +173,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} 2>&1 | tee /tmp/out.txt - if [[ $(grep -q 'ThreadSanitizer: ' /tmp/out.txt; echo $?) == 0 ]]; then - echo "!!! ThreadSanitizer detected WARNING(s) !!!" - exit 1 - fi + ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov # PRs from external contributors fail: https://github.com/codecov/feedback/issues/301 diff --git a/.github/workflows/on-pr-debug.yml b/.github/workflows/on-pr-debug.yml index eccc0f9f2..089cd7f58 100644 --- a/.github/workflows/on-pr-debug.yml +++ b/.github/workflows/on-pr-debug.yml @@ -28,36 +28,51 @@ jobs: secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - sanitize-addr: - name: "Sanitize address" + coverage-pooled: + name: "Coverage" uses: ./.github/workflows/build-test.yml with: - sanitize: address + coverage: ON server_version: main type: Debug + lib_msg_delivery: ON + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - sanitize-addr-lib-msg-delivery: - name: "Sanitize address (lib_msg_delivery)" + dev-mode: + name: "DEV_MODE" uses: ./.github/workflows/build-test.yml with: - sanitize: address + dev_mode: ON server_version: main - lib_msg_delivery: ON + type: Debug + verbose_test_output: ON + verbose_make_output: ON - san-addr: - name: "Sanitize address (lib_write_deadline)" + sanitize: + name: "Sanitize" + strategy: + fail-fast: false + matrix: + compiler: [gcc, clang] + sanitize: [address, thread] + pooled_delivery: [ON, OFF] uses: ./.github/workflows/build-test.yml with: - sanitize: address server_version: main - lib_write_deadline: ON + type: Debug + compiler: ${{ matrix.compiler }} + sanitize: ${{ matrix.sanitize }} + lib_msg_delivery: ${{ matrix.pooled_delivery }} - san-thread: - name: "Sanitize thread" + san-addr-deadline: + name: "Sanitize address (lib_write_deadline)" uses: ./.github/workflows/build-test.yml with: - sanitize: thread + type: Debug + sanitize: address server_version: main + lib_write_deadline: ON Windows: name: "Windows" diff --git a/.github/workflows/on-push-release.yml b/.github/workflows/on-push-release.yml index d47c20308..50513575c 100644 --- a/.github/workflows/on-push-release.yml +++ b/.github/workflows/on-push-release.yml @@ -22,3 +22,26 @@ jobs: server_version: main ubuntu_version: ${{ matrix.ubuntu_version }} compiler: ${{ matrix.compiler }} + + dev-mode: + name: "DEV_MODE" + uses: ./.github/workflows/build-test.yml + with: + dev_mode: ON + server_version: main + verbose_test_output: ON + verbose_make_output: ON + + sanitize-addr: + name: "Sanitize address" + uses: ./.github/workflows/build-test.yml + with: + sanitize: address + server_version: main + + san-thread: + name: "Sanitize thread" + uses: ./.github/workflows/build-test.yml + with: + sanitize: thread + server_version: main diff --git a/src/conn.c b/src/conn.c index 1b405c852..7cfe9beb1 100644 --- a/src/conn.c +++ b/src/conn.c @@ -1137,17 +1137,29 @@ _resendSubscriptions(natsConnection *nc) adjustedMax = 0; natsSub_Lock(sub); + if (sub->libDlvWorker != NULL) + { + natsMutex_Lock(sub->libDlvWorker->lock); + } // If JS ordered consumer, trigger a reset. Don't check the error // condition here. If there is a failure, it will be retried // at the next HB interval. if ((sub->jsi != NULL) && (sub->jsi->ordered)) { jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1); + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); continue; } if (natsSub_drainStarted(sub)) { + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); continue; } @@ -1160,6 +1172,10 @@ _resendSubscriptions(natsConnection *nc) // messages have reached the max, if so, unsubscribe. if (adjustedMax == 0) { + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); s = natsConn_sendUnsubProto(nc, sub->sid, 0); continue; @@ -1172,6 +1188,10 @@ _resendSubscriptions(natsConnection *nc) // Hold the lock up to that point so we are sure not to resend // any SUB/UNSUB for a subscription that is in draining mode. + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); } @@ -3435,7 +3455,7 @@ natsConnection_Reconnect(natsConnection *nc) return nats_setDefaultError(NATS_CONNECTION_CLOSED); } - natsSock_Close(nc->sockCtx.fd); + natsSock_Shutdown(nc->sockCtx.fd); natsConn_Unlock(nc); return NATS_OK; diff --git a/src/js.c b/src/js.c index e31dd2f0a..3d3e4bb18 100644 --- a/src/js.c +++ b/src/js.c @@ -2045,6 +2045,10 @@ _hbTimerFired(natsTimer *timer, void* closure) natsStatus s = NATS_OK; natsSub_Lock(sub); + if (sub->libDlvWorker != NULL) + { + natsMutex_Lock(sub->libDlvWorker->lock); + } alert = !jsi->active; oc = jsi->ordered; jsi->active = false; @@ -2062,10 +2066,18 @@ _hbTimerFired(natsTimer *timer, void* closure) natsCondition_Signal(sub->cond); natsTimer_Stop(timer); } + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); return; } nc = sub->conn; + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); if (!alert) @@ -2075,12 +2087,20 @@ _hbTimerFired(natsTimer *timer, void* closure) if (oc) { natsSub_Lock(sub); + if (sub->libDlvWorker != NULL) + { + natsMutex_Lock(sub->libDlvWorker->lock); + } if (!sub->closed) { // If we fail in that call, we will report to async err callback // (if one is specified). s = jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1); } + if (sub->libDlvWorker != NULL) + { + natsMutex_Unlock(sub->libDlvWorker->lock); + } natsSub_Unlock(sub); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 58a4aa14f..c15ccfc62 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -35,7 +35,6 @@ file(STRINGS list.txt listOfTestNames) # For each test name foreach(name ${listOfTestNames}) - # Create a test and pass the index (start and end are the same) # to the testsuite executable add_test(NAME Test_${name} @@ -45,6 +44,12 @@ foreach(name ${listOfTestNames}) # Make sure the test passes set_tests_properties(Test_${name} PROPERTIES PASS_REGULAR_EXPRESSION "ALL PASSED") + # Set TSAN_OPTIONS for the test + if(NATS_SANITIZE) + set_tests_properties(Test_${name} PROPERTIES + ENVIRONMENT "TSAN_OPTIONS=detect_deadlocks=1:second_deadlock_stack=1:halt_on_error=1:report_signal_unsafe=1") + endif(NATS_SANITIZE) + # Bump the test index number math(EXPR testIndex "${testIndex}+1") endforeach() diff --git a/test/test.c b/test/test.c index a2c898038..12a92c4d5 100644 --- a/test/test.c +++ b/test/test.c @@ -5477,6 +5477,7 @@ test_natsSrvVersionAtLeast(void) { s = NATS_ERR; } + natsConn_Unlock(nc); } } testCond(s == NATS_OK); @@ -13337,7 +13338,10 @@ test_ClientAutoUnsubAndReconnect(void) nats_Sleep(10); test("Received no more than max: "); - testCond((s == NATS_OK) && (arg.sum == 10)); + natsMutex_Lock(arg.m); + int sum = arg.sum; + natsMutex_Unlock(arg.m); + testCond((s == NATS_OK) && (sum == 10)); natsSubscription_Destroy(sub); natsConnection_Destroy(nc); @@ -13390,6 +13394,7 @@ test_AutoUnsubNoUnsubOnDestroy(void) natsMutex_Lock(arg.m); while ((s != NATS_TIMEOUT) && !arg.done) s = natsCondition_TimedWait(arg.c, arg.m, 2000); + natsMutex_Unlock(arg.m); testCond(s == NATS_OK); natsConnection_Destroy(nc); @@ -20252,6 +20257,7 @@ test_ForcedReconnect(void) CHECK_SERVER_STARTED(pid); IFOK(s, natsOptions_Create(&opts)); IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg)); + IFOK(s, natsOptions_SetReconnectWait(opts, 100)); IFOK(s, natsConnection_Connect(&nc, opts)); IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo")); testCond(s == NATS_OK); @@ -20800,6 +20806,7 @@ test_EventLoop(void) natsMutex_Lock(arg.m); if (arg.attached != 2 || !arg.detached) s = NATS_ERR; + natsMutex_Unlock(arg.m); testCond(s == NATS_OK); natsSubscription_Destroy(sub); @@ -29485,6 +29492,9 @@ _jsOrderedErrHandler(natsConnection *nc, natsSubscription *subscription, natsSta { struct threadArg *args = (struct threadArg*) closure; + if (err != NATS_MISSED_HEARTBEAT) + return; + natsMutex_Lock(args->m); args->status = err; natsCondition_Signal(args->c); @@ -29824,6 +29834,7 @@ test_JetStreamOrderedConsSrvRestart(void) natsMutex_Lock(args.m); while ((s != NATS_TIMEOUT) && !args.reconnected) s = natsCondition_TimedWait(args.c, args.m, 2000); + natsMutex_Unlock(args.m); testCond(s == NATS_OK); test("Send 1 message: "); @@ -33921,8 +33932,8 @@ test_MicroAsyncErrorHandler_MaxPendingMsgs(void) natsMutex_Lock(arg.m); while ((s != NATS_TIMEOUT) && !arg.closed) s = natsCondition_TimedWait(arg.c, arg.m, 1000); - natsMutex_Unlock(arg.m); testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); + natsMutex_Unlock(arg.m); microService_Destroy(m); _waitForMicroservicesAllDone(&arg);