Skip to content

Commit

Permalink
Fixes to kafka tests
Browse files Browse the repository at this point in the history
  • Loading branch information
peterdemartini committed Nov 8, 2018
1 parent c8a2f82 commit ec5b15a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
26 changes: 20 additions & 6 deletions e2e/test/cases/kafka/kafka-spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const uuidv4 = require('uuid/v4');
const signale = require('signale');
const misc = require('../../misc');
const wait = require('../../wait');
Expand All @@ -13,20 +14,33 @@ describe('Kafka Tests', () => {
const teraslice = misc.teraslice();

it('should be able to read and write from kafka', async () => {
const sender = await teraslice.jobs.submit(misc.newJob('kafka-sender'));
const reader = await teraslice.jobs.submit(misc.newJob('kafka-reader', true));
const topic = uuidv4();
const groupId = uuidv4();

await waitForJobStatus(sender, 'completed');
const senderSpec = misc.newJob('kafka-sender');
const readerSpec = misc.newJob('kafka-reader');

await reader.start();
await waitForIndexCount('kafka-logs-10', 10);
senderSpec.operations[1].topic = topic;

readerSpec.operations[0].topic = topic;
readerSpec.operations[0].group = groupId;
const { index } = readerSpec.operations[1];

const sender = await teraslice.jobs.submit(senderSpec);

const [reader] = await Promise.all([
teraslice.jobs.submit(readerSpec),
waitForJobStatus(sender, 'completed'),
]);

await waitForIndexCount(index, 10);
await reader.stop();

await waitForJobStatus(reader, 'stopped');

let count = 0;
try {
({ count } = await misc.indexStats('kafka-logs-10'));
({ count } = await misc.indexStats(index));
} catch (err) {
signale.error(err);
}
Expand Down
1 change: 0 additions & 1 deletion e2e/test/fixtures/jobs/kafka-reader.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"group": "example-kafka-group",
"size": 10,
"wait": 5000,
"offset_reset": "beginning",
"_encoding": "json"
},
{
Expand Down
1 change: 1 addition & 0 deletions e2e/test/fixtures/jobs/kafka-sender.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"_op": "teraslice_kafka_sender",
"connection": "default",
"topic": "example-logs-10",
"size": 10,
"timestamp_field": "created",
"_encoding": "json"
}
Expand Down
7 changes: 3 additions & 4 deletions e2e/test/wait.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,9 @@ function waitForJobStatus(job, status) {
});
}

async function waitForIndexCount(index, expected, remainingMs = 15 * 1000) {
async function waitForIndexCount(index, expected, remainingMs = 30 * 1000) {
if (remainingMs <= 0) {
expect().fail(`Timeout waiting for ${index} to have count of ${expected}`);
return 0;
throw new Error(`Timeout waiting for ${index} to have count of ${expected}`);
}

const start = Date.now();
Expand All @@ -181,7 +180,7 @@ async function waitForIndexCount(index, expected, remainingMs = 15 * 1000) {

await Promise.delay(100);
const elapsed = Date.now() - start;
return waitForIndexCount(expected, remainingMs - elapsed);
return waitForIndexCount(index, expected, remainingMs - elapsed);
}

module.exports = {
Expand Down

0 comments on commit ec5b15a

Please sign in to comment.