Skip to content

Commit

Permalink
Completing basic code to run tests from consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark2307 committed Jan 18, 2025
1 parent cc34d40 commit 0225556
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public String runTestForGivenTemplate() {
List<String> samples = testingUtil.getSampleMessages().get(infoKey);
TestingRunResult testingRunResult = Utils.generateFailedRunResultForMessage(null, infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null,samples , null);
if(testingRunResult == null){
testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, testingRunConfig, true, testLogs, samples.get(samples.size() - 1), Context.accountId.get());
testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, testingRunConfig, true, testLogs, samples.get(samples.size() - 1));
}
if (testingRunResult == null) {
testingRunResult = new TestingRunResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public static void main(String[] args) {
List<String> samples = testingUtil.getSampleMessages().get(it);
testingRunResult = Utils.generateFailedRunResultForMessage(null, it, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null,samples , null);
if(testingRunResult == null){
testingRunResult = testExecutor.runTestNew(it, null, testingUtil, null, testConfig, null, false, new ArrayList<>(), samples.get(samples.size() - 1), Context.accountId.get());
testingRunResult = testExecutor.runTestNew(it, null, testingUtil, null, testConfig, null, false, new ArrayList<>(), samples.get(samples.size() - 1));
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
6 changes: 5 additions & 1 deletion apps/testing/src/main/java/com/akto/testing/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.akto.rules.RequiredConfigs;
import com.akto.task.Cluster;
import com.akto.test_editor.execution.Executor;
import com.akto.testing.testing_with_kafka.TestingConsumer;
import com.akto.testing.testing_with_kafka.TestingProducer;
import com.akto.util.AccountTask;
import com.akto.util.Constants;
import com.akto.util.DashboardMode;
Expand Down Expand Up @@ -537,6 +539,7 @@ public void run() {
}

TestingProducer testingProducer = new TestingProducer();
TestingConsumer testingConsumer = new TestingConsumer();
if (trrs.getState() == State.SCHEDULED) {
if (trrs.getMetadata()!= null && trrs.getMetadata().containsKey("pull_request_id") && trrs.getMetadata().containsKey("commit_sha_head") ) {
//case of github status push
Expand All @@ -548,7 +551,8 @@ public void run() {
if(!maxRetriesReached){
// init producer and the consumer here
// producer for testing is currently calls init functions from test-executor
testingProducer.initProducer(testingRun, summaryId, syncLimit);
testingProducer.initProducer(testingRun, summaryId, syncLimit);
testingConsumer.init();
}

} catch (Exception e) {
Expand Down
19 changes: 14 additions & 5 deletions apps/testing/src/main/java/com/akto/testing/TestExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.akto.dto.testing.TestResult.Confidence;
import com.akto.dto.testing.TestResult.TestError;
import com.akto.dto.testing.TestingRun.State;
import com.akto.dto.testing.info.TestMessages;
import com.akto.dto.type.RequestTemplate;
import com.akto.dto.type.SingleTypeInfo;
import com.akto.dto.type.URLMethods;
Expand All @@ -44,6 +45,8 @@
import com.akto.test_editor.execution.Executor;
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.testing.testing_with_kafka.CommonSingletonForTesting;
import com.akto.testing.testing_with_kafka.TestingProducer;
import com.akto.testing.yaml_tests.YamlTestTemplate;
import com.akto.testing_issues.TestingIssuesHandler;
import com.akto.usage.UsageMetricCalculator;
Expand Down Expand Up @@ -256,6 +259,9 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
// }
// }

// init the singleton class here
CommonSingletonForTesting.getInstance().init(testingUtil, testingRun.getTestingRunConfig(), debug, testConfigMap);

if(!shouldInitOnly){
int maxThreads = Math.min(testingRunSubCategories.size(), 1000);
if(maxThreads == 0){
Expand All @@ -271,8 +277,6 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
CountDownLatch latch = new CountDownLatch(apiInfoKeyList.size());

final int maxRunTime = 10 * 60;

TestingConsumer testingConsumer = new TestingConsumer();
for (ApiInfo.ApiInfoKey apiInfoKey: apiInfoKeyList) {
List<String> messages = testingUtil.getSampleMessages().get(apiInfoKey);
for (String testSubCategory: testingRunSubCategories) {
Expand Down Expand Up @@ -302,9 +306,14 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
loggerMaker.infoAndAddToDb("Skipping test from producers because: " + failMessage + " apiinfo: " + apiInfoKey.toString(), LogDb.TESTING);
}else{
// push data to kafka here and inside that call run test new function
// create an object of TestMessage
TestMessages testMessages = new TestMessages(
testingRun.getId(), summaryId, apiInfoKey, testSubType, testLogs, accountId
);
logger.info("Inserting record for apiInfoKey: " + apiInfoKey.toString() + " subcategory: " + testSubType);
try {
Future<Void> future = threadPool.submit(() ->
testingConsumer.runAndInsertNewTestResult(apiInfoKey, testingRun.getId(), testingUtil, summaryId, testConfig, testingRun.getTestingRunConfig(), debug, testLogs, messages.get(messages.size() - 1), accountId)
TestingProducer.pushMessagesToKafka(Arrays.asList(testMessages))
);
testingRecords.add(future);
} catch (Exception e) {
Expand Down Expand Up @@ -332,7 +341,7 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug
throw new RuntimeException(e);
}

loggerMaker.infoAndAddToDb("Finished testing", LogDb.TESTING);
loggerMaker.infoAndAddToDb("Finished inserting records in kafka", LogDb.TESTING);
}

}
Expand Down Expand Up @@ -763,7 +772,7 @@ public boolean applyRunOnceCheck(ApiInfoKey apiInfoKey, TestConfig testConfig, C
}

public TestingRunResult runTestNew(ApiInfo.ApiInfoKey apiInfoKey, ObjectId testRunId, TestingUtil testingUtil,
ObjectId testRunResultSummaryId, TestConfig testConfig, TestingRunConfig testingRunConfig, boolean debug, List<TestingRunResult.TestLog> testLogs, String message, int accountId) {
ObjectId testRunResultSummaryId, TestConfig testConfig, TestingRunConfig testingRunConfig, boolean debug, List<TestingRunResult.TestLog> testLogs, String message) {

String testSuperType = testConfig.getInfo().getCategory().getName();
String testSubType = testConfig.getInfo().getSubCategory();
Expand Down
36 changes: 0 additions & 36 deletions apps/testing/src/main/java/com/akto/testing/TestingConsumer.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.akto.testing.testing_with_kafka;


import java.util.Map;

import com.akto.dto.test_editor.TestConfig;
import com.akto.dto.testing.TestingRunConfig;
import com.akto.store.TestingUtil;

public class CommonSingletonForTesting {

private static final CommonSingletonForTesting instance = new CommonSingletonForTesting();

private TestingUtil testingUtil;
private TestingRunConfig testingRunConfig;
private boolean debug;
Map<String, TestConfig> testConfigMap;

private CommonSingletonForTesting() {
}

public static CommonSingletonForTesting getInstance() {
return instance;
}

public synchronized void init(TestingUtil testingUtil, TestingRunConfig testingRunConfig, boolean debug, Map<String, TestConfig> testConfigMap) {
this.testingUtil = testingUtil;
this.testingRunConfig = testingRunConfig;
this.debug = debug;
this.testConfigMap = testConfigMap;
}

public boolean isDebug() {
return debug;
}

public TestingRunConfig getTestingRunConfig() {
return testingRunConfig;
}

public TestingUtil getTestingUtil() {
return testingUtil;
}

public Map<String, TestConfig> getTestConfigMap() {
return testConfigMap;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.akto.testing.testing_with_kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.akto.DaoInit;
import com.akto.dao.context.Context;
import com.akto.dto.ApiInfo;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.test_editor.TestConfig;
import com.akto.dto.testing.TestingRunResult;
import com.akto.dto.testing.info.TestMessages;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.akto.runtime.utils.Utils;
import com.akto.testing.TestExecutor;
import com.akto.util.Constants;
import com.akto.util.DashboardMode;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;

public class TestingConsumer {

Properties properties = com.akto.runtime.utils.Utils.configProperties(Constants.LOCAL_KAFKA_BROKER_URL, Constants.AKTO_KAFKA_GROUP_ID_CONFIG, Constants.AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG);
private Consumer<String, String> consumer = new KafkaConsumer<>(properties); ;
final private TestExecutor testExecutor = new TestExecutor();

private static final LoggerMaker loggerMaker = new LoggerMaker(TestingConsumer.class);
private static final Logger logger = LoggerFactory.getLogger(TestingConsumer.class);
private static final ExecutorService threadPool = Executors.newFixedThreadPool(100);

private void initializeConsumer() {
String mongoURI = System.getenv("AKTO_MONGO_CONN");
ReadPreference readPreference = ReadPreference.secondary();
if(DashboardMode.isOnPremDeployment()){
readPreference = ReadPreference.primary();
}
WriteConcern writeConcern = WriteConcern.W1;
DaoInit.init(new ConnectionString(mongoURI), readPreference, writeConcern);
}

public static TestMessages parseTestMessage(String message) {
JSONObject jsonObject = JSON.parseObject(message);
ObjectId testingRunId = new ObjectId(jsonObject.getString("testingRunId"));
ObjectId testingRunResultSummaryId = new ObjectId(jsonObject.getString("testingRunResultSummaryId"));
ApiInfo.ApiInfoKey apiInfoKey = ApiInfo.getApiInfoKeyFromString(jsonObject.getString("apiInfoKey"));
String subcategory = jsonObject.getString("subcategory");
List<TestingRunResult.TestLog> testLogs = JSON.parseArray(jsonObject.getString("testLogs"), TestingRunResult.TestLog.class);
int accountId = jsonObject.getInteger("accountId");
return new TestMessages(testingRunId, testingRunResultSummaryId, apiInfoKey, subcategory, testLogs, accountId);
}

public Void runTestFromMessage(String message){
TestMessages testMessages = parseTestMessage(message);
Context.accountId.set(testMessages.getAccountId());

CommonSingletonForTesting instance = CommonSingletonForTesting.getInstance();
String subCategory = testMessages.getSubcategory();
TestConfig testConfig = instance.getTestConfigMap().get(subCategory);
ApiInfoKey apiInfoKey = testMessages.getApiInfoKey();

List<String> messagesList = instance.getTestingUtil().getSampleMessages().get(apiInfoKey);
if(messagesList == null || messagesList.isEmpty()){
return null;
}else{
String sample = messagesList.get(messagesList.size() - 1);
logger.info("Running test for: " + apiInfoKey + " with subcategory: " + subCategory);
TestingRunResult runResult = testExecutor.runTestNew(apiInfoKey, testMessages.getTestingRunId(), instance.getTestingUtil(), testMessages.getTestingRunResultSummaryId(),testConfig , instance.getTestingRunConfig(), instance.isDebug(), testMessages.getTestLogs(), sample);
testExecutor.insertResultsAndMakeIssues(Arrays.asList(runResult), testMessages.getTestingRunResultSummaryId());
return null;
}
}
public void init(){
initializeConsumer();
String topicName = Constants.TEST_RESULTS_TOPIC_NAME;

final Thread mainThread = Thread.currentThread();
final AtomicBoolean exceptionOnCommitSync = new AtomicBoolean(false);

List<Future<Void>> futures = new ArrayList<>();

Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
try {
if (!exceptionOnCommitSync.get()) {
mainThread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Error e){
loggerMaker.errorAndAddToDb("Error in main thread: "+ e.getMessage(), LogDb.TESTING);
}
}
});

long lastSyncOffset = 0;
try {
consumer.subscribe(Arrays.asList(topicName));
loggerMaker.infoAndAddToDb("Consumer subscribed", LogDb.RUNTIME);

// poll for new data here

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
try {
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
}
for (ConsumerRecord<String, String> record : records) {
lastSyncOffset++;
loggerMaker.infoAndAddToDb("Reading message in testing consumer: " + lastSyncOffset, LogDb.TESTING);
if (lastSyncOffset % 100 == 0) {
logger.info("Committing offset at position: " + lastSyncOffset);
}

Future<Void> future = threadPool.submit(() -> runTestFromMessage(record.value()));
futures.add(future);
}

}

} catch (WakeupException ignored) {
// nothing to catch. This exception is called from the shutdown hook.
} catch (Exception e) {
exceptionOnCommitSync.set(true);
Utils.printL(e);
loggerMaker.errorAndAddToDb("Error in main testing consumer: " + e.getMessage(),LogDb.TESTING);
e.printStackTrace();
System.exit(0);
} finally {
consumer.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package com.akto.testing;
package com.akto.testing.testing_with_kafka;

import java.util.List;

import org.bson.types.ObjectId;

import com.akto.dto.billing.SyncLimit;
import com.akto.dto.testing.TestingRun;
import com.akto.dto.testing.info.TestMessages;
import com.akto.kafka.Kafka;
import com.akto.testing.TestExecutor;
import com.akto.util.Constants;

public class TestingProducer {

private final Kafka producer = new Kafka(Constants.LOCAL_KAFKA_BROKER_URL, 500, 1000);
public static final Kafka producer = new Kafka(Constants.LOCAL_KAFKA_BROKER_URL, 500, 1000);

public static void performActionAndPushDataToKafka(){

public static Void pushMessagesToKafka(List<TestMessages> messages){
for(TestMessages testMessages: messages){
String messageString = testMessages.toString();
producer.send(messageString, Constants.TEST_RESULTS_TOPIC_NAME);
}
return null;
}

private boolean shouldClearKafkaRecords(TestingRun testingRun, ObjectId summaryId){
Expand All @@ -29,5 +37,4 @@ public void initProducer(TestingRun testingRun, ObjectId summaryId, SyncLimit sy
boolean doInitOnly = !isKafkaEmpty();
executor.init(testingRun, summaryId, syncLimit, doInitOnly);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public WorkflowTestResult.NodeResult processYamlNode(Node node, Map<String, Obje
List<String> samples = testingUtil.getSampleMessages().get(infoKey);
TestingRunResult testingRunResult = Utils.generateFailedRunResultForMessage(null, infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null,samples , null);
if(testingRunResult == null){
testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, null, debug, testLogs, samples.get(samples.size() - 1), Context.accountId.get());
testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, null, debug, testLogs, samples.get(samples.size() - 1));
}

List<String> errors = new ArrayList<>();
Expand Down
Loading

0 comments on commit 0225556

Please sign in to comment.