Skip to content

Commit

Permalink
KNOX-2942 - Miscellaneous HXR parser improvements (#779)
Browse files Browse the repository at this point in the history
  • Loading branch information
smolnar82 authored Jul 24, 2023
1 parent ab2749b commit 8c4db7a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -625,10 +625,10 @@ private synchronized void start() throws Exception {
File topologiesDir = calculateAbsoluteTopologiesDir();
monitor = services.getService(ServiceType.TOPOLOGY_SERVICE);

// Descriptors should be reloaded before topology reloading at startup, so that any changes to descriptors
// Shared providers and descriptors should be reloaded before topology reloading at startup, so that any changes to descriptors
// will be realized before Knox deploys "old" topologies that would have re-deployed anyway in a matter of seconds
// by the descriptor monitor
monitor.reloadDescriptors();
handleHadoopXmlResources();

monitor.addTopologyChangeListener(listener);
log.loadingTopologiesFromDirectory(topologiesDir.getAbsolutePath());
Expand Down Expand Up @@ -700,8 +700,6 @@ private synchronized void start() throws Exception {

cleanupTopologyDeployments();

handleHadoopXmlResources();

// Start the topology monitor.
monitor.startMonitor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.knox.gateway.topology.hadoop.xml;

import java.nio.file.attribute.FileTime;

import org.apache.knox.gateway.i18n.messages.Message;
import org.apache.knox.gateway.i18n.messages.MessageLevel;
import org.apache.knox.gateway.i18n.messages.Messages;
Expand All @@ -30,8 +32,8 @@ public interface HadoopXmlResourceMessages {
@Message(level = MessageLevel.INFO, text = "Monitoring Knox resources in Hadoop style XML configurations is disabled.")
void disableMonitoringHadoopXmlResources();

@Message(level = MessageLevel.INFO, text = "Parsing Knox resources in Hadoop style XML {0}. Looking up {1}...")
void parseHadoopXmlResource(String path, String topologyName);
@Message(level = MessageLevel.INFO, text = "Parsing Knox resources in Hadoop style XML {0}...")
void parseHadoopXmlResource(String path);

@Message(level = MessageLevel.INFO, text = "Found Knox descriptors {0} in {1}")
void foundKnoxDescriptors(String descriptorList, String path);
Expand All @@ -54,6 +56,12 @@ public interface HadoopXmlResourceMessages {
@Message(level = MessageLevel.ERROR, text = "Parsing XML configuration {0} failed: {1}")
void failedToParseXmlConfiguration(String path, String errorMessage, @StackTrace(level = MessageLevel.DEBUG) Exception e);

@Message(level = MessageLevel.DEBUG, text = "Processing Hadoop XML resource {0} (force = {1}; lastReloadTime = {2}; lastModified = {3})")
void processHadoopXmlResource(String descriptorPath, boolean force, FileTime lastReloadTime, FileTime lastModifiedTime);

@Message(level = MessageLevel.DEBUG, text = "Skipping Hadoop XML resource monitoring of {0} (force = {1}; lastReloadTime = {2}; lastModified = {3})")
void skipMonitorHadoopXmlResource(String descriptorPath, boolean force, FileTime lastReloadTime, FileTime lastModifiedTime);

@Message(level = MessageLevel.ERROR, text = "Error while monitoring Hadoop style XML configuration {0}: {1}")
void failedToMonitorHadoopXmlResource(String descriptorPath, String errorMessage, @StackTrace(level = MessageLevel.DEBUG) Exception e);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.SuffixFileFilter;
Expand All @@ -52,50 +56,57 @@ public class HadoopXmlResourceMonitor implements AdvancedServiceDiscoveryConfigC
private final String descriptorsDir;
private final long monitoringInterval;
private final HadoopXmlResourceParser hadoopXmlResourceParser;
private FileTime lastReloadTime;
private final Map<Path, FileTime> lastReloadTimes;
private final Lock monitorLock = new ReentrantLock();

public HadoopXmlResourceMonitor(GatewayConfig gatewayConfig, HadoopXmlResourceParser hadoopXmlResourceParser) {
this.hadoopXmlResourceParser = hadoopXmlResourceParser;
this.sharedProvidersDir = gatewayConfig.getGatewayProvidersConfigDir();
this.descriptorsDir = gatewayConfig.getGatewayDescriptorsDir();
this.monitoringInterval = gatewayConfig.getClouderaManagerDescriptorsMonitoringInterval();
this.lastReloadTimes = new ConcurrentHashMap<>();
}

public void setupMonitor() {
if (monitoringInterval > 0) {
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("ClouderaManagerDescriptorMonitor-%d").build());
executorService.scheduleAtFixedRate(() -> monitorClouderaManagerDescriptors(null), 0, monitoringInterval, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(() -> monitorClouderaManagerDescriptors(false), 0, monitoringInterval, TimeUnit.MILLISECONDS);
LOG.monitoringHadoopXmlResources(descriptorsDir);
} else {
LOG.disableMonitoringHadoopXmlResources();
}
}

private void monitorClouderaManagerDescriptors(String topologyName) {
private void monitorClouderaManagerDescriptors(boolean force) {
final File[] clouderaManagerDescriptorFiles = new File(descriptorsDir).listFiles((FileFilter) new SuffixFileFilter(HADOOP_XML_RESOURCE_FILE_EXTENSION));
for (File clouderaManagerDescriptorFile : clouderaManagerDescriptorFiles) {
monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()), topologyName);
monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()), force);
}
}

private void monitorClouderaManagerDescriptor(Path clouderaManagerDescriptorFile, String topologyName) {
private void monitorClouderaManagerDescriptor(Path clouderaManagerDescriptorFile, boolean force) {
monitorLock.lock();
try {
if (Files.isReadable(clouderaManagerDescriptorFile)) {
final FileTime lastModifiedTime = Files.getLastModifiedTime(clouderaManagerDescriptorFile);
if (topologyName != null || lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0) {
lastReloadTime = lastModifiedTime;
processClouderaManagerDescriptor(clouderaManagerDescriptorFile.toString(), topologyName);
FileTime lastReloadTime = lastReloadTimes.get(clouderaManagerDescriptorFile);
if (force || lastReloadTime == null || lastReloadTime.compareTo(lastModifiedTime) < 0) {
lastReloadTimes.put(clouderaManagerDescriptorFile, lastModifiedTime);
LOG.processHadoopXmlResource(clouderaManagerDescriptorFile.toString(), force, lastReloadTime, lastModifiedTime);
processClouderaManagerDescriptor(clouderaManagerDescriptorFile.toString());
} else {
LOG.skipMonitorHadoopXmlResource(clouderaManagerDescriptorFile.toString(), force, lastReloadTime, lastModifiedTime);
}
} else {
LOG.failedToMonitorHadoopXmlResource(clouderaManagerDescriptorFile.toString(), "File is not readable!", null);
}
} catch (IOException e) {
LOG.failedToMonitorHadoopXmlResource(clouderaManagerDescriptorFile.toString(), e.getMessage(), e);
} finally {
monitorLock.unlock();
}
}

private void processClouderaManagerDescriptor(String descriptorFilePath, String topologyName) {
final HadoopXmlResourceParserResult result = hadoopXmlResourceParser.parse(descriptorFilePath, topologyName);
private void processClouderaManagerDescriptor(String descriptorFilePath) {
final HadoopXmlResourceParserResult result = hadoopXmlResourceParser.parse(descriptorFilePath);
processSharedProviders(result);
processDescriptors(result);
processDeleted(descriptorsDir, result.getDeletedDescriptors(), ".json");
Expand Down Expand Up @@ -160,6 +171,6 @@ public void onAdvancedServiceDiscoveryConfigurationChange(Properties newConfigur
if (StringUtils.isBlank(topologyName)) {
throw new IllegalArgumentException("Invalid advanced service discovery configuration: topology name is missing!");
}
monitorClouderaManagerDescriptors(topologyName);
monitorClouderaManagerDescriptors(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,21 @@ public HadoopXmlResourceParser(GatewayConfig gatewayConfig) {
this.sharedProvidersDir = gatewayConfig.getGatewayProvidersConfigDir();
}

/**
* Produces a set of {@link SimpleDescriptor}s from the specified file. Parses ALL descriptors listed in the given file.
*
* @param path
* The path to the configuration file which holds descriptor information in a pre-defined format.
* @return A SimpleDescriptor based on the contents of the given file.
*/
public HadoopXmlResourceParserResult parse(String path) {
return parse(path, null);
}

/**
* Produces a set of {@link SimpleDescriptor}s from the specified file.
*
* @param path
* The path to the configuration file which holds descriptor information in a pre-defined format.
* @param topologyName
* if set, the parser should only parse a descriptor with the same name
* @return A SimpleDescriptor based on the contents of the given file.
*/
public HadoopXmlResourceParserResult parse(String path, String topologyName) {
public HadoopXmlResourceParserResult parse(String path) {
try {
log.parseHadoopXmlResource(path, topologyName == null ? "all topologies" : topologyName);
log.parseHadoopXmlResource(path);
final Configuration xmlConfiguration = new Configuration(false);
xmlConfiguration.addResource(Paths.get(path).toUri().toURL());
xmlConfiguration.reloadConfiguration();
final HadoopXmlResourceParserResult parserResult = parseXmlConfig(xmlConfiguration, topologyName);
final HadoopXmlResourceParserResult parserResult = parseXmlConfig(xmlConfiguration);
logParserResult(path, parserResult);
return parserResult;
} catch (Exception e) {
Expand All @@ -132,7 +120,7 @@ private void logParserResult(String path, final HadoopXmlResourceParserResult pa
}
}

private HadoopXmlResourceParserResult parseXmlConfig(Configuration xmlConfiguration, String topologyName) {
private HadoopXmlResourceParserResult parseXmlConfig(Configuration xmlConfiguration) {
final Map<String, ProviderConfiguration> providers = new LinkedHashMap<>();
final Set<SimpleDescriptor> descriptors = new LinkedHashSet<>();
Set<String> deletedDescriptors = new HashSet<>();
Expand All @@ -143,8 +131,8 @@ private HadoopXmlResourceParserResult parseXmlConfig(Configuration xmlConfigurat
final String[] providerConfigurations = xmlConfigurationKey.replace(CONFIG_NAME_PROVIDER_CONFIGS_PREFIX, "").split(",");
Arrays.stream(providerConfigurations).map(String::trim).forEach(providerConfigurationName ->
parseProvider(providerConfigurationName, xmlDescriptor.getValue(), providers, deletedProviders));
} else if (topologyName == null || xmlConfigurationKey.equals(topologyName)) {
parseDescriptor(xmlConfigurationKey, xmlDescriptor.getValue(), descriptors, deletedDescriptors);
} else {
parseDescriptor(xmlConfigurationKey, xmlDescriptor.getValue(), descriptors, deletedDescriptors);
}
});
return new HadoopXmlResourceParserResult(providers, descriptors, deletedDescriptors, deletedProviders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,6 @@ public void testFilteredProviderName() throws Exception {
assertNotNull(parserResult.getProviders().get("admin"));
}

@Test
public void testCMDescriptorParserOnlyTopology2() throws Exception {
final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
final Set<SimpleDescriptor> descriptors = hadoopXmlResourceParser.parse(testConfigPath, "topology2").getDescriptors();
assertEquals(1, descriptors.size());
validateTopology2Descriptors(descriptors.iterator().next(), true);
}

@Test
public void testCMDescriptorParserWrongDescriptorContent() throws Exception {
final String testConfigPath = this.getClass().getClassLoader().getResource("testDescriptorConfigurationWithWrongDescriptor.xml").getPath();
Expand Down

0 comments on commit 8c4db7a

Please sign in to comment.