diff --git a/e2e/test/cases/kafka/kafka-spec.js b/e2e/test/cases/kafka/kafka-spec.js index b813feac9d7..7ffb7b715f7 100644 --- a/e2e/test/cases/kafka/kafka-spec.js +++ b/e2e/test/cases/kafka/kafka-spec.js @@ -1,5 +1,6 @@ 'use strict'; +const uuidv4 = require('uuid/v4'); const signale = require('signale'); const misc = require('../../misc'); const wait = require('../../wait'); @@ -13,20 +14,33 @@ describe('Kafka Tests', () => { const teraslice = misc.teraslice(); it('should be able to read and write from kafka', async () => { - const sender = await teraslice.jobs.submit(misc.newJob('kafka-sender')); - const reader = await teraslice.jobs.submit(misc.newJob('kafka-reader', true)); + const topic = uuidv4(); + const groupId = uuidv4(); - await waitForJobStatus(sender, 'completed'); + const senderSpec = misc.newJob('kafka-sender'); + const readerSpec = misc.newJob('kafka-reader'); - await reader.start(); - await waitForIndexCount('kafka-logs-10', 10); + senderSpec.operations[1].topic = topic; + + readerSpec.operations[0].topic = topic; + readerSpec.operations[0].group = groupId; + const { index } = readerSpec.operations[1]; + + const sender = await teraslice.jobs.submit(senderSpec); + + const [reader] = await Promise.all([ + teraslice.jobs.submit(readerSpec), + waitForJobStatus(sender, 'completed'), + ]); + + await waitForIndexCount(index, 10); await reader.stop(); await waitForJobStatus(reader, 'stopped'); let count = 0; try { - ({ count } = await misc.indexStats('kafka-logs-10')); + ({ count } = await misc.indexStats(index)); } catch (err) { signale.error(err); } diff --git a/e2e/test/fixtures/jobs/kafka-reader.json b/e2e/test/fixtures/jobs/kafka-reader.json index 5612926b1f4..91b4468694c 100644 --- a/e2e/test/fixtures/jobs/kafka-reader.json +++ b/e2e/test/fixtures/jobs/kafka-reader.json @@ -12,7 +12,6 @@ "group": "example-kafka-group", "size": 10, "wait": 5000, - "offset_reset": "beginning", "_encoding": "json" }, { diff --git a/e2e/test/fixtures/jobs/kafka-sender.json b/e2e/test/fixtures/jobs/kafka-sender.json index 87e2ce45b1e..212894e8700 100644 --- a/e2e/test/fixtures/jobs/kafka-sender.json +++ b/e2e/test/fixtures/jobs/kafka-sender.json @@ -17,6 +17,7 @@ "_op": "teraslice_kafka_sender", "connection": "default", "topic": "example-logs-10", + "size": 10, "timestamp_field": "created", "_encoding": "json" } diff --git a/e2e/test/wait.js b/e2e/test/wait.js index ba3617c27b8..ea87d4357c5 100644 --- a/e2e/test/wait.js +++ b/e2e/test/wait.js @@ -161,10 +161,9 @@ function waitForJobStatus(job, status) { }); } -async function waitForIndexCount(index, expected, remainingMs = 15 * 1000) { +async function waitForIndexCount(index, expected, remainingMs = 30 * 1000) { if (remainingMs <= 0) { - expect().fail(`Timeout waiting for ${index} to have count of ${expected}`); - return 0; + throw new Error(`Timeout waiting for ${index} to have count of ${expected}`); } const start = Date.now(); @@ -181,7 +180,7 @@ async function waitForIndexCount(index, expected, remainingMs = 15 * 1000) { await Promise.delay(100); const elapsed = Date.now() - start; - return waitForIndexCount(expected, remainingMs - elapsed); + return waitForIndexCount(index, expected, remainingMs - elapsed); } module.exports = {