From 0f6d46d42f0ba46d533c2780dfeaa403258ccb5e Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Mon, 22 May 2023 09:17:03 +0200 Subject: [PATCH] [HWORKS-437][Append] Fix adminClient initialization and make it stateless (#1404) --- .../dao/kafka/HopsKafkaAdminClient.java | 83 ++++++------------- .../common/kafka/KafkaController.java | 10 +-- .../common/project/ProjectController.java | 11 ++- 3 files changed, 36 insertions(+), 68 deletions(-) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java index 2392f8c0d9..77e35ebb9a 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/HopsKafkaAdminClient.java @@ -15,8 +15,11 @@ */ package io.hops.hopsworks.common.dao.kafka; +import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; import io.hops.hopsworks.common.security.BaseHadoopClientsService; +import io.hops.hopsworks.exceptions.KafkaException; +import io.hops.hopsworks.restutils.RESTCodes; import io.hops.hopsworks.servicediscovery.HopsworksService; import io.hops.hopsworks.servicediscovery.tags.KafkaTags; import org.apache.kafka.clients.CommonClientConfigs; @@ -29,23 +32,16 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; -import javax.annotation.PostConstruct; -import javax.ejb.AccessTimeout; import javax.ejb.ConcurrencyManagement; import javax.ejb.ConcurrencyManagementType; -import javax.ejb.DependsOn; import javax.ejb.EJB; -import javax.ejb.Singleton; -import javax.ejb.TransactionAttribute; -import javax.ejb.TransactionAttributeType; -import java.time.Duration; +import javax.ejb.Stateless; import java.util.Collection; import java.util.Properties; import java.util.logging.Level; import java.util.logging.Logger; -@Singleton -@DependsOn("ServiceDiscoveryController") +@Stateless @ConcurrencyManagement(ConcurrencyManagementType.CONTAINER) public class HopsKafkaAdminClient { @@ -56,34 +52,11 @@ public class HopsKafkaAdminClient { @EJB private ServiceDiscoveryController serviceDiscoveryController; - private String brokerFQDN; - private AdminClient adminClient; - - @PostConstruct - private void init() { - try { - LOG.log(Level.FINE, "Initializing Kafka client"); - brokerFQDN = serviceDiscoveryController.constructServiceAddressWithPort( - HopsworksService.KAFKA.getNameWithTag(KafkaTags.broker)); - initClient(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Kafka is currently unavailable. Will periodically retry to connect"); - } - } - - @AccessTimeout(value = 5000) - @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) - private void initClient() { - if (adminClient != null) { - try { - LOG.log(Level.FINE, "Will attempt to close current kafka client"); - adminClient.close(Duration.ofSeconds(3)); - } catch (Exception e) { - LOG.log(Level.WARNING, "Could not close adminClient, will continue with initialization", e); - } - } + private AdminClient initClient() throws ServiceDiscoveryException { Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerFQDN); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + serviceDiscoveryController.constructServiceFQDNWithPort( + (HopsworksService.KAFKA.getNameWithTag(KafkaTags.broker)))); props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, KafkaConst.KAFKA_SECURITY_PROTOCOL); props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, baseHadoopService.getSuperTrustStorePath()); props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, baseHadoopService.getSuperTrustStorePassword()); @@ -93,46 +66,42 @@ private void initClient() { props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM); LOG.log(Level.FINE, "Will attempt to initialize current kafka client"); - adminClient = AdminClient.create(props); + return AdminClient.create(props); } - public ListTopicsResult listTopics() { - try { + public ListTopicsResult listTopics() throws KafkaException { + try (AdminClient adminClient = initClient()) { return adminClient.listTopics(); } catch (Exception e) { - LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); - initClient(); - return adminClient.listTopics(); + throw new KafkaException( + RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.WARNING, e.getMessage(), e.getMessage(), e); } } - public CreateTopicsResult createTopics(Collection newTopics) { - try { + public CreateTopicsResult createTopics(Collection newTopics) throws KafkaException { + try (AdminClient adminClient = initClient()) { return adminClient.createTopics(newTopics); } catch (Exception e) { - LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); - initClient(); - return adminClient.createTopics(newTopics); + throw new KafkaException( + RESTCodes.KafkaErrorCode.TOPIC_CREATION_FAILED, Level.WARNING, e.getMessage(), e.getMessage(), e); } } - public DeleteTopicsResult deleteTopics(Collection topics) { - try { + public DeleteTopicsResult deleteTopics(Collection topics) throws KafkaException { + try (AdminClient adminClient = initClient()){ return adminClient.deleteTopics(topics); } catch (Exception e) { - LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); - initClient(); - return adminClient.deleteTopics(topics); + throw new KafkaException( + RESTCodes.KafkaErrorCode.TOPIC_DELETION_FAILED, Level.WARNING, e.getMessage(), e.getMessage(), e); } } - public DescribeTopicsResult describeTopics(Collection topics) { - try { + public DescribeTopicsResult describeTopics(Collection topics) throws KafkaException { + try (AdminClient adminClient = initClient()){ return adminClient.describeTopics(topics); } catch (Exception e) { - LOG.log(Level.WARNING, "Kafka cluster is unavailable", e); - initClient(); - return adminClient.describeTopics(topics); + throw new KafkaException( + RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.WARNING, e.getMessage(), e.getMessage(), e); } } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java index afcd992035..4f27929390 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java @@ -233,7 +233,7 @@ public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) th return pt; } - private KafkaFuture createTopicInKafka(TopicDTO topicDTO) { + private KafkaFuture createTopicInKafka(TopicDTO topicDTO) throws KafkaException { return hopsKafkaAdminClient.listTopics().names().thenApply( set -> { if (set.contains(topicDTO.getName())) { @@ -251,7 +251,8 @@ private KafkaFuture createTopicInKafka(TopicDTO topicDTO) { }); } - private KafkaFuture> getTopicDetailsFromKafkaCluster(String topicName) { + private KafkaFuture> getTopicDetailsFromKafkaCluster(String topicName) + throws KafkaException{ return hopsKafkaAdminClient.describeTopics(Collections.singleton(topicName)) .all() .thenApply((map) -> map.getOrDefault(topicName, null)) @@ -279,9 +280,8 @@ private KafkaFuture> getTopicDetailsFromKafkaCluster(S }); } - public KafkaFuture> getTopicDetails(Project project, String topicName) throws - KafkaException { - + public KafkaFuture> getTopicDetails(Project project, String topicName) + throws KafkaException { projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName) ); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index ef552de471..8f616a5e1d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -1605,11 +1605,10 @@ public void cleanup(Project project, List> projectCreationFutures, } private void removeProjectInt(Project project, List usersToClean, - List groupsToClean, List> projectCreationFutures, - boolean decreaseCreatedProj, Users owner) - throws IOException, InterruptedException, HopsSecurityException, - ServiceException, ProjectException, - GenericException, TensorBoardException, FeaturestoreException { + List groupsToClean, List> projectCreationFutures, + boolean decreaseCreatedProj, Users owner) + throws IOException, InterruptedException, HopsSecurityException, ServiceException, ProjectException, + GenericException, TensorBoardException, KafkaException, FeaturestoreException { DistributedFileSystemOps dfso = null; try { dfso = dfs.getDfsOps(); @@ -1745,7 +1744,7 @@ private List getGroupsToClean(Project project) { } @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) - private void removeKafkaTopics(Project project) { + private void removeKafkaTopics(Project project) throws KafkaException { List topics = projectTopicsFacade.findTopicsByProject(project); List topicNameList = topics.stream()