Skip to content

Commit

Permalink
IGNITE-23634 Make hardcoded REBALANCE_RETRY_DELAY_MS configurable (#4951
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kgusakov authored Dec 24, 2024
1 parent 8ace698 commit 02af2a9
Show file tree
Hide file tree
Showing 22 changed files with 446 additions and 173 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.configuration.utils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.SystemDistributedView;
import org.apache.ignite.internal.configuration.SystemPropertyView;

/** Holder of system distributed configuration property with auto-update and support of external listener. */
public class SystemDistributedConfigurationPropertyHolder<T> {
/** Configuration property name. */
private final String propertyName;

/** Default value. */
private final T defaultValue;

/** System distributed configuration. */
private final SystemDistributedConfiguration systemDistributedConfig;

/** Current value of target system distributed configuration property. */
private final AtomicReference<T> currentValue = new AtomicReference<>();

/** Listener, which receives (newValue, revision) on every configuration update. */
private final BiConsumer<T, Long> valueListener;

/** Converter to translate {@link String} representation of property value to target type. */
private final Function<String, T> propertyConverter;

/**
* Constructor.
*
* @param systemDistributedConfig System distributed configuration.
* @param valueListener Listener, which receives (newValue, revision) on every configuration update.
* @param propertyName Configuration property name.
* @param defaultValue Default value.
* @param propertyConverter Converter to translate {@link String} representation of property value to target type.
*/
public SystemDistributedConfigurationPropertyHolder(
SystemDistributedConfiguration systemDistributedConfig,
BiConsumer<T, Long> valueListener,
String propertyName,
T defaultValue,
Function<String, T> propertyConverter
) {
this.systemDistributedConfig = systemDistributedConfig;
this.valueListener = valueListener;
this.propertyName = propertyName;
this.defaultValue = defaultValue;
this.propertyConverter = propertyConverter;

systemDistributedConfig.listen(ctx -> {
updateSystemProperties(ctx.newValue(), ctx.storageRevision());

return CompletableFuture.completedFuture(null);
});
}

/**
* Init property value, but doesn't call the listener.
*
* <p>If this method's call or first configuration update will not occur before holder usage, it will produce a {@code null} value.
*/
public void init() {
updateSystemProperties(systemDistributedConfig.value(), -1);
}

/**
* Returns current value of configuration property.
*
* @return Current value.
*/
public T currentValue() {
return currentValue.get();
}

/**
* Update current value and call listener (if revision != -1).
*
* @param view System distributed view.
* @param revision Metastorage revision.
*/
private void updateSystemProperties(SystemDistributedView view, long revision) {
SystemPropertyView systemPropertyView = view.properties().get(propertyName);

T value = (systemPropertyView == null) ? defaultValue : propertyConverter.apply(systemPropertyView.propertyValue());

currentValue.set(value);

if (revision != -1) {
valueListener.accept(value, revision);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ignite.internal.distributionzones.configuration;
package org.apache.ignite.internal.configuration.utils;

import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
Expand All @@ -27,74 +27,88 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/** Tests for {@link DistributionZonesHighAvailabilityConfiguration}. */
/** Tests for {@link SystemDistributedConfigurationPropertyHolder}. */
@ExtendWith(ConfigurationExtension.class)
public class DistributionZonesHighAvailabilityConfigurationTest extends BaseIgniteAbstractTest {
private static final String PARTITION_DISTRIBUTION_RESET_TIMEOUT = "partitionDistributionResetTimeout";
public class SystemDistributedConfigurationPropertyHolderTest extends BaseIgniteAbstractTest {
private static final String PROPERTY_NAME = "distributedPropertyName";

private static final long PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE = 0;
private static final String DEFAULT_VALUE = "defaultValue";

private static final BiConsumer<Integer, Long> noOpConsumer = (partitionDistributionResetTimeout, revision) -> {};
private static final BiConsumer<String, Long> noOpConsumer = (value, revision) -> {};

@Test
void testEmptySystemProperties(@InjectConfiguration SystemDistributedConfiguration systemConfig) {
var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
config.startAndInit();
var config = new SystemDistributedConfigurationPropertyHolder<>(
systemConfig,
noOpConsumer,
PROPERTY_NAME,
DEFAULT_VALUE,
Function.identity()
);
config.init();

assertEquals(PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE, config.partitionDistributionResetTimeoutSeconds());
assertEquals(DEFAULT_VALUE, config.currentValue());
}

@Test
void testValidSystemPropertiesOnStart(
@InjectConfiguration("mock.properties = {"
+ PARTITION_DISTRIBUTION_RESET_TIMEOUT + ".propertyValue = \"5\"}")
+ PROPERTY_NAME + ".propertyValue = \"newValue\"}")
SystemDistributedConfiguration systemConfig
) {
var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
config.startAndInit();
var config = noopConfigHolder(systemConfig);

config.init();

assertEquals(5, config.partitionDistributionResetTimeoutSeconds());
assertEquals("newValue", config.currentValue());
}

@Test
void testValidSystemPropertiesOnChange(@InjectConfiguration SystemDistributedConfiguration systemConfig) {
var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
config.startAndInit();
var config = noopConfigHolder(systemConfig);

changeSystemConfig(systemConfig, "10");
config.init();

changeSystemConfig(systemConfig, "newValue");

assertEquals(10, config.partitionDistributionResetTimeoutSeconds());
assertEquals("newValue", config.currentValue());
}

@Test
void testUpdateConfigListener(@InjectConfiguration SystemDistributedConfiguration systemConfig) throws InterruptedException {
AtomicReference<Integer> partitionDistributionResetTimeoutValue = new AtomicReference<>();
void testUpdateConfigListenerWithConverter(
@InjectConfiguration SystemDistributedConfiguration systemConfig
) throws InterruptedException {
AtomicReference<Integer> currentValue = new AtomicReference<>();
AtomicReference<Long> revisionValue = new AtomicReference<>();

var config = new DistributionZonesHighAvailabilityConfiguration(
var config = new SystemDistributedConfigurationPropertyHolder<>(
systemConfig,
(partitionDistributionResetTimeout, revision) -> {
partitionDistributionResetTimeoutValue.set(partitionDistributionResetTimeout);
(v, revision) -> {
currentValue.set(v);
revisionValue.set(revision);
}
},
PROPERTY_NAME,
0,
Integer::parseInt
);
config.startAndInit();
config.init();

assertNotEquals(10, partitionDistributionResetTimeoutValue.get());
assertNotEquals(10, currentValue.get());
assertNotEquals(1, revisionValue.get());

changeSystemConfig(systemConfig, "10");

assertTrue(waitForCondition(() ->
partitionDistributionResetTimeoutValue.get() != null
&& partitionDistributionResetTimeoutValue.get() == 10, 1_000));
currentValue.get() != null
&& currentValue.get().equals(10), 1_000));
assertEquals(1, revisionValue.get());
}

Expand All @@ -103,9 +117,19 @@ private static void changeSystemConfig(
String partitionDistributionReset
) {
CompletableFuture<Void> changeFuture = systemConfig.change(c0 -> c0.changeProperties()
.create(PARTITION_DISTRIBUTION_RESET_TIMEOUT, c1 -> c1.changePropertyValue(partitionDistributionReset))
.create(PROPERTY_NAME, c1 -> c1.changePropertyValue(partitionDistributionReset))
);

assertThat(changeFuture, willCompleteSuccessfully());
}

private static SystemDistributedConfigurationPropertyHolder<String> noopConfigHolder(SystemDistributedConfiguration systemConfig) {
return new SystemDistributedConfigurationPropertyHolder<>(
systemConfig,
noOpConsumer,
PROPERTY_NAME,
DEFAULT_VALUE,
Function.identity()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_DROP;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForRecoverableStateChanges;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneCreation;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneRemoval;
Expand Down Expand Up @@ -98,8 +100,8 @@
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite.internal.distributionzones.causalitydatanodes.CausalityDataNodesEngine;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesHighAvailabilityConfiguration;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
Expand Down Expand Up @@ -210,7 +212,7 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
private final ScheduledExecutorService rebalanceScheduler;

/** Configuration of HA mode. */
private final DistributionZonesHighAvailabilityConfiguration configuration;
private final SystemDistributedConfigurationPropertyHolder<Integer> partitionDistributionResetTimeoutConfiguration;

/**
* Creates a new distribution zone manager.
Expand Down Expand Up @@ -264,9 +266,12 @@ public DistributionZoneManager(
catalogManager
);

configuration = new DistributionZonesHighAvailabilityConfiguration(
partitionDistributionResetTimeoutConfiguration = new SystemDistributedConfigurationPropertyHolder<>(
systemDistributedConfiguration,
this::onUpdatePartitionDistributionResetBusy
this::onUpdatePartitionDistributionResetBusy,
PARTITION_DISTRIBUTION_RESET_TIMEOUT,
PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE,
Integer::parseInt
);
}

Expand Down Expand Up @@ -296,7 +301,7 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
// fires CatalogManager's ZONE_CREATE event, and the state of DistributionZoneManager becomes consistent.
int catalogVersion = catalogManager.latestCatalogVersion();

configuration.start();
partitionDistributionResetTimeoutConfiguration.init();

return allOf(
createOrRestoreZonesStates(recoveryRevision, catalogVersion),
Expand Down Expand Up @@ -944,7 +949,7 @@ private CompletableFuture<Void> scheduleTimers(CatalogZoneDescriptor zone, boole
int autoAdjust = zone.dataNodesAutoAdjust();
int autoAdjustScaleDown = zone.dataNodesAutoAdjustScaleDown();
int autoAdjustScaleUp = zone.dataNodesAutoAdjustScaleUp();
int partitionReset = configuration.partitionDistributionResetTimeoutSeconds();
int partitionReset = partitionDistributionResetTimeoutConfiguration.currentValue();

int zoneId = zone.id();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ public class DistributionZonesUtil {
private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY =
new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);

/**
* Internal property that determines partition group members reset timeout after the partition group majority loss.
*
* <p>Default value is {@link #PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE}.</p>
*/
public static final String PARTITION_DISTRIBUTION_RESET_TIMEOUT = "partitionDistributionResetTimeout";

/** Default value for the {@link #PARTITION_DISTRIBUTION_RESET_TIMEOUT}. */
static final int PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE = 0;

/**
* Internal property that determines delay between unsuccessful trial of a rebalance and a new trial, ms.
*
* <p>Default value is {@link #REBALANCE_RETRY_DELAY_DEFAULT}.</p>
*/
public static final String REBALANCE_RETRY_DELAY_MS = "rebalanceRetryDelay";

/** Default value for the {@link #REBALANCE_RETRY_DELAY_MS}. */
public static final int REBALANCE_RETRY_DELAY_DEFAULT = 200;

/**
* ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.ignite.internal.distributionzones.configuration;

import static org.apache.ignite.internal.distributionzones.configuration.DistributionZonesHighAvailabilityConfiguration.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_MS;

import com.google.auto.service.AutoService;
import java.util.Set;
Expand All @@ -37,7 +38,8 @@ public ConfigurationType type() {
@Override
public Set<Validator<?, ?>> validators() {
return Set.of(
new NonNegativeIntegerNumberSystemPropertyValueValidator(PARTITION_DISTRIBUTION_RESET_TIMEOUT)
new NonNegativeIntegerNumberSystemPropertyValueValidator(PARTITION_DISTRIBUTION_RESET_TIMEOUT),
new NonNegativeIntegerNumberSystemPropertyValueValidator(REBALANCE_RETRY_DELAY_MS)
);
}
}
Loading

0 comments on commit 02af2a9

Please sign in to comment.