Skip to content

Commit

Permalink
Add testing-state file check and complete the happy-path test case
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark2307 committed Jan 20, 2025
1 parent 0225556 commit f6a8ff4
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 25 deletions.
4 changes: 4 additions & 0 deletions apps/testing/src/main/java/com/akto/testing/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ public void run() {
loggerMaker.infoAndAddToDb("os.arch: " + System.getProperty("os.arch"), LogDb.TESTING);
loggerMaker.infoAndAddToDb("os.version: " + System.getProperty("os.version"), LogDb.TESTING);

// create /testing-info folder in the memory from here
boolean val = Utils.createFolder(Constants.TESTING_STATE_FOLDER_PATH);
logger.info("Testing info folder status: " + val);

SingleTypeInfo.init();
while (true) {
AccountTask.instance.executeTaskForNonHybridAccounts(account -> {
Expand Down
14 changes: 13 additions & 1 deletion apps/testing/src/main/java/com/akto/testing/TestExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.slf4j.LoggerFactory;

import static com.akto.test_editor.execution.Build.modifyRequest;
import static com.akto.testing.Utils.writeJsonContentInFile;

import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -149,6 +150,13 @@ public void workflowInit (TestingRun testingRun, ObjectId summaryId, boolean deb
}

public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug, List<TestingRunResult.TestLog> testLogs, SyncLimit syncLimit, boolean shouldInitOnly) {

// write producer running here as producer has been initiated now
BasicDBObject dbObject = new BasicDBObject();
dbObject.put("PRODUCER_RUNNING", true);
dbObject.put("CONSUMER_RUNNING", false);
writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, dbObject);

int accountId = Context.accountId.get();
TestingEndpoints testingEndpoints = testingRun.getTestingEndpoints();

Expand Down Expand Up @@ -340,7 +348,11 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
} catch (InterruptedException e) {
throw new RuntimeException(e);
}


dbObject = new BasicDBObject();
dbObject.put("PRODUCER_RUNNING", false);
dbObject.put("CONSUMER_RUNNING", true);
writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, dbObject);
loggerMaker.infoAndAddToDb("Finished inserting records in kafka", LogDb.TESTING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@

public class TestingConsumer {

Properties properties = com.akto.runtime.utils.Utils.configProperties(Constants.LOCAL_KAFKA_BROKER_URL, Constants.AKTO_KAFKA_GROUP_ID_CONFIG, Constants.AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG);
private Consumer<String, String> consumer = new KafkaConsumer<>(properties); ;
static Properties properties = com.akto.runtime.utils.Utils.configProperties(Constants.LOCAL_KAFKA_BROKER_URL, Constants.AKTO_KAFKA_GROUP_ID_CONFIG, Constants.AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG);
private static Consumer<String, String> consumer = new KafkaConsumer<>(properties);
final private TestExecutor testExecutor = new TestExecutor();

private static final LoggerMaker loggerMaker = new LoggerMaker(TestingConsumer.class);
Expand Down Expand Up @@ -87,28 +87,13 @@ public Void runTestFromMessage(String message){
}
public void init(){
initializeConsumer();
consumer.wakeup();

String topicName = Constants.TEST_RESULTS_TOPIC_NAME;

final Thread mainThread = Thread.currentThread();
final AtomicBoolean exceptionOnCommitSync = new AtomicBoolean(false);

List<Future<Void>> futures = new ArrayList<>();

Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
try {
if (!exceptionOnCommitSync.get()) {
mainThread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Error e){
loggerMaker.errorAndAddToDb("Error in main thread: "+ e.getMessage(), LogDb.TESTING);
}
}
});

long lastSyncOffset = 0;
try {
consumer.subscribe(Arrays.asList(topicName));
Expand All @@ -118,6 +103,11 @@ public void run() {

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
if(records.isEmpty()){
logger.info("Returning as no records were found, so now complete the test.");
consumer.close();
return;
}
try {
consumer.commitSync();
} catch (Exception e) {
Expand All @@ -143,7 +133,6 @@ public void run() {
Utils.printL(e);
loggerMaker.errorAndAddToDb("Error in main testing consumer: " + e.getMessage(),LogDb.TESTING);
e.printStackTrace();
System.exit(0);
} finally {
consumer.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
package com.akto.testing.testing_with_kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.bson.types.ObjectId;

import com.akto.dto.billing.SyncLimit;
Expand All @@ -23,8 +36,47 @@ public static Void pushMessagesToKafka(List<TestMessages> messages){
return null;
}

private boolean shouldClearKafkaRecords(TestingRun testingRun, ObjectId summaryId){
return false;
private static void deleteAllMessagesFromTopic(String bootstrapServers, String topicName)
        throws ExecutionException, InterruptedException {

    // Truncates the topic in place: for every partition, asks the broker to
    // delete all records below the current end offset. The topic itself and
    // its partition layout are left untouched.
    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (AdminClient adminClient = AdminClient.create(adminProps)) {

        // Resolve the partition layout; the get() below throws if the topic
        // does not exist on the broker.
        DescribeTopicsResult described = adminClient.describeTopics(Collections.singletonList(topicName));
        TopicDescription description = described.values().get(topicName).get();

        List<TopicPartition> partitions = description.partitions().stream()
                .map(p -> new TopicPartition(topicName, p.partition()))
                .collect(Collectors.toList());

        // Ask the broker for the latest (end) offset of every partition.
        Map<TopicPartition, org.apache.kafka.clients.admin.OffsetSpec> offsetQuery = new HashMap<>();
        for (TopicPartition partition : partitions) {
            offsetQuery.put(partition, org.apache.kafka.clients.admin.OffsetSpec.latest());
        }
        ListOffsetsResult endOffsets = adminClient.listOffsets(offsetQuery);

        // Everything strictly before the end offset gets deleted, i.e. all
        // currently stored records.
        Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
        for (TopicPartition partition : partitions) {
            long endOffset = endOffsets.partitionResult(partition).get().offset();
            toDelete.put(partition, RecordsToDelete.beforeOffset(endOffset));
        }

        // Block until every partition's delete request has completed.
        adminClient.deleteRecords(toDelete).all().get();

        System.out.println("All existing messages in topic \"" + topicName + "\" have been deleted.");
    }
}

private boolean isKafkaEmpty(){
Expand All @@ -35,6 +87,13 @@ public void initProducer(TestingRun testingRun, ObjectId summaryId, SyncLimit sy
TestExecutor executor = new TestExecutor();

boolean doInitOnly = !isKafkaEmpty();
if(doInitOnly){
try {
deleteAllMessagesFromTopic(Constants.LOCAL_KAFKA_BROKER_URL, Constants.TEST_RESULTS_TOPIC_NAME);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
executor.init(testingRun, summaryId, syncLimit, doInitOnly);
}
}
2 changes: 2 additions & 0 deletions libs/dao/src/main/java/com/akto/util/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ private Constants() {}
public static final String TEST_RESULTS_TOPIC_NAME = "akto.test.messages";
public static final String AKTO_KAFKA_GROUP_ID_CONFIG = "asdf";
public static final int AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG = 100;
public static final String TESTING_STATE_FOLDER_PATH = "testing-info";
public static final String TESTING_STATE_FILE_NAME = "testing-state.json";

public static final String UNDERSCORE = "_";

Expand Down
41 changes: 39 additions & 2 deletions libs/utils/src/main/java/com/akto/testing/Utils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.akto.testing;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -20,7 +21,6 @@
import com.akto.dao.testing_run_findings.TestingRunIssuesDao;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.CollectionConditions.ConditionsType;
import com.akto.dto.ApiInfo;
import com.akto.dto.OriginalHttpRequest;
import com.akto.dto.RawApi;
import com.akto.dto.test_editor.DataOperandsFilterResponse;
Expand All @@ -32,7 +32,6 @@
import com.akto.dto.testing.TestResult;
import com.akto.dto.testing.TestResult.Confidence;
import com.akto.dto.testing.TestResult.TestError;
import com.akto.dto.testing.TestingRunConfig;
import com.akto.dto.testing.TestingRunResult;
import com.akto.dto.testing.WorkflowUpdatedSampleData;
import com.akto.dto.type.RequestTemplate;
Expand All @@ -46,6 +45,7 @@
import com.akto.util.JSONUtils;
import com.akto.util.enums.GlobalEnums;
import com.akto.util.enums.GlobalEnums.Severity;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCursor;
Expand Down Expand Up @@ -530,5 +530,42 @@ public static TestingRunResult generateFailedRunResultForMessage(ObjectId testin
}
return testingRunResult;
}

public static boolean createFolder(String folderName){
    // Ensures the given directory exists. Returns true when the directory
    // exists after this call (freshly created or already present), false
    // when it could not be created or the path exists but is not a
    // directory. The original version returned false for a pre-existing
    // folder, which made "already there" indistinguishable from "failed".
    File statusDir = new File(folderName);

    if (statusDir.isDirectory()) {
        // Already present — nothing to do, report success.
        return true;
    }

    boolean created = statusDir.mkdirs();
    if (!created) {
        System.err.println("Failed to create directory: " + folderName);
        return false;
    }
    return true;
}

public static void writeJsonContentInFile(String folderName, String fileName, Object content){
    // Serializes content as pretty-printed JSON into folderName/fileName.
    // Best-effort: any failure (missing folder, unwritable file, bad value)
    // is printed and otherwise swallowed, matching the file's convention.
    try {
        ObjectMapper mapper = new ObjectMapper();
        File target = new File(folderName, fileName);
        mapper.writerWithDefaultPrettyPrinter().writeValue(target, content);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static <T> T readJsonContentFromFile(String folderName, String fileName, Class<T> valueType) {
    // Deserializes folderName/fileName as JSON into an instance of
    // valueType. Best-effort: returns null when the file is missing,
    // unreadable, or malformed (the error is printed, not propagated).
    try {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(new File(folderName, fileName), valueType);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}


}

0 comments on commit f6a8ff4

Please sign in to comment.