diff --git a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsListener.java b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsListener.java index 8d46277e..031cf9d9 100644 --- a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsListener.java +++ b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsListener.java @@ -4,6 +4,7 @@ import static datawave.microservice.query.messaging.AcknowledgementCallback.Status.NACK; import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.TOPIC_PREFIX; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -154,7 +155,7 @@ public void onMessage(ConsumerRecord data, final Acknowledgment a latch.countDown(); }); } catch (JsonProcessingException e) { - acknowledgment.nack(0); + acknowledgment.nack(Duration.ofNanos(0)); if (log.isTraceEnabled()) { log.trace("Query {} Nacking record from topic {} and partition {} at offset {} because the result could not be deserialized", queryId, data.topic(), data.partition(), data.offset()); @@ -181,21 +182,21 @@ public void onMessage(ConsumerRecord data, final Acknowledgment a data.offset()); } } else if (ackStatus.get() == NACK) { - acknowledgment.nack(0); + acknowledgment.nack(Duration.ofNanos(0)); if (log.isTraceEnabled()) { log.trace("Query {} Nacking record {} from topic {} and partition {} at offset {} because the record was rejected", queryId, resultId, data.topic(), data.partition(), data.offset()); } } } catch (InterruptedException ie) { - acknowledgment.nack(0); + acknowledgment.nack(Duration.ofNanos(0)); if (log.isTraceEnabled()) { log.trace("Query {} Nacking record {} from topic {} and partition {} at offset {} because the latch was interrupted", queryId, resultId, data.topic(), data.partition(), data.offset()); } } } else { - acknowledgment.nack(0); + acknowledgment.nack(Duration.ofNanos(0)); if (log.isTraceEnabled()) { log.trace("Query {} Nacking record from topic {} and partition {} at offset {} because the container was stopped", queryId, data.topic(), data.partition(), data.offset()); diff --git a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java index f82c249e..a69ef38b 100644 --- a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java +++ b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java @@ -192,7 +192,7 @@ private TopicDescription describeTopic(String topic) { // @formatter:off topicDesc = adminClient .describeTopics(Collections.singleton(topic)) - .values() + .topicNameValues() .get(topic) .get(); // @formatter:on diff --git a/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java b/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java index 2caa752f..1c583ee5 100644 --- a/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java +++ b/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java @@ -66,8 +66,6 @@ public CachingConnectionFactory createCachingConnectionFactory(MessagingProperti finalConnectionFactory.setPublisherConfirmType(rabbitMqProperties.getPublisherConfirmType()); } - finalConnectionFactory.setPublisherConfirms(CachingConnectionFactory.ConfirmType.SIMPLE != rabbitMqProperties.getPublisherConfirmType() - && rabbitMqProperties.isPublisherConfirms()); finalConnectionFactory.setPublisherReturns(rabbitMqProperties.isPublisherReturns()); } return finalConnectionFactory;