Minor fixes in test cases. #1461

Open: wants to merge 25 commits into master from SNAP-3138

Commits (25)
75891ca  - Fix issue with registering SparkSubmit processes in case of multipl… (Aug 13, 2019)
ca22e50  - Closing the snappy connection in case left open. (Aug 14, 2019)
60a85d1  - Adding a flag to enable or disable the DEBUG logging for spark appl… (Oct 23, 2019)
2571f8a  Merge branch 'master' into SNAP-3138 (Oct 23, 2019)
0d8602a  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Nov 11, 2019)
09c490c  - Changes to have lead config restored before stopping the cluster. (sonalsagarwal, Nov 16, 2019)
43beb7a  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Nov 16, 2019)
4242e05  - fixing few issues in test (sonalsagarwal, Nov 19, 2019)
3239209  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Nov 24, 2019)
64826e3  - Test fixes (sonalsagarwal, Nov 27, 2019)
60158c0  Merge branch 'SNAP-3138' of https://github.com/SnappyDataInc/snappyda… (sonalsagarwal, Nov 27, 2019)
6c755e4  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Nov 27, 2019)
39b3fd2  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Dec 4, 2019)
6682e3d  -test fixes (sonalsagarwal, Dec 5, 2019)
41a0ebf  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Dec 6, 2019)
f6369bc  - Handle "Job ID not found" error in test and sleep for sometime befo… (sonalsagarwal, Dec 6, 2019)
6a617dc  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Dec 7, 2019)
e9b0d83  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Dec 8, 2019)
19c458d  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Dec 16, 2019)
dc0bb8f  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Jan 5, 2020)
5f7a136  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Jan 6, 2020)
76347cd  - Minor test fixes. (sonalsagarwal, Jan 20, 2020)
27fd3d9  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Jan 20, 2020)
6268f36  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Feb 11, 2020)
34dc282  Merge branch 'master' into SNAP-3138 (sonalsagarwal, Mar 3, 2020)

Changes from all commits

@@ -146,6 +146,7 @@ protected void writeSnappyPocToSparkEnv() {
/**
* Start kafka zookeeper.
*/

public static synchronized void HydraTask_StartKafkaZookeeper() {
snappyAdAnalyticsTest.startZookeeper();
}
@@ -222,14 +223,15 @@ public static synchronized void HydraTask_StartKafkaBrokers() {
}

protected void startKafkaBroker() {
String command = "";

int numServers = (int)SnappyBB.getBB().getSharedCounters().read(SnappyBB.numServers);
Log.getLogWriter().info("Test will start " + numServers + " kafka brokers.");
String script = snappyTest.getScriptLocation(kafkaDir + sep + "bin/kafka-server-start.sh");
String orgPropFilePath = snappyTest.getScriptLocation(kafkaDir + sep + "config/server.properties");
File orgPropFile = new File(orgPropFilePath);
try {
for (int i = 1; i <= numServers; i++) {
String command = "";
String broker = "broker" + i;
String brokerLogDirPath = kafkaLogDir + sep + broker;
new File(brokerLogDirPath).mkdir();
@@ -337,6 +339,7 @@ protected void executeSnappyStreamingJob(Vector jobClassNames, String logFileNam
ProcessBuilder pb = null;
File log = null;
File logFile = null;

String userJarPath = getUserAppJarLocation(SnappyPrms.getUserAppJar(),jarPath);
verifyDataForJobExecution(jobClassNames, userJarPath);
leadHost = getLeadHost();
@@ -373,12 +376,12 @@ protected void executeSnappyStreamingJob(Vector jobClassNames, String logFileNam
throw new TestException("Failed to start the streaming job. Please check the logs.");
} else {
Log.getLogWriter().info("JobID is : " + jobID);
SnappyBB.getBB().getSharedMap().put(appName, jobID);
for (int j = 0; j < 3; j++) {
if (!getJobStatus(jobID)) {
throw new TestException("Got Exception while executing streaming job. Please check " +
"the job status output.");
}
SnappyBB.getBB().getSharedMap().put(appName, jobID);
}
}
}
@@ -417,7 +420,7 @@ public void verifyResults(){
String aggType = SnappySchemaPrms.getAggregateType();
switch (aggType.toUpperCase()) {
case "JOIN":
query = "select tp.*,pd.language from temp_person tp, person_details pd where tp.id=pd.id";
query = "select tp.*, pd.languages from temp_persoon tp, persoon_details pd where tp.id=pd.id";
break;
case "AVG":
query = "select id, avg(age) as avg_age, avg(numChild) as avg_numchild from temp_persoon group by id";
@@ -447,7 +450,7 @@ public void verifyResults(){
String errMsg = testInstance.compareFiles(queryResultDirPath, streamTableFile, tmpTableFile,
false, "streaming");
if(errMsg.length()> 0 ){
throw new TestException("Got exception while validating results");
throw new TestException("Got exception while validating results : " + errMsg);
}
} catch (SQLException se) {
Log.getLogWriter().info("Got exception while verifying results");
@@ -784,6 +787,7 @@ protected void stopSnappyStreamingJob() {
File logFile = null;
leadHost = getLeadHost();
String appName = SnappyPrms.getUserAppName();
Log.getLogWriter().info("User App Name is : " + appName);
String leadPort = (String)SnappyBB.getBB().getSharedMap().get("primaryLeadPort");
String jobID = (String) SnappyBB.getBB().getSharedMap().get(appName);
String snappyCmd = snappyJobScript + " stop --job-id " + jobID + " --lead " + leadHost + ":" + leadPort;

@@ -107,15 +107,19 @@ public static void HydraTask_performRebalance() {
}

public void performRebalance() {
Connection conn = null;
try {
Connection conn = SnappyTest.getLocatorConnection();
conn = SnappyTest.getLocatorConnection();
Long startTime = System.currentTimeMillis();
conn.createStatement().execute("call sys.rebalance_all_buckets();");
Long endTime = System.currentTimeMillis();
Long totalTime = endTime - startTime;
Log.getLogWriter().info("The rebalance procedure took " + totalTime + " ms");
} catch (SQLException ex) {
throw new util.TestException("Caught exception in performRebalance() " + ex.getMessage());
} finally {
if(conn!=null)
closeConnection(conn);
}
}
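
The fix above closes the locator connection in a finally block so it is not leaked when sys.rebalance_all_buckets() throws. As a sketch of an alternative, try-with-resources closes the connection and statement automatically; this assumes SnappyTest.getLocatorConnection() returns a plain java.sql.Connection and that closeConnection(conn) does nothing more than conn.close() in this harness, neither of which the PR states:

    // Sketch only; needs java.sql.Connection, java.sql.Statement and java.sql.SQLException.
    // Log and util.TestException are the harness types already used in the diff.
    public void performRebalance() {
      long startTime = System.currentTimeMillis();
      try (Connection conn = SnappyTest.getLocatorConnection();
           Statement stmt = conn.createStatement()) {
        stmt.execute("call sys.rebalance_all_buckets();");
        long totalTime = System.currentTimeMillis() - startTime;
        Log.getLogWriter().info("The rebalance procedure took " + totalTime + " ms");
      } catch (SQLException ex) {
        throw new util.TestException("Caught exception in performRebalance() " + ex.getMessage());
      }
    }

If closeConnection() does extra bookkeeping, for example returning the connection to a pool, the explicit finally block used in the diff remains the safer choice.
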

10 changes: 10 additions & 0 deletions dtests/src/test/java/io/snappydata/hydra/cluster/SnappyPrms.java
@@ -692,6 +692,11 @@ public class SnappyPrms extends BasePrms {
*/
public static Long numRowsList;

/**
*(boolean) Property to enable debugging at spark level.
*/
public static Long enableSparkDebug;

/**
* (int)Number of stack dumps to be taken for each thread of locators, servers, leads
*/
@@ -999,6 +1004,11 @@ public static String getLogFileName() {
return tasktab().stringAt(key, null);
}

public static boolean isSparkDebugEnabled(){
Long key = enableSparkDebug;
return tasktab().booleanAt(key, false);
}
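
isSparkDebugEnabled() reads the new flag from the task table only and defaults to false. If the flag should also be honoured when it is set at the test level, the tasktab-with-tab fallback used by getSQLScriptNames() just below could be mirrored; the sketch assumes tab() exposes the same booleanAt accessor as tasktab(), which the PR does not confirm:

    // Hypothetical variant, not part of the PR: fall back to the test-level tab()
    // when the task table does not set enableSparkDebug, defaulting to false.
    public static boolean isSparkDebugEnabled() {
      Long key = enableSparkDebug;
      return tasktab().booleanAt(key, tab().booleanAt(key, false));
    }
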

public static Vector getSQLScriptNames() {
Long key = sqlScriptNames;
return BasePrms.tasktab().vecAt(key, BasePrms.tab().vecAt(key, null));
79 changes: 50 additions & 29 deletions dtests/src/test/java/io/snappydata/hydra/cluster/SnappyTest.java
@@ -891,6 +891,7 @@ public static synchronized void backUpServerConfigData() {
* the test for stopping all locators which have been started in the test.
**/
public static synchronized void restoreLocatorConfigData() {
if(snappyTest == null) snappyTest = new SnappyTest();
snappyTest.restoreConfigData("locators");
}

@@ -904,6 +905,7 @@ public static synchronized void restoreLocatorConfigData() {
* which have been started in the test.
**/
public static synchronized void restoreServerConfigData() {
if(snappyTest == null) snappyTest = new SnappyTest();
snappyTest.restoreConfigData("servers");
}

@@ -917,6 +919,7 @@ public static synchronized void restoreServerConfigData() {
* which have been started in the test.
**/
public static synchronized void restoreLeadConfigData() {
if(snappyTest == null) snappyTest = new SnappyTest();
snappyTest.restoreConfigData("leads");
}
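
The null guard added here, and repeated in the other restore*ConfigData methods and in the HydraTask_stop* tasks further down in SnappyTest.java, protects entry points that may run before any other task has created the static snappyTest instance, for example when restoreLeadConfigData is invoked as an ENDTASK on locator1 as in the .conf changes below. A small accessor could centralise the guard; the helper name is illustrative, not from the PR:

    // Sketch only: lazily create the singleton once instead of guarding each entry point.
    private static synchronized SnappyTest instance() {
      if (snappyTest == null) snappyTest = new SnappyTest();
      return snappyTest;
    }

    public static synchronized void restoreLeadConfigData() {
      instance().restoreConfigData("leads");
    }
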

@@ -1850,20 +1853,7 @@ public void executeProcess(ProcessBuilder pb, File logFile) {
}

public synchronized void recordSnappyProcessIDinNukeRun(String pName) {
Process pr = null;
try {
File log = new File(".");
String dest = log.getCanonicalPath() + File.separator + "PIDs_" + pName + "_" + HostHelper
.getLocalHost() +
".log";
File logFile = new File(dest);
if (!logFile.exists()) {
recordProcessIds(pName, HostHelper.getLocalHost(), logFile);
}
} catch (IOException e) {
String s = "Problem while starting the process : " + pr;
throw new TestException(s, e);
}
recordSnappyProcessIDinNukeRun(pName, HostHelper.getLocalHost());
}

protected synchronized void recordSnappyProcessIDinNukeRun(String pName, String hostName) {
@@ -1874,19 +1864,11 @@ protected synchronized void recordSnappyProcessIDinNukeRun(String pName, String
String dest = log.getCanonicalPath() + File.separator + "PIDs_" + pName + "_" + hostName + ".log";
File logFile = new File(dest);
if (!logFile.exists()) {
String command;
command = "ssh -n -x -o PasswordAuthentication=no -o StrictHostKeyChecking=no " + hostName;
pb = new ProcessBuilder("/bin/bash", "-c", command);
pr = pb.start();
pr.waitFor();
recordProcessIds(pName, hostName, logFile);
}
} catch (IOException e) {
String s = "Problem while starting the process : " + pr;
throw new TestException(s, e);
} catch (InterruptedException e) {
String s = "Exception occurred while waiting for the process execution : " + pr;
throw new TestException(s, e);
}
}

@@ -1895,9 +1877,10 @@ protected static void recordProcessIds(String pName, String hostName, File logFi
ProcessBuilder pb;
Process pr = null;
try {
command = "ssh -n -x -o PasswordAuthentication=no -o StrictHostKeyChecking=no " + hostName;
if (pName.equals("Master"))
command = "ps ax | grep -w " + pName + " | grep -v grep | awk '{print $1}'";
else command = "jps | grep " + pName + " | awk '{print $1}'";
else command += " jps | grep " + pName + " | awk '{print $1}'";
hd = TestConfig.getInstance().getMasterDescription()
.getVmDescription().getHostDescription();
pb = new ProcessBuilder("/bin/bash", "-c", command);
@@ -2314,13 +2297,37 @@ public String setCDCSparkAppCmds(String userAppArgs, String commonArgs, String s
return command;
}

public String getLog4jFile(String dest) {
String log4jFileName = "";
try {
log4jFileName = getCurrentDirPath() + File.separator + "log4j_" + getMyTid() + ".properties";
Log.getLogWriter().info("log4j path :" + log4jFileName);
File log4jFile = new File(log4jFileName);
FileWriter fw = new FileWriter(log4jFile);
fw.write("log4j.rootCategory=DEBUG, file\n");
fw.write("log4j.appender.file=org.apache.log4j.RollingFileAppender\n");
fw.write("log4j.appender.file.append=true\n");
fw.write("log4j.appender.file.file=" + dest + "\n");
fw.write("log4j.appender.file.MaxFileSize=1GB\n");
fw.write("log4j.appender.file.MaxBackupIndex=10000\n");
fw.write("log4j.appender.file.layout=io.snappydata.log4j.PatternLayout\n");
fw.write("log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS zzz} %t %p %c{1}: %m%n\n");
fw.close();
} catch (IOException e) {
throw new TestException("IOException occurred while retriving log4j file at path " +
log4jFileName + "\nError Message:" + e.getMessage());
}
return log4jFileName;
}

public void executeSparkJob(Vector jobClassNames, String logFileName) {
String snappyJobScript = getScriptLocation("spark-submit");
boolean isCDCStream = SnappyCDCPrms.getIsCDCStream();
ProcessBuilder pb = null;
File log = null, logFile = null;
userAppJar = SnappyPrms.getUserAppJar();
snappyTest.verifyDataForJobExecution(jobClassNames, userAppJar);
boolean isSparkDebug = SnappyPrms.isSparkDebugEnabled();
try {
for (int i = 0; i < jobClassNames.size(); i++) {
String userJob = (String) jobClassNames.elementAt(i);
@@ -2342,13 +2349,16 @@ public void executeSparkJob(Vector jobClassNames, String logFileName) {
throw new TestException("dml props for thread " + getMyTid() + " is null)");
userAppArgs = userAppArgs + " " + dmlProps;
}
String log4jFile = (isSparkDebug)?getLog4jFile(dest):"";
if (SnappyCDCPrms.getIsCDC()) {
command = setCDCSparkAppCmds(userAppArgs, commonArgs, snappyJobScript, userJob, masterHost, masterPort, logFile);
} else {
command = snappyJobScript + " --class " + userJob +
" --master spark://" + masterHost + ":" + masterPort + " " +
SnappyPrms.getExecutorMemory() + " " +
SnappyPrms.getSparkSubmitExtraPrms() + " " + commonArgs + " " + snappyTest.getUserAppJarLocation(userAppJar, jarPath) + " " +
SnappyPrms.getSparkSubmitExtraPrms() + " " + commonArgs + " " +
((isSparkDebug)?(" --driver-java-options -Dlog4j.configuration=file://" + log4jFile):"") + " " +
snappyTest.getUserAppJarLocation(userAppJar, jarPath) + " " +
userAppArgs + " " + primaryLocatorHost + ":" + primaryLocatorPort;
}
Log.getLogWriter().info("spark-submit command is : " + command);
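
The getLog4jFile helper above writes a per-thread log4j properties file that routes DEBUG output to the spark application log, and executeSparkJob passes it to spark-submit through --driver-java-options -Dlog4j.configuration=file://<path> when enableSparkDebug is set. A sketch of the same helper using try-with-resources so the FileWriter is closed even when a write fails; it assumes the same harness helpers (getCurrentDirPath(), getMyTid(), Log, TestException) as the diff:

    // Sketch only; needs java.io.File, java.io.FileWriter and java.io.IOException.
    public String getLog4jFile(String dest) {
      String log4jFileName = getCurrentDirPath() + File.separator + "log4j_" + getMyTid() + ".properties";
      Log.getLogWriter().info("log4j path :" + log4jFileName);
      try (FileWriter fw = new FileWriter(new File(log4jFileName))) {
        fw.write("log4j.rootCategory=DEBUG, file\n");
        fw.write("log4j.appender.file=org.apache.log4j.RollingFileAppender\n");
        fw.write("log4j.appender.file.append=true\n");
        fw.write("log4j.appender.file.file=" + dest + "\n");
        fw.write("log4j.appender.file.MaxFileSize=1GB\n");
        fw.write("log4j.appender.file.MaxBackupIndex=10000\n");
        fw.write("log4j.appender.file.layout=io.snappydata.log4j.PatternLayout\n");
        fw.write("log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS zzz} %t %p %c{1}: %m%n\n");
      } catch (IOException e) {
        throw new TestException("IOException occurred while writing log4j file at path " +
            log4jFileName + "\nError Message:" + e.getMessage());
      }
      return log4jFileName;
    }
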
@@ -2531,11 +2541,16 @@ public boolean getSnappyJobsStatus(String snappyJobScript, File logFile, String
FileInputStream fis = new FileInputStream(commandOutput);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
line = null;
String searchString = "Connection reset by peer";
String connResetErr = "Connection reset by peer";
String askTimedOutErr = "Ask timed out on ";
String noSuchJobID ="No such job ID";
while ((line = br.readLine()) != null && !found) {
if (line.toLowerCase().contains(searchString.toLowerCase())) {
if (line.toLowerCase().contains(connResetErr.toLowerCase()) || line.toLowerCase().contains(askTimedOutErr.toLowerCase())) {
found = true;
Log.getLogWriter().info("Connection reset by peer...");
Log.getLogWriter().info("Job submission failed with : " + line);
} else if(line.toLowerCase().contains(noSuchJobID.toLowerCase())){
Log.getLogWriter().info("Job ID " + str + " not found. Sleeping before proceeding test ... ");
sleepForMs(30);
}
}
br.close();
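
The status-scan change above treats "Connection reset by peer" and "Ask timed out on" as submission failures and sleeps when the output reports "No such job ID" before the test proceeds. A sketch of the same scan factored into a helper; Log and sleepForMs are the harness helpers from the diff, and the 30 passed to sleepForMs is copied from the diff (its unit is not stated there):

    // Sketch only; needs java.io.BufferedReader and java.io.IOException.
    // Returns true when a line matches a fatal marker, mirroring "found" in the diff.
    private boolean scanJobOutput(BufferedReader br, String jobId) throws IOException {
      String[] fatalMarkers = {"connection reset by peer", "ask timed out on "};
      String retryMarker = "no such job id";
      String line;
      while ((line = br.readLine()) != null) {
        String lower = line.toLowerCase();
        for (String marker : fatalMarkers) {
          if (lower.contains(marker)) {
            Log.getLogWriter().info("Job submission failed with : " + line);
            return true;
          }
        }
        if (lower.contains(retryMarker)) {
          Log.getLogWriter().info("Job ID " + jobId + " not found. Sleeping before proceeding test ... ");
          sleepForMs(30);
        }
      }
      return false;
    }
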
@@ -2848,6 +2863,7 @@ public static synchronized void HydraTask_stopSparkCluster() {
*/
public static synchronized void HydraTask_stopSnappyLeader() {
File log = null;
if(snappyTest==null) snappyTest = new SnappyTest();
try {
ProcessBuilder pb = new ProcessBuilder(snappyTest.getScriptLocation("snappy-leads.sh"),
"stop");
@@ -2870,6 +2886,7 @@ public static synchronized void HydraTask_stopSnappyServers() {
*/
public static synchronized void HydraTask_stopSnappyServers() {
File log = null;
if(snappyTest==null) snappyTest = new SnappyTest();
try {
ProcessBuilder pb = new ProcessBuilder(snappyTest.getScriptLocation("snappy-servers.sh"), "stop");
log = new File(".");
@@ -2891,6 +2908,7 @@ public static synchronized void HydraTask_stopSnappyLocator() {
*/
public static synchronized void HydraTask_stopSnappyLocator() {
File log = null;
if(snappyTest==null) snappyTest = new SnappyTest();
try {
ProcessBuilder pb = new ProcessBuilder(snappyTest.getScriptLocation("snappy-locators.sh")
, "stop");
@@ -3610,13 +3628,16 @@ protected static void getLeadVM(ClientVmInfo info, ArrayList vmList, ArrayList s
if (!isPrimaryLeadUp) {
for (int i = 0; i < 3; i++) {
if (isPrimaryLeadUp) break;
sleepForMs(5);
isPrimaryLeadUp = isPrimaryLeadUpAndRunning();
}
}
if (cycleVms && !isPrimaryLeadUp) {
do {
for (int i = 0; i < 3; i++) {
if (isPrimaryLeadUp) break;
sleepForMs(5);
isPrimaryLeadUp = isPrimaryLeadUpAndRunning();
} while (isPrimaryLeadUp);
}
}
if (!isPrimaryLeadUp) {
throw new TestException("Primary lead node is not up and running in the cluster.");
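
The interleaved old and new lines above are hard to read in this view; the added lines give getLeadVM a bounded wait that polls isPrimaryLeadUpAndRunning() up to three times with a short sleep between checks before giving up. A condensed sketch of that pattern (sleepForMs(5) is copied from the diff; whether the argument is seconds or milliseconds is not stated there):

    // Sketch only: bounded retry before concluding the primary lead is down.
    for (int i = 0; i < 3 && !isPrimaryLeadUp; i++) {
      sleepForMs(5);
      isPrimaryLeadUp = isPrimaryLeadUpAndRunning();
    }
    if (!isPrimaryLeadUp) {
      throw new TestException("Primary lead node is not up and running in the cluster.");
    }
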

@@ -30,9 +30,6 @@ CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = H
io.snappydata.hydra.cluster.SnappyPrms-userAppJar = snappydata-store-scala-tests*tests.jar
threadGroups = snappyStoreThreads, snappyThreads;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = HydraTask_stopSnappy
threadGroups = snappyThreads;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLeadConfigData
threadGroups = snappyThreads;

@@ -11,7 +11,7 @@ TASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = H
maxTimesToRun = 3
;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLeadConfigData
threadGroups = snappyThreads;
ENDTASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLeadConfigData
clientNames = locator1;

io.snappydata.hydra.testDMLOps.SnappySchemaPrms-isHATest = true;

@@ -11,7 +11,7 @@ TASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = Hy
maxTimesToRun = 3
;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreServerConfigData
threadGroups = snappyThreads;
ENDTASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreServerConfigData
clientNames = locator1;

io.snappydata.hydra.testDMLOps.SnappySchemaPrms-isHATest = true;
9 changes: 2 additions & 7 deletions dtests/src/test/java/io/snappydata/hydra/misc/misc.bt
@@ -39,14 +39,13 @@ io/snappydata/hydra/testDMLOps/testDMLOpsCatalogConsistencyWithDDLs.conf
D=worker workerHosts=2 workerVMsPerHost=1 workerThreadsPerVM=1
persistenceMode="sync" redundantCopies=1


io/snappydata/hydra/streaming_sink/smartConnector/streaming_sinkAppCatalogConsistencyWithDDLs.conf
A=snappyStore snappyStoreHosts=2 snappyStoreVMsPerHost=1 snappyStoreThreadsPerVM=2
B=lead leadHosts=1 leadVMsPerHost=1 leadThreadsPerVM=2
C=locator locatorHosts=1 locatorVMsPerHost=1 locatorThreadsPerVM=1
D=worker workerHosts=2 workerVMsPerHost=1 workerThreadsPerVM=1
persistenceMode="sync" redundantCopies=1

kafkaDir="/export/shared/QA_DATA/kafka_2.11-0.10.2.2"
opType=4 isRowTable=false isPartitioned=true withKeyColumn=true

io/snappydata/hydra/testDMLOps/testDMLOpsCatalogConsistencyWithRebalance.conf
A=snappyStore snappyStoreHosts=2 snappyStoreVMsPerHost=1 snappyStoreThreadsPerVM=2
@@ -55,13 +54,9 @@ io/snappydata/hydra/testDMLOps/testDMLOpsCatalogConsistencyWithRebalance.conf
D=worker workerHosts=2 workerVMsPerHost=1 workerThreadsPerVM=1
persistenceMode="sync" redundantCopies=1


io/snappydata/hydra/testDMLOps/testDMLOpsCatalogConsistencyWithAddNewNodeRebalance.conf
A=snappyStore snappyStoreHosts=2 snappyStoreVMsPerHost=1 snappyStoreThreadsPerVM=2
B=lead leadHosts=1 leadVMsPerHost=1 leadThreadsPerVM=2
C=locator locatorHosts=1 locatorVMsPerHost=1 locatorThreadsPerVM=1
D=worker workerHosts=2 workerVMsPerHost=1 workerThreadsPerVM=1
persistenceMode="sync" redundantCopies=1
logPath=
newNode=
snappyPath=
4 changes: 2 additions & 2 deletions dtests/src/test/java/io/snappydata/hydra/northwind/leadHA.inc
@@ -16,5 +16,5 @@ TASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = Hy
maxTimesToRun = 3
;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLeadConfigData
threadGroups = snappyThreads;
ENDTASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLeadConfigData
clientNames = locator1;

@@ -15,8 +15,8 @@ TASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = Hy
maxTimesToRun = 3
;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLocatorConfigData
threadGroups = snappyThreads;
ENDTASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = restoreLocatorConfigData
clientNames = locator1;

CLOSETASK taskClass = io.snappydata.hydra.cluster.SnappyTest taskMethod = HydraTask_executeSnappyJob
io.snappydata.hydra.cluster.SnappyPrms-jobClassNames = io.snappydata.hydra.northwind.ValidateNWQueriesJob