Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] cleaning up sanitize=thread found several races #771

Merged
merged 14 commits into from
Jul 22, 2024
Merged
21 changes: 16 additions & 5 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ on:
coverage:
type: string
default: "OFF"
dev_mode:
type: string
default: "OFF"
lib_msg_delivery:
type: string
default: "OFF"
Expand Down Expand Up @@ -40,6 +43,12 @@ on:
type: string
description: "Ubuntu version to use, e.g. '20.04'"
default: "latest"
verbose_test_output:
type: string
default: "OFF"
verbose_make_output:
type: string
default: "ON"
secrets:
CODECOV_TOKEN:
description: "Codecov repo token"
Expand Down Expand Up @@ -83,6 +92,12 @@ jobs:
if [[ "${{ inputs.coverage }}" == "ON" ]]; then
flags="$flags -DNATS_COVERAGE=ON"
fi
if [[ "${{ inputs.dev_mode }}" == "ON" ]]; then
flags="$flags -DDEV_MODE=ON"
fi
if [[ "${{ inputs.verbose_make_output }}" == "ON" ]]; then
flags="$flags -DCMAKE_VERBOSE_MAKEFILE=ON"
fi
echo "flags=$flags" >> $GITHUB_OUTPUT

- id: nats-vars
Expand Down Expand Up @@ -158,11 +173,7 @@ jobs:
export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"
flags=""
ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} 2>&1 | tee /tmp/out.txt
if [[ $(grep -q 'ThreadSanitizer: ' /tmp/out.txt; echo $?) == 0 ]]; then
echo "!!! ThreadSanitizer detected WARNING(s) !!!"
exit 1
fi
ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }}

- name: Upload coverage reports to Codecov
# PRs from external contributors fail: https://github.com/codecov/feedback/issues/301
Expand Down
43 changes: 29 additions & 14 deletions .github/workflows/on-pr-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,51 @@ jobs:
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

sanitize-addr:
name: "Sanitize address"
coverage-pooled:
name: "Coverage"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
coverage: ON
server_version: main
type: Debug
lib_msg_delivery: ON
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

sanitize-addr-lib-msg-delivery:
name: "Sanitize address (lib_msg_delivery)"
dev-mode:
name: "DEV_MODE"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
dev_mode: ON
server_version: main
lib_msg_delivery: ON
type: Debug
verbose_test_output: ON
verbose_make_output: ON

san-addr:
name: "Sanitize address (lib_write_deadline)"
sanitize:
name: "Sanitize"
strategy:
fail-fast: false
matrix:
compiler: [gcc, clang]
sanitize: [address, thread]
pooled_delivery: [ON, OFF]
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
server_version: main
lib_write_deadline: ON
type: Debug
compiler: ${{ matrix.compiler }}
sanitize: ${{ matrix.sanitize }}
lib_msg_delivery: ${{ matrix.pooled_delivery }}

san-thread:
name: "Sanitize thread"
san-addr-deadline:
name: "Sanitize address (lib_write_deadline)"
uses: ./.github/workflows/build-test.yml
with:
sanitize: thread
type: Debug
sanitize: address
server_version: main
lib_write_deadline: ON

Windows:
name: "Windows"
Expand Down
23 changes: 23 additions & 0 deletions .github/workflows/on-push-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,26 @@ jobs:
server_version: main
ubuntu_version: ${{ matrix.ubuntu_version }}
compiler: ${{ matrix.compiler }}

dev-mode:
name: "DEV_MODE"
uses: ./.github/workflows/build-test.yml
with:
dev_mode: ON
server_version: main
verbose_test_output: ON
verbose_make_output: ON

sanitize-addr:
name: "Sanitize address"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
server_version: main

san-thread:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Not sure why you would shorten to san- here while you used sanitize- for the address matrix run above :-)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shorter is better? :) (will change for consistency)

name: "Sanitize thread"
uses: ./.github/workflows/build-test.yml
with:
sanitize: thread
server_version: main
22 changes: 21 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1137,17 +1137,29 @@

adjustedMax = 0;
natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are macro for lock/unlock of this in sub.c. Maybe we move the definition of these in sub.h (we include this header in conn.c), and so you could replace these 2 lines with:

SUB_DLV_WORKER_LOCK(sub);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kozlovic I stayed away from further exposing these macros. I am about to submit a re-factor (just passed all tests now without kicking!) that hopefully eliminates the need for double-locking, leaving the worker lock almost exclusively to protect the message queue. Ditto for the sub->closed comment.

It's ok with me if we keep this PR "hanging" for now until you see the upcoming solution, and then we can decide which way to fix? I just want to have it as my base branch to get the CI/unrelated test fixes out of the way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the last part, that is, do you want to merge this PR to be the base for the other work or the other work will be the (re)base for the CI changes. Either way, I am going to LGTM this PR so that you proceed as you wish.

{
natsMutex_Lock(sub->libDlvWorker->lock);
}
// If JS ordered consumer, trigger a reset. Don't check the error
// condition here. If there is a failure, it will be retried
// at the next HB interval.
if ((sub->jsi != NULL) && (sub->jsi->ordered))
{
jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1);
if (sub->libDlvWorker != NULL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and:

SUB_DLV_WORKER_UNLOCK(sub);

{
natsMutex_Unlock(sub->libDlvWorker->lock);

Check warning on line 1152 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L1152

Added line #L1152 was not covered by tests
}
natsSub_Unlock(sub);
continue;
}
if (natsSub_drainStarted(sub))
{
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
continue;
}
Expand All @@ -1160,6 +1172,10 @@
// messages have reached the max, if so, unsubscribe.
if (adjustedMax == 0)
{
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);

Check warning on line 1177 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L1177

Added line #L1177 was not covered by tests
}
natsSub_Unlock(sub);
s = natsConn_sendUnsubProto(nc, sub->sid, 0);
continue;
Expand All @@ -1172,6 +1188,10 @@

// Hold the lock up to that point so we are sure not to resend
// any SUB/UNSUB for a subscription that is in draining mode.
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
}

Expand Down Expand Up @@ -3435,7 +3455,7 @@
return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

natsSock_Close(nc->sockCtx.fd);
natsSock_Shutdown(nc->sockCtx.fd);

natsConn_Unlock(nc);
return NATS_OK;
Expand Down
20 changes: 20 additions & 0 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,10 @@
natsStatus s = NATS_OK;

natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same than for conn.c. We also include sub.h here, so that would work.

{
natsMutex_Lock(sub->libDlvWorker->lock);
}
alert = !jsi->active;
oc = jsi->ordered;
jsi->active = false;
Expand All @@ -2062,10 +2066,18 @@
natsCondition_Signal(sub->cond);
natsTimer_Stop(timer);
}
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);

Check warning on line 2071 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2071

Added line #L2071 was not covered by tests
}
natsSub_Unlock(sub);
return;
}
nc = sub->conn;
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);

if (!alert)
Expand All @@ -2075,12 +2087,20 @@
if (oc)
{
natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
{
natsMutex_Lock(sub->libDlvWorker->lock);

Check warning on line 2092 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2092

Added line #L2092 was not covered by tests
}
if (!sub->closed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Humm... there are many places where we check this under the sub's lock only. Sure, it is set under both locks and checked only under the lib's lock in nats.c (the code that deals with the delivery's pool thread), but if we find many other reports, then I think it would be simpler to modify _deliverMsgs so that we capture sub->closed under the sub's lock after releasing the delivery's lock. Anyway, for now the change is ok, but again, let's keep that in mind.

{
// If we fail in that call, we will report to async err callback
// (if one is specified).
s = jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1);
}
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);

Check warning on line 2102 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2102

Added line #L2102 was not covered by tests
}
natsSub_Unlock(sub);
}

Expand Down
7 changes: 6 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ file(STRINGS list.txt listOfTestNames)

# For each test name
foreach(name ${listOfTestNames})

# Create a test and pass the index (start and end are the same)
# to the testsuite executable
add_test(NAME Test_${name}
Expand All @@ -45,6 +44,12 @@ foreach(name ${listOfTestNames})
# Make sure the test passes
set_tests_properties(Test_${name} PROPERTIES PASS_REGULAR_EXPRESSION "ALL PASSED")

# Set TSAN_OPTIONS for the test
if(NATS_SANITIZE)
set_tests_properties(Test_${name} PROPERTIES
ENVIRONMENT "TSAN_OPTIONS=detect_deadlocks=1:second_deadlock_stack=1:halt_on_error=1:report_signal_unsafe=1")
endif(NATS_SANITIZE)

# Bump the test index number
math(EXPR testIndex "${testIndex}+1")
endforeach()
Expand Down
15 changes: 13 additions & 2 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -5477,6 +5477,7 @@ test_natsSrvVersionAtLeast(void)
{
s = NATS_ERR;
}
natsConn_Unlock(nc);
}
}
testCond(s == NATS_OK);
Expand Down Expand Up @@ -13337,7 +13338,10 @@ test_ClientAutoUnsubAndReconnect(void)
nats_Sleep(10);

test("Received no more than max: ");
testCond((s == NATS_OK) && (arg.sum == 10));
natsMutex_Lock(arg.m);
int sum = arg.sum;
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK) && (sum == 10));

natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
Expand Down Expand Up @@ -13390,6 +13394,7 @@ test_AutoUnsubNoUnsubOnDestroy(void)
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.done)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

natsConnection_Destroy(nc);
Expand Down Expand Up @@ -20252,6 +20257,7 @@ test_ForcedReconnect(void)
CHECK_SERVER_STARTED(pid);
IFOK(s, natsOptions_Create(&opts));
IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg));
IFOK(s, natsOptions_SetReconnectWait(opts, 100));
IFOK(s, natsConnection_Connect(&nc, opts));
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
testCond(s == NATS_OK);
Expand Down Expand Up @@ -20800,6 +20806,7 @@ test_EventLoop(void)
natsMutex_Lock(arg.m);
if (arg.attached != 2 || !arg.detached)
s = NATS_ERR;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

natsSubscription_Destroy(sub);
Expand Down Expand Up @@ -29485,6 +29492,9 @@ _jsOrderedErrHandler(natsConnection *nc, natsSubscription *subscription, natsSta
{
struct threadArg *args = (struct threadArg*) closure;

if (err != NATS_MISSED_HEARTBEAT)
return;

natsMutex_Lock(args->m);
args->status = err;
natsCondition_Signal(args->c);
Expand Down Expand Up @@ -29824,6 +29834,7 @@ test_JetStreamOrderedConsSrvRestart(void)
natsMutex_Lock(args.m);
while ((s != NATS_TIMEOUT) && !args.reconnected)
s = natsCondition_TimedWait(args.c, args.m, 2000);
natsMutex_Unlock(args.m);
testCond(s == NATS_OK);

test("Send 1 message: ");
Expand Down Expand Up @@ -33921,8 +33932,8 @@ test_MicroAsyncErrorHandler_MaxPendingMsgs(void)
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.closed)
s = natsCondition_TimedWait(arg.c, arg.m, 1000);
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER));
natsMutex_Unlock(arg.m);

microService_Destroy(m);
_waitForMicroservicesAllDone(&arg);
Expand Down