Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAVA-3051: Memory leak #1743

Open
wants to merge 24 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1deb2f3
memory leak inital change and test
SiyaoIsHiding Nov 2, 2023
cb61eb8
remove wildcard import
SiyaoIsHiding Nov 2, 2023
4f9d779
delete test
SiyaoIsHiding Nov 6, 2023
7042177
format
SiyaoIsHiding Nov 13, 2023
2f23cae
For the memory leak of DefaultLoadBalancingPolicy.responseTimes
SiyaoIsHiding Nov 14, 2023
6fa4cb2
For the memory leak of DistanceEvent.node
SiyaoIsHiding Nov 14, 2023
e443882
format
SiyaoIsHiding Nov 14, 2023
1588ebd
WIP: For memory leak from NodeStateEvent.node
SiyaoIsHiding Nov 14, 2023
35eab99
remove wildcard
SiyaoIsHiding Nov 14, 2023
6ed271a
address requests for changes about DefaultLoadBalancingPolicy.respons…
SiyaoIsHiding Nov 15, 2023
ddd6877
revert AtomicLongArray changes
SiyaoIsHiding Nov 16, 2023
39eb4ac
Add doc for synchronous load
SiyaoIsHiding Nov 16, 2023
08ad473
fix ReplayingEventFilter.recordedEvents and MicrometerNodeMetricUpdater
SiyaoIsHiding Nov 17, 2023
ce108d7
Merge remote-tracking branch 'upstream/4.x' into memory-leak
SiyaoIsHiding Nov 20, 2023
7684201
LOG.info to LOG.debug. Nullable getters.
SiyaoIsHiding Nov 20, 2023
9a4779a
Merge branch '4.x' of github.com:datastax/java-driver into memory-leak
SiyaoIsHiding Mar 7, 2024
99c5c5b
without refactoring of NodeResponseRateSample
SiyaoIsHiding Mar 7, 2024
86b0e1c
Refactored NodeResponseRateSample
SiyaoIsHiding Mar 8, 2024
5363a73
Merge remote-tracking branch 'upstream/4.x' into memory-leak
SiyaoIsHiding Jun 14, 2024
36d8dd5
revert changes on NodeStateEvent and DistanceEvent
SiyaoIsHiding Jun 14, 2024
7f15b93
refactor responseTimes to use new MapMaker().weakKeys().makeMap();
SiyaoIsHiding Jun 17, 2024
d616d06
return sample == null || !sample.hasSufficientResponses(now);
SiyaoIsHiding Jun 17, 2024
cde9bbb
missing response times data considered sufficient
SiyaoIsHiding Jun 17, 2024
4c2ca83
Refactor to Map<Node, NodeDistance> lastNodeDistance and Map<Node, No…
SiyaoIsHiding Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ private class SingleThreaded {
private final Reconnection reconnection;
private DriverChannelOptions channelOptions;
// The last events received for each node
private final Map<Node, DistanceEvent> lastDistanceEvents = new WeakHashMap<>();
private final Map<Node, NodeStateEvent> lastStateEvents = new WeakHashMap<>();
private final Map<Node, NodeDistance> lastNodeDistance = new WeakHashMap<>();
private final Map<Node, NodeState> lastNodeState = new WeakHashMap<>();

private SingleThreaded(InternalDriverContext context) {
this.context = context;
Expand Down Expand Up @@ -366,8 +366,8 @@ private void connect(
.whenCompleteAsync(
(channel, error) -> {
try {
DistanceEvent lastDistanceEvent = lastDistanceEvents.get(node);
NodeStateEvent lastStateEvent = lastStateEvents.get(node);
NodeDistance lastDistance = lastNodeDistance.get(node);
NodeState lastState = lastNodeState.get(node);
if (error != null) {
if (closeWasCalled || initFuture.isCancelled()) {
onSuccess.run(); // abort, we don't really care about the result
Expand Down Expand Up @@ -406,18 +406,17 @@ private void connect(
channel);
channel.forceClose();
onSuccess.run();
} else if (lastDistanceEvent != null
&& lastDistanceEvent.distance == NodeDistance.IGNORED) {
} else if (lastDistance == NodeDistance.IGNORED) {
LOG.debug(
"[{}] New channel opened ({}) but node became ignored, "
+ "closing and trying next node",
logPrefix,
channel);
channel.forceClose();
connect(nodes, errors, onSuccess, onFailure);
} else if (lastStateEvent != null
&& (lastStateEvent.newState == null /*(removed)*/
|| lastStateEvent.newState == NodeState.FORCED_DOWN)) {
} else if (lastNodeState.containsKey(node)
&& (lastState == null /*(removed)*/
|| lastState == NodeState.FORCED_DOWN)) {
LOG.debug(
"[{}] New channel opened ({}) but node was removed or forced down, "
+ "closing and trying next node",
Expand Down Expand Up @@ -534,7 +533,7 @@ private void reconnectNow() {

private void onDistanceEvent(DistanceEvent event) {
assert adminExecutor.inEventLoop();
this.lastDistanceEvents.put(event.node, event);
this.lastNodeDistance.put(event.node, event.distance);
if (event.distance == NodeDistance.IGNORED
&& channel != null
&& !channel.closeFuture().isDone()
Expand All @@ -549,7 +548,7 @@ private void onDistanceEvent(DistanceEvent event) {

private void onStateEvent(NodeStateEvent event) {
assert adminExecutor.inEventLoop();
this.lastStateEvents.put(event.node, event);
this.lastNodeState.put(event.node, event.newState);
if ((event.newState == null /*(removed)*/ || event.newState == NodeState.FORCED_DOWN)
&& channel != null
&& !channel.closeFuture().isDone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.BitSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;
import net.jcip.annotations.ThreadSafe;
Expand Down Expand Up @@ -96,14 +100,15 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);

protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
protected final ConcurrentMap<Node, NodeResponseRateSample> responseTimes;
protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
private final boolean avoidSlowReplicas;

public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
super(context, profileName);
this.avoidSlowReplicas =
profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
this.responseTimes = new MapMaker().weakKeys().makeMap();
}

@NonNull
Expand Down Expand Up @@ -274,40 +279,19 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
}

protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
// response rate is considered insufficient when less than 2 responses were obtained in
// the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
if (responseTimes.containsKey(node)) {
AtomicLongArray array = responseTimes.get(node);
if (array.length() == 2) {
long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
long leastRecent = array.get(0);
return leastRecent - threshold < 0;
}
}
return true;
NodeResponseRateSample sample = responseTimes.get(node);
return !(sample == null || sample.hasSufficientResponses(now));
}

/**
* Synchronously updates the response times for the given node. It is synchronous because the
* {@link #DefaultLoadBalancingPolicy(com.datastax.oss.driver.api.core.context.DriverContext,
* java.lang.String) CacheLoader.load} assigned is synchronous.
*
* @param node The node to update.
*/
protected void updateResponseTimes(@NonNull Node node) {
responseTimes.compute(
node,
(n, array) -> {
// The array stores at most two timestamps, since we don't need more;
// the first one is always the least recent one, and hence the one to inspect.
long now = nanoTime();
if (array == null) {
array = new AtomicLongArray(1);
array.set(0, now);
} else if (array.length() == 1) {
long previous = array.get(0);
array = new AtomicLongArray(2);
array.set(0, previous);
array.set(1, now);
} else {
array.set(0, array.get(1));
array.set(1, now);
}
return array;
});
this.responseTimes.compute(node, (k, v) -> v == null ? new NodeResponseRateSample() : v.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this was discussed already: the new design is undeniably more elegant, but it also allocates one new NodeResponseRateSample each time we access the map entry (because the entry value is immutable). In the old design the value was mutable in order to avoid allocations. Are we OK with this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My expectation is that it wouldn't matter since we're creating (and GCing) pretty small objects and the JVM usually handles that quite well but it's not unreasonable to confirm that.

@SiyaoIsHiding is this something you can test? I suppose if we can demonstrate roughly equivalent memory usage and throughput for a decent size load that would be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested with a client app using VisualVM.
Using this branch, NodeResponseRateSample uses up to 5000 live bytes with 170 live objects. The app finished 40000 requests in 154 seconds.
Using the current 4.x branch, a lambda in DefaultLoadBalancingPolicy uses up to 3600 live bytes with 200 live objects; the only lambda I can find there is the one in responseTimes.compute. The app finished 40000 requests in 161 seconds.
So I think there isn't much performance difference.

}

protected int getInFlight(@NonNull Node node, @NonNull Session session) {
Expand All @@ -318,4 +302,52 @@ protected int getInFlight(@NonNull Node node, @NonNull Session session) {
// processing them).
return (pool == null) ? 0 : pool.getInFlight();
}

protected class NodeResponseRateSample {

@VisibleForTesting protected final long oldest;
@VisibleForTesting protected final OptionalLong newest;

// Creates the initial generation of a node's sample: exactly one response
// timestamp (taken now); a second, newer response has not been seen yet.
private NodeResponseRateSample() {
  this.oldest = nanoTime();
  this.newest = OptionalLong.empty();
}

// Creates a generation whose older timestamp is supplied by the caller and
// whose newer timestamp is the current time.
private NodeResponseRateSample(long oldestSample) {
  this.oldest = oldestSample;
  this.newest = OptionalLong.of(nanoTime());
}

// Creates a generation from an explicit (oldest, newest) timestamp pair.
private NodeResponseRateSample(long oldestSample, long newestSample) {
  this.newest = OptionalLong.of(newestSample);
  this.oldest = oldestSample;
}

// Test-only adapter: rebuilds a sample from the legacy AtomicLongArray
// representation (index 0 = oldest timestamp, optional index 1 = newest).
@VisibleForTesting
protected NodeResponseRateSample(AtomicLongArray times) {
  assert times.length() >= 1;
  this.oldest = times.get(0);
  if (times.length() > 1) {
    this.newest = OptionalLong.of(times.get(1));
  } else {
    this.newest = OptionalLong.empty();
  }
}

// Advances one generation: the most recent timestamp we hold is carried
// forward as the oldest, and the current time becomes the newest.
private NodeResponseRateSample next() {
  long carriedForward = this.getNewestValidSample();
  return new NodeResponseRateSample(carriedForward, nanoTime());
}

// Returns the most recent timestamp recorded: the newest when a full pair is
// present, otherwise the single (oldest) value we have.
private long getNewestValidSample() {
  return this.newest.isPresent() ? this.newest.getAsLong() : this.oldest;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be subject to race conditions to me, although I haven't been able to construct a specific scenario that triggers it. The old implementation computed a result array locally and then returned that; there was no changing of state at the ultimate destination (the old responseTimes map) beyond updating an entry or not doing so. This implementation is doing more than that; we're actually changing the types of elements (this.times changes in size) based on data we receive.

It might be okay... but then again I'm also not sure I see a harm in just making this.times be an array of size two from the beginning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I stated previously, this will race indeed, unless calls to update() are externally synchronized. I think the loading cache approach does guarantee that.

The idea of using an array of size 2 from the beginning could save us one array allocation – why not.


// Reports whether this node's response rate is sufficient: the rate is
// sufficient when at least 2 responses were obtained within the past
// interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
private boolean hasSufficientResponses(long now) {
// Only one sample recorded so far: treated as sufficient, i.e. a node with
// missing response-time data is not penalized. NOTE(review): an earlier
// comment here said "automatic failure", which contradicts returning true —
// confirm this lenient behavior is intended.
if (!this.newest.isPresent()) return true;
long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
// Subtraction (rather than a direct comparison) is wraparound-safe for
// nanoTime values; sufficient iff the older sample is within the interval.
return this.oldest - threshold >= 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -105,7 +106,7 @@ public LoadBalancingPolicyWrapper(
// Just an alias to make the rest of the code more readable
this.policies = reporters.keySet();

this.distances = new HashMap<>();
this.distances = new WeakHashMap<>();

this.logPrefix = context.getSessionName();
context.getEventBus().register(NodeStateEvent.class, this::onNodeStateEvent);
Expand Down Expand Up @@ -172,6 +173,7 @@ private void onNodeStateEvent(NodeStateEvent event) {

// once it has gone through the filter
private void processNodeStateEvent(NodeStateEvent event) {
DefaultNode node = event.node;
switch (stateRef.get()) {
case BEFORE_INIT:
case DURING_INIT:
Expand All @@ -181,13 +183,13 @@ private void processNodeStateEvent(NodeStateEvent event) {
case RUNNING:
for (LoadBalancingPolicy policy : policies) {
if (event.newState == NodeState.UP) {
policy.onUp(event.node);
policy.onUp(node);
} else if (event.newState == NodeState.DOWN || event.newState == NodeState.FORCED_DOWN) {
policy.onDown(event.node);
policy.onDown(node);
} else if (event.newState == NodeState.UNKNOWN) {
policy.onAdd(event.node);
policy.onAdd(node);
} else if (event.newState == null) {
policy.onRemove(event.node);
policy.onRemove(node);
} else {
LOG.warn("[{}] Unsupported event: {}", logPrefix, event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ protected Timeout newTimeout() {
.getTimer()
.newTimeout(
t -> {
if (t.isExpired()) {
clearMetrics();
}
clearMetrics();
cancelMetricsExpirationTimeout();
Comment on lines +176 to +177
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lambda for timeout. Even after the timeout and lambda triggered, the Timer object is not collected and it still holds a reference to this, until it's canceled.

},
expireAfter.toNanos(),
TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,18 @@ private void notifyListeners() {

private void onNodeStateChanged(NodeStateEvent event) {
assert adminExecutor.inEventLoop();
if (event.newState == null) {
context.getNodeStateListener().onRemove(event.node);
DefaultNode node = event.node;
if (node == null) {
LOG.debug(
"[{}] Node for this event was removed, ignoring state change: {}", logPrefix, event);
} else if (event.newState == null) {
context.getNodeStateListener().onRemove(node);
} else if (event.oldState == null && event.newState == NodeState.UNKNOWN) {
context.getNodeStateListener().onAdd(event.node);
context.getNodeStateListener().onAdd(node);
} else if (event.newState == NodeState.UP) {
context.getNodeStateListener().onUp(event.node);
context.getNodeStateListener().onUp(node);
} else if (event.newState == NodeState.DOWN || event.newState == NodeState.FORCED_DOWN) {
context.getNodeStateListener().onDown(event.node);
context.getNodeStateListener().onDown(node);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void markReady() {
consumer.accept(event);
}
} finally {
recordedEvents.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplayingEventFilter works like a buffer. It holds a list (queue) of Events since its state is STARTED, and consumes all of them when its state becomes READY all at once. However, the list of the events is never cleared. They leak strong references for the nodes.

stateLock.writeLock().unlock();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ public void should_prioritize_and_shuffle_3_or_more_replicas_when_first_unhealth
given(pool3.getInFlight()).willReturn(0);
given(pool5.getInFlight()).willReturn(0);

dsePolicy.responseTimes.put(node1, new AtomicLongArray(new long[] {T0, T0})); // unhealthy
dsePolicy.responseTimes.put(
node1,
dsePolicy
.new NodeResponseRateSample(new AtomicLongArray(new long[] {T0, T0}))); // unhealthy

// When
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
Expand Down Expand Up @@ -232,7 +235,9 @@ public void should_prioritize_and_shuffle_3_or_more_replicas_when_first_unhealth
given(pool3.getInFlight()).willReturn(0);
given(pool5.getInFlight()).willReturn(0);

dsePolicy.responseTimes.put(node1, new AtomicLongArray(new long[] {T1, T1})); // healthy
dsePolicy.responseTimes.put(
node1,
dsePolicy.new NodeResponseRateSample(new AtomicLongArray(new long[] {T1, T1}))); // healthy

// When
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
Expand Down
Loading