Skip to content

Commit

Permalink
[HWORKS-437][Append] Fix adminClient initialization and make it stateless (#1404)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored May 22, 2023
1 parent b231040 commit 579b152
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package io.hops.hopsworks.common.dao.kafka;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.security.BaseHadoopClientsService;
import io.hops.hopsworks.exceptions.KafkaException;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.KafkaTags;
import org.apache.kafka.clients.CommonClientConfigs;
Expand All @@ -29,23 +32,16 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;

import javax.annotation.PostConstruct;
import javax.ejb.AccessTimeout;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.DependsOn;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.time.Duration;
import javax.ejb.Stateless;
import java.util.Collection;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

@Singleton
@DependsOn("ServiceDiscoveryController")
@Stateless
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
public class HopsKafkaAdminClient {

Expand All @@ -56,34 +52,11 @@ public class HopsKafkaAdminClient {
@EJB
private ServiceDiscoveryController serviceDiscoveryController;

private String brokerFQDN;
private AdminClient adminClient;

@PostConstruct
private void init() {
try {
LOG.log(Level.FINE, "Initializing Kafka client");
brokerFQDN = serviceDiscoveryController.constructServiceAddressWithPort(
HopsworksService.KAFKA.getNameWithTag(KafkaTags.broker));
initClient();
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka is currently unavailable. Will periodically retry to connect");
}
}

@AccessTimeout(value = 5000)
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
private void initClient() {
if (adminClient != null) {
try {
LOG.log(Level.FINE, "Will attempt to close current kafka client");
adminClient.close(Duration.ofSeconds(3));
} catch (Exception e) {
LOG.log(Level.WARNING, "Could not close adminClient, will continue with initialization", e);
}
}
private AdminClient initClient() throws ServiceDiscoveryException {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerFQDN);
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
serviceDiscoveryController.constructServiceFQDNWithPort(
(HopsworksService.KAFKA.getNameWithTag(KafkaTags.broker))));
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, KafkaConst.KAFKA_SECURITY_PROTOCOL);
props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, baseHadoopService.getSuperTrustStorePath());
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, baseHadoopService.getSuperTrustStorePassword());
Expand All @@ -93,46 +66,42 @@ private void initClient() {
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
KafkaConst.KAFKA_ENDPOINT_IDENTIFICATION_ALGORITHM);
LOG.log(Level.FINE, "Will attempt to initialize current kafka client");
adminClient = AdminClient.create(props);
return AdminClient.create(props);
}

public ListTopicsResult listTopics() {
try {
public ListTopicsResult listTopics() throws KafkaException {
try (AdminClient adminClient = initClient()) {
return adminClient.listTopics();
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.listTopics();
throw new KafkaException(
RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.WARNING, e.getMessage(), e.getMessage(), e);
}
}

public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
try {
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) throws KafkaException {
try (AdminClient adminClient = initClient()) {
return adminClient.createTopics(newTopics);
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.createTopics(newTopics);
throw new KafkaException(
RESTCodes.KafkaErrorCode.TOPIC_CREATION_FAILED, Level.WARNING, e.getMessage(), e.getMessage(), e);
}
}

public DeleteTopicsResult deleteTopics(Collection<String> topics) {
try {
public DeleteTopicsResult deleteTopics(Collection<String> topics) throws KafkaException {
try (AdminClient adminClient = initClient()){
return adminClient.deleteTopics(topics);
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.deleteTopics(topics);
throw new KafkaException(
RESTCodes.KafkaErrorCode.TOPIC_DELETION_FAILED, Level.WARNING, e.getMessage(), e.getMessage(), e);
}
}

public DescribeTopicsResult describeTopics(Collection<String> topics) {
try {
public DescribeTopicsResult describeTopics(Collection<String> topics) throws KafkaException {
try (AdminClient adminClient = initClient()){
return adminClient.describeTopics(topics);
} catch (Exception e) {
LOG.log(Level.WARNING, "Kafka cluster is unavailable", e);
initClient();
return adminClient.describeTopics(topics);
throw new KafkaException(
RESTCodes.KafkaErrorCode.BROKER_METADATA_ERROR, Level.WARNING, e.getMessage(), e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) th
return pt;
}

private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) throws KafkaException {
return hopsKafkaAdminClient.listTopics().names().thenApply(
set -> {
if (set.contains(topicDTO.getName())) {
Expand All @@ -251,7 +251,8 @@ private KafkaFuture<CreateTopicsResult> createTopicInKafka(TopicDTO topicDTO) {
});
}

private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String topicName) {
private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(String topicName)
throws KafkaException{
return hopsKafkaAdminClient.describeTopics(Collections.singleton(topicName))
.all()
.thenApply((map) -> map.getOrDefault(topicName, null))
Expand Down Expand Up @@ -279,9 +280,8 @@ private KafkaFuture<List<PartitionDetailsDTO>> getTopicDetailsFromKafkaCluster(S
});
}

public KafkaFuture<List<PartitionDetailsDTO>> getTopicDetails(Project project, String topicName) throws
KafkaException {

public KafkaFuture<List<PartitionDetailsDTO>> getTopicDetails(Project project, String topicName)
throws KafkaException {
projectTopicsFacade.findTopicByNameAndProject(project, topicName).orElseThrow(() ->
new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_NOT_FOUND, Level.FINE, "topic: " + topicName)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1621,11 +1621,10 @@ public void cleanup(Project project, List<Future<?>> projectCreationFutures,
}

private void removeProjectInt(Project project, List<HdfsUsers> usersToClean,
List<HdfsGroups> groupsToClean, List<Future<?>> projectCreationFutures,
boolean decreaseCreatedProj, Users owner)
throws IOException, InterruptedException, HopsSecurityException,
ServiceException, ProjectException,
GenericException, TensorBoardException, FeaturestoreException {
List<HdfsGroups> groupsToClean, List<Future<?>> projectCreationFutures,
boolean decreaseCreatedProj, Users owner)
throws IOException, InterruptedException, HopsSecurityException, ServiceException, ProjectException,
GenericException, TensorBoardException, KafkaException, FeaturestoreException {
DistributedFileSystemOps dfso = null;
try {
dfso = dfs.getDfsOps();
Expand Down Expand Up @@ -1761,7 +1760,7 @@ private List<HdfsGroups> getGroupsToClean(Project project) {
}

@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
private void removeKafkaTopics(Project project) {
private void removeKafkaTopics(Project project) throws KafkaException {
List<ProjectTopics> topics = projectTopicsFacade.findTopicsByProject(project);

List<String> topicNameList = topics.stream()
Expand Down

0 comments on commit 579b152

Please sign in to comment.