Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAVA-3051: Memory leak #1743

Open
wants to merge 24 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1deb2f3
memory leak initial change and test
SiyaoIsHiding Nov 2, 2023
cb61eb8
remove wildcard import
SiyaoIsHiding Nov 2, 2023
4f9d779
delete test
SiyaoIsHiding Nov 6, 2023
7042177
format
SiyaoIsHiding Nov 13, 2023
2f23cae
For the memory leak of DefaultLoadBalancingPolicy.responseTimes
SiyaoIsHiding Nov 14, 2023
6fa4cb2
For the memory leak of DistanceEvent.node
SiyaoIsHiding Nov 14, 2023
e443882
format
SiyaoIsHiding Nov 14, 2023
1588ebd
WIP: For memory leak from NodeStateEvent.node
SiyaoIsHiding Nov 14, 2023
35eab99
remove wildcard
SiyaoIsHiding Nov 14, 2023
6ed271a
address requests for changes about DefaultLoadBalancingPolicy.respons…
SiyaoIsHiding Nov 15, 2023
ddd6877
revert AtomicLongArray changes
SiyaoIsHiding Nov 16, 2023
39eb4ac
Add doc for synchronous load
SiyaoIsHiding Nov 16, 2023
08ad473
fix ReplayingEventFilter.recordedEvents and MicrometerNodeMetricUpdater
SiyaoIsHiding Nov 17, 2023
ce108d7
Merge remote-tracking branch 'upstream/4.x' into memory-leak
SiyaoIsHiding Nov 20, 2023
7684201
LOG.info to LOG.debug. Nullable getters.
SiyaoIsHiding Nov 20, 2023
9a4779a
Merge branch '4.x' of github.com:datastax/java-driver into memory-leak
SiyaoIsHiding Mar 7, 2024
99c5c5b
without refactoring of NodeResponseRateSample
SiyaoIsHiding Mar 7, 2024
86b0e1c
Refactored NodeResponseRateSample
SiyaoIsHiding Mar 8, 2024
5363a73
Merge remote-tracking branch 'upstream/4.x' into memory-leak
SiyaoIsHiding Jun 14, 2024
36d8dd5
revert changes on NodeStateEvent and DistanceEvent
SiyaoIsHiding Jun 14, 2024
7f15b93
refactor responseTimes to use new MapMaker().weakKeys().makeMap();
SiyaoIsHiding Jun 17, 2024
d616d06
return sample == null || !sample.hasSufficientResponses(now);
SiyaoIsHiding Jun 17, 2024
cde9bbb
missing response times data considered sufficient
SiyaoIsHiding Jun 17, 2024
4c2ca83
Refactor to Map<Node, NodeDistance> lastNodeDistance and Map<Node, No…
SiyaoIsHiding Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions;
import com.datastax.oss.driver.internal.core.channel.EventCallback;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.DefaultTopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
Expand Down Expand Up @@ -534,30 +535,34 @@ private void reconnectNow() {

private void onDistanceEvent(DistanceEvent event) {
assert adminExecutor.inEventLoop();
this.lastDistanceEvents.put(event.node, event);
DefaultNode node = event.getNode();
if (node == null) return;
this.lastDistanceEvents.put(node, event);
if (event.distance == NodeDistance.IGNORED
&& channel != null
&& !channel.closeFuture().isDone()
&& event.node.getEndPoint().equals(channel.getEndPoint())) {
&& node.getEndPoint().equals(channel.getEndPoint())) {
LOG.debug(
"[{}] Control node {} became IGNORED, reconnecting to a different node",
logPrefix,
event.node);
node);
reconnectNow();
}
}

private void onStateEvent(NodeStateEvent event) {
assert adminExecutor.inEventLoop();
this.lastStateEvents.put(event.node, event);
DefaultNode node = event.getNode();
if (node == null) return;
this.lastStateEvents.put(node, event);
if ((event.newState == null /*(removed)*/ || event.newState == NodeState.FORCED_DOWN)
&& channel != null
&& !channel.closeFuture().isDone()
&& event.node.getEndPoint().equals(channel.getEndPoint())) {
&& node.getEndPoint().equals(channel.getEndPoint())) {
LOG.debug(
"[{}] Control node {} was removed or forced down, reconnecting to a different node",
logPrefix,
event.node);
node);
reconnectNow();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
import com.datastax.oss.driver.shaded.guava.common.cache.CacheLoader;
import com.datastax.oss.driver.shaded.guava.common.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.BitSet;
Expand Down Expand Up @@ -96,14 +99,38 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);

protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
protected final LoadingCache<Node, AtomicLongArray> responseTimes;
protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
private final boolean avoidSlowReplicas;

public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
super(context, profileName);
this.avoidSlowReplicas =
profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
CacheLoader<Node, AtomicLongArray> cacheLoader =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: use a separate class for the cache value here, rather than using AtomicLongArray as a generic container. Seems like it can be something like NodeResponseRateSample, with methods like boolean hasSufficientResponses. I see this was present in the previous implementation, so not a required change for this PR, just something I noticed.

new CacheLoader<Node, AtomicLongArray>() {
@Override
public AtomicLongArray load(Node key) throws Exception {
// The array stores at most two timestamps, since we don't need more;
// the first one is always the least recent one, and hence the one to inspect.
long now = nanoTime();
AtomicLongArray array = responseTimes.getIfPresent(key);
if (array == null) {
array = new AtomicLongArray(1);
array.set(0, now);
} else if (array.length() == 1) {
long previous = array.get(0);
array = new AtomicLongArray(2);
array.set(0, previous);
array.set(1, now);
SiyaoIsHiding marked this conversation as resolved.
Show resolved Hide resolved
} else {
array.set(0, array.get(1));
array.set(1, now);
}
return array;
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You actually don't need a LoadingCache here; a simple Cache will do. A LoadingCache is useful when you need to create an entry for a key that's been requested but isn't in the cache yet. That's not what's going on here; your CacheLoader is loading keys from the map, creating a new version if it isn't present and then updating it. You're not doing something to create the NodeResponseRateSample instance beyond calling the constructor if necessary.

You can accomplish the exact same thing using just a regular Cache by changing your update logic just a bit:

  protected void updateResponseTimes(@NonNull Node node) {
    try {

      responseTimes.get(node, NodeResponseRateSample::new).update();
    }
    catch (ExecutionException ee) {
      LOG.info("[{}] Exception updating node response times: {}", logPrefix, ee);
    }
  }

I note that this approach is even recommended in the Guava Javadoc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't your version subject to race conditions, though? In the loading cache version, NodeResponseRateSample.update() is called inside CacheLoader.load() and thus cannot be called concurrently. In your suggestion, update() is called after the value is retrieved from the cache, and thus could be executed concurrently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I've gone round and round about this one over the past few days. I agree that my impl isn't right but a LoadingCache (or at least a LoadingCache used in the way it's used here) still feels wrong to me. I vastly prefer the approach of the original code which just leverages ConcurrentHashMap.compute() to do the right thing.

I think part of the mismatch for me here is that CacheLoader.get() doesn't provide the current value (if any) as an arg, so you wind up having to do this weird refresh() dance. I'm assuming part of the reason the key wasn't provided is because that's not really the intended use case for a CacheLoader, but I digress. As mentioned, ConcurrentHashMap seems to model this case more effectively, and while that class doesn't support weak references directly we can get there indirectly with Guava's MapMaker: new MapMaker().weakKeys().makeMap() will give us what we want. The only thing we're missing then is the RemovalListener, but as mentioned elsewhere (a) I'm not sure if that's necessary now and (b) I'm not sure it'll do what we want anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this more concrete: I got the relevant test case (DefaultLoadBalancingPolicyRequestTrackerTest) passing with the following changes:

  protected final ConcurrentMap<Node, NodeResponseRateSample> responseTimes;
  protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
  private final boolean avoidSlowReplicas;

  public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
    super(context, profileName);
    this.avoidSlowReplicas =
        profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
    this.responseTimes =
            new MapMaker().weakKeys().makeMap();
  }
...
  protected void updateResponseTimes(@NonNull Node node) {
    this.responseTimes.compute(node, (k,v) -> v == null ? new NodeResponseRateSample() : v.next());
  }
...
  // Exposed as protected for unit tests
  protected class NodeResponseRateSample {

    @VisibleForTesting
    protected final long oldest;
    @VisibleForTesting
    protected final OptionalLong newest;

    private NodeResponseRateSample() {

      long now = nanoTime();
      this.oldest = now;
      this.newest = OptionalLong.empty();
    }

    private NodeResponseRateSample(long oldestSample) {
      this(oldestSample, nanoTime());
    }

    private NodeResponseRateSample(long oldestSample, long newestSample) {
      this.oldest = oldestSample;
      this.newest = OptionalLong.of(newestSample);
    }

    @VisibleForTesting
    protected NodeResponseRateSample(AtomicLongArray times) {
      assert times.length() >= 1;
      this.oldest = times.get(0);
      this.newest = (times.length() > 1) ? OptionalLong.of(times.get(1)) : OptionalLong.empty();
    }

    // Our newest sample becomes the oldest in the next generation
    private NodeResponseRateSample next() {
      return new NodeResponseRateSample(this.getNewestValidSample(), nanoTime());
    }

    // If we have a pair of values return the newest, otherwise we have just one value... so just return it
    private long getNewestValidSample() { return this.newest.orElse(this.oldest); }

    // response rate is considered insufficient when less than 2 responses were obtained in
    // the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
    private boolean hasSufficientResponses(long now) {
      // If we only have one sample it's an automatic failure
      if (!this.newest.isPresent())
        return false;
      long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
      return this.oldest - threshold >= 0;
    }
  }

this.responseTimes = CacheBuilder.newBuilder().weakKeys().build(cacheLoader);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a RemovalListener here.

If a GC happens and response times for a Node are purged, then we'll end up treating that as "insufficient responses" in isResponseRateInsufficient, which can lead us to mark a node as unhealthy. I recognize that this is a bit of a pathological example, but this behavior does depend on GC timing and would be a pain to track down, so adding logging could make someone's life easier down the line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review. Would you please explain more about this?
If GC collects a node, that means the node is gone. If the node is gone, why do we care about whether it's treated as healthy or not?
Anyway, for RemovalListener, do you mean something like this?

    this.responseTimes = CacheBuilder.newBuilder().weakKeys().removalListener(
            (RemovalListener<Node, AtomicLongArray>) notification -> 
                    LOG.trace("[{}] Evicting response times for {}: {}", 
                            logPrefix, notification.getKey(), notification.getCause()))
            .build(cacheLoader);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aratno Hi Abe, thank you for your review. Is there any update?

}

@NonNull
Expand Down Expand Up @@ -276,38 +303,23 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
// response rate is considered insufficient when less than 2 responses were obtained in
// the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
if (responseTimes.containsKey(node)) {
AtomicLongArray array = responseTimes.get(node);
if (array.length() == 2) {
long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
long leastRecent = array.get(0);
return leastRecent - threshold < 0;
}
}
return true;
AtomicLongArray array = responseTimes.getIfPresent(node);
if (array != null && array.length() == 2) {
long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
long leastRecent = array.get(0);
return leastRecent - threshold < 0;
} else return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: Invert the condition and use an early-return if response rate is insufficient, so you don't have else return true

}

/**
* Synchronously updates the response times for the given node. It is synchronous because the
* {@link #DefaultLoadBalancingPolicy(com.datastax.oss.driver.api.core.context.DriverContext,
* java.lang.String) CacheLoader.load} assigned is synchronous.
*
* @param node The node to update.
*/
protected void updateResponseTimes(@NonNull Node node) {
responseTimes.compute(
node,
(n, array) -> {
// The array stores at most two timestamps, since we don't need more;
// the first one is always the least recent one, and hence the one to inspect.
long now = nanoTime();
if (array == null) {
array = new AtomicLongArray(1);
array.set(0, now);
} else if (array.length() == 1) {
long previous = array.get(0);
array = new AtomicLongArray(2);
array.set(0, previous);
array.set(1, now);
} else {
array.set(0, array.get(1));
array.set(1, now);
}
return array;
});
responseTimes.refresh(node);
SiyaoIsHiding marked this conversation as resolved.
Show resolved Hide resolved
}

protected int getInFlight(@NonNull Node node, @NonNull Session session) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import java.lang.ref.WeakReference;
import java.util.Objects;
import javax.annotation.Nullable;
import net.jcip.annotations.Immutable;

/**
Expand All @@ -29,11 +31,16 @@
@Immutable
public class DistanceEvent {
public final NodeDistance distance;
public final DefaultNode node;
private final WeakReference<DefaultNode> node;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as discussed in NodeStateEvent; I don't think we need a WeakReference here.


public DistanceEvent(NodeDistance distance, DefaultNode node) {
this.distance = distance;
this.node = node;
this.node = new WeakReference<>(node);
}

@Nullable
public DefaultNode getNode() {
return node.get();
}

@Override
Expand All @@ -42,19 +49,19 @@ public boolean equals(Object other) {
return true;
} else if (other instanceof DistanceEvent) {
DistanceEvent that = (DistanceEvent) other;
return this.distance == that.distance && Objects.equals(this.node, that.node);
return this.distance == that.distance && Objects.equals(this.getNode(), that.getNode());
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(this.distance, this.node);
return Objects.hash(this.distance, this.getNode());
}

@Override
public String toString() {
return "DistanceEvent(" + distance + ", " + node + ")";
return "DistanceEvent(" + distance + ", " + this.getNode() + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -105,7 +106,7 @@ public LoadBalancingPolicyWrapper(
// Just an alias to make the rest of the code more readable
this.policies = reporters.keySet();

this.distances = new HashMap<>();
this.distances = new WeakHashMap<>();

this.logPrefix = context.getSessionName();
context.getEventBus().register(NodeStateEvent.class, this::onNodeStateEvent);
Expand Down Expand Up @@ -172,6 +173,11 @@ private void onNodeStateEvent(NodeStateEvent event) {

// once it has gone through the filter
private void processNodeStateEvent(NodeStateEvent event) {
DefaultNode node = event.getNode();
if (node == null) {
LOG.debug("[{}] Node for this event was removed, ignoring: {}", logPrefix, event);
return;
}
switch (stateRef.get()) {
case BEFORE_INIT:
case DURING_INIT:
Expand All @@ -181,13 +187,13 @@ private void processNodeStateEvent(NodeStateEvent event) {
case RUNNING:
for (LoadBalancingPolicy policy : policies) {
if (event.newState == NodeState.UP) {
policy.onUp(event.node);
policy.onUp(node);
} else if (event.newState == NodeState.DOWN || event.newState == NodeState.FORCED_DOWN) {
policy.onDown(event.node);
policy.onDown(node);
} else if (event.newState == NodeState.UNKNOWN) {
policy.onAdd(event.node);
policy.onAdd(node);
} else if (event.newState == null) {
policy.onRemove(event.node);
policy.onRemove(node);
} else {
LOG.warn("[{}] Unsupported event: {}", logPrefix, event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import java.lang.ref.WeakReference;
import java.util.Objects;
import javax.annotation.Nullable;
import net.jcip.annotations.Immutable;

/**
Expand Down Expand Up @@ -53,14 +55,19 @@ public static NodeStateEvent removed(DefaultNode node) {
*/
public final NodeState newState;

public final DefaultNode node;
private final WeakReference<DefaultNode> node;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this change. Events of this type are very short-lived; they exist to communicate information between driver components that don't necessarily know about each other. They aren't stockpiled or stored in any meaningful way. You address the problem that was originally reported by changing the distances map in LoadBalancingPolicyWrapper to use weak refs... it's not at all clear to me that making the events use weak references buys you much on top of that.

Perhaps more importantly this change has the potential to make events a lot less useful. The driver uses events to notify components about changes in nodes, but if the actual node affected might not be present what use is the notification? Components that receive events without node information have no choice but to ignore them which means (a) every receiver has to check for null node information and (b) if you just ignore events without node data (which all these receivers appear to do) you'll get a lot more indeterminate behaviour since the presence or absence of node data in events is basically a random value (since it's determined by GC pressure which is essentially random from the perspective of the driver).

I note that the null checks referenced in (a) above are a big chunk of what's actually in this PR. If not for this constraint the change set would be a lot smaller. That wouldn't be the worse thing as the presence of all those null checks obscures the more significant changes in at least a moderate way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@absurdfarce I will try to revert the changes and see whether it still leaks, but:
There are two WeakHashMap held by ControlConnection:

    private final Map<Node, DistanceEvent> lastDistanceEvents = new WeakHashMap<>();
    private final Map<Node, NodeStateEvent> lastStateEvents = new WeakHashMap<>();

However, both DistanceEvent and NodeStateEvent hold strong references to the associated node
From the doc of WeakHashMap:

Implementation note: The value objects in a WeakHashMap are held by ordinary strong references. Thus care should be taken to ensure that value objects do not strongly refer to their own keys, either directly or indirectly, since that will prevent the keys from being discarded.

In this case DistanceEvent and NodeStateEvent are the values that hold strong references to the keys. Therefore, I think it may leak references.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested out. If NodeStateEvent and DistanceEvent hold strong references to node, then objects of these two kinds, together with DefaultNode, will not be garbage collected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed that if we put lastStateEvent and lastDistanceEvent as two fields of DefaultNode, then it will work. DefaultNode <-> NodeStateEvent only point to each other, same for DistanceEvent. Objects of all three classes will be collected. However, I understand this is not an ideal solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this:

    private final Map<Node, NodeDistance> lastDistanceEvents = new WeakHashMap<>();
    private final Map<Node, NodeState> lastStateEvents = new WeakHashMap<>();


private NodeStateEvent(NodeState oldState, NodeState newState, DefaultNode node) {
this.node = node;
this.node = new WeakReference<>(node);
this.oldState = oldState;
this.newState = newState;
}

@Nullable
public DefaultNode getNode() {
return node.get();
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand All @@ -69,19 +76,19 @@ public boolean equals(Object other) {
NodeStateEvent that = (NodeStateEvent) other;
return this.oldState == that.oldState
&& this.newState == that.newState
&& Objects.equals(this.node, that.node);
&& Objects.equals(this.getNode(), that.getNode());
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(oldState, newState, node);
return Objects.hash(oldState, newState, this.getNode());
}

@Override
public String toString() {
return "NodeStateEvent(" + oldState + "=>" + newState + ", " + node + ")";
return "NodeStateEvent(" + oldState + "=>" + newState + ", " + this.getNode() + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ protected Timeout newTimeout() {
.getTimer()
.newTimeout(
t -> {
if (t.isExpired()) {
clearMetrics();
}
clearMetrics();
cancelMetricsExpirationTimeout();
Comment on lines +176 to +177
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lambda for a timeout. Even after the timeout fires and the lambda is triggered, the Timer object is not collected, and it still holds a reference to this until it is canceled.

},
expireAfter.toNanos(),
TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.datastax.oss.driver.api.core.metrics.NodeMetric;
import com.datastax.oss.driver.api.core.metrics.SessionMetric;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import edu.umd.cs.findbugs.annotations.Nullable;
Expand Down Expand Up @@ -118,15 +119,16 @@ public NodeMetricUpdater newNodeUpdater(Node node) {
}

protected void processNodeStateEvent(NodeStateEvent event) {
DefaultNode node = event.getNode();
if (node == null) return;
if (event.newState == NodeState.DOWN
|| event.newState == NodeState.FORCED_DOWN
|| event.newState == null) {
// node is DOWN or REMOVED
((DropwizardNodeMetricUpdater) event.node.getMetricUpdater()).startMetricsExpirationTimeout();
((DropwizardNodeMetricUpdater) node.getMetricUpdater()).startMetricsExpirationTimeout();
} else if (event.newState == NodeState.UP || event.newState == NodeState.UNKNOWN) {
// node is UP or ADDED
((DropwizardNodeMetricUpdater) event.node.getMetricUpdater())
.cancelMetricsExpirationTimeout();
((DropwizardNodeMetricUpdater) node.getMetricUpdater()).cancelMetricsExpirationTimeout();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.context.LifecycleListener;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager.RefreshSchemaResult;
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
Expand Down Expand Up @@ -525,14 +526,18 @@ private void notifyListeners() {

private void onNodeStateChanged(NodeStateEvent event) {
assert adminExecutor.inEventLoop();
if (event.newState == null) {
context.getNodeStateListener().onRemove(event.node);
DefaultNode node = event.getNode();
if (node == null) {
LOG.debug(
"[{}] Node for this event was removed, ignoring state change: {}", logPrefix, event);
} else if (event.newState == null) {
context.getNodeStateListener().onRemove(node);
} else if (event.oldState == null && event.newState == NodeState.UNKNOWN) {
context.getNodeStateListener().onAdd(event.node);
context.getNodeStateListener().onAdd(node);
} else if (event.newState == NodeState.UP) {
context.getNodeStateListener().onUp(event.node);
context.getNodeStateListener().onUp(node);
} else if (event.newState == NodeState.DOWN || event.newState == NodeState.FORCED_DOWN) {
context.getNodeStateListener().onDown(event.node);
context.getNodeStateListener().onDown(node);
}
}

Expand Down
Loading