Skip to content

Commit

Permalink
Merge pull request #889 from terascope/data-entity-conversion
Browse files Browse the repository at this point in the history
Add kafka to e2e tests and a few fixes
  • Loading branch information
peterdemartini authored Nov 8, 2018
2 parents f2990d2 + ec5b15a commit 4cdc50d
Show file tree
Hide file tree
Showing 32 changed files with 284 additions and 47 deletions.
7 changes: 7 additions & 0 deletions e2e/config/teraslice-master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ terafoundation:
default:
host:
- "elasticsearch:49200"
# ***********************
# Kafka Configuration
# ***********************
kafka:
default:
brokers:
- "kafka:49092"

teraslice:
worker_disconnect_timeout: 120000
Expand Down
8 changes: 7 additions & 1 deletion e2e/config/teraslice-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ terafoundation:
default:
host:
- "elasticsearch:49200"

# ***********************
# Kafka Configuration
# ***********************
kafka:
default:
brokers:
- "kafka:49092"
teraslice:
worker_disconnect_timeout: 120000
node_disconnect_timeout: 120000
Expand Down
22 changes: 22 additions & 0 deletions e2e/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ services:
memlock:
soft: -1
hard: -1
kafka:
image: terascope/kafka-zookeeper:v1.0.0
ports:
- "42181:42181"
- "49092:49092"
networks:
- cluster
environment:
- "ADVERTISED_HOST=kafka"
- "ADVERTISED_PORT=49092"
- "ZOOKEEPER_PORT=42181"
volumes:
- kafka-data:/kafka
- zookeeper-data:/zookeeper
volumes:
teraslice-assets:
driver_opts:
Expand All @@ -72,5 +86,13 @@ volumes:
driver_opts:
type: tmpfs
device: tmpfs
kafka-data:
driver_opts:
type: tmpfs
device: tmpfs
zookeeper-data:
driver_opts:
type: tmpfs
device: tmpfs
networks:
cluster:
2 changes: 1 addition & 1 deletion e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"devDependencies": {
"@terascope/docker-compose-js": "^1.0.0",
"@terascope/fetch-github-release": "^0.4.1",
"bluebird": "^3.5.2",
"bluebird": "^3.5.3",
"bunyan": "^1.8.12",
"elasticsearch": "^15.1.1",
"jest": "^23.6.0",
Expand Down
4 changes: 2 additions & 2 deletions e2e/test/cases/assets/simple-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('Asset Tests', () => {
.then((result) => {
// NOTE: In this case, the asset is referenced by the ID
// assigned by teraslice and not its name.
jobSpec.assets = [JSON.parse(result)._id];
jobSpec.assets = [JSON.parse(result)._id, 'elasticsearch'];
return teraslice.jobs.submit(jobSpec)
.then(job => waitForJobStatus(job, 'running')
.then(() => wait.forWorkersJoined(job.id(), workers, 20))
Expand Down Expand Up @@ -128,7 +128,7 @@ describe('Asset Tests', () => {

it('can directly ask for the new asset to be used', async () => {
const jobSpec = misc.newJob('generator-asset');
jobSpec.assets = ['ex1:0.1.1'];
jobSpec.assets = ['ex1:0.1.1', 'elasticsearch'];
const { workers } = jobSpec;

const assetResponse = await teraslice.assets.get('ex1/0.1.1');
Expand Down
50 changes: 50 additions & 0 deletions e2e/test/cases/kafka/kafka-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';

const uuidv4 = require('uuid/v4');
const signale = require('signale');
const misc = require('../../misc');
const wait = require('../../wait');
const { resetState } = require('../../helpers');

const { waitForJobStatus, waitForIndexCount } = wait;

describe('Kafka Tests', () => {
    beforeAll(() => resetState());

    const teraslice = misc.teraslice();

    it('should be able to read and write from kafka', async () => {
        // Unique topic and consumer group per run so repeated test
        // runs never share kafka state.
        const topic = uuidv4();
        const groupId = uuidv4();

        const senderSpec = misc.newJob('kafka-sender');
        const readerSpec = misc.newJob('kafka-reader');

        senderSpec.operations[1].topic = topic;

        readerSpec.operations[0].topic = topic;
        readerSpec.operations[0].group = groupId;

        // The elasticsearch index the reader job writes into.
        const { index } = readerSpec.operations[1];

        const senderJob = await teraslice.jobs.submit(senderSpec);

        // Submit the reader while the sender runs to completion.
        const [readerJob] = await Promise.all([
            teraslice.jobs.submit(readerSpec),
            waitForJobStatus(senderJob, 'completed'),
        ]);

        await waitForIndexCount(index, 10);
        await readerJob.stop();
        await waitForJobStatus(readerJob, 'stopped');

        let docCount = 0;
        try {
            const stats = await misc.indexStats(index);
            docCount = stats.count;
        } catch (err) {
            signale.error(err);
        }

        expect(docCount).toBe(10);
    });
});
3 changes: 2 additions & 1 deletion e2e/test/download-assets.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ async function downloadAssets() {
if (!shouldDownload) return;

const bundles = [
'elasticsearch-assets'
'elasticsearch-assets',
'kafka-assets'
];

const promises = bundles.map(async (repo) => {
Expand Down
2 changes: 1 addition & 1 deletion e2e/test/fixtures/jobs/generator-asset.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"slicers": 1,
"lifecycle": "persistent",
"workers": 3,
"assets": ["ex1"],
"assets": ["ex1", "elasticsearch"],
"analytics": false,
"operations": [
{
Expand Down
28 changes: 28 additions & 0 deletions e2e/test/fixtures/jobs/kafka-reader.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"name": "Kafka Reader",
"lifecycle": "persistent",
"workers": 1,
"analytics": true,
"assets": ["kafka", "elasticsearch"],
"operations": [
{
"_op": "teraslice_kafka_reader",
"connection": "default",
"topic": "example-logs-10",
"group": "example-kafka-group",
"size": 10,
"wait": 5000,
"_encoding": "json"
},
{
"_op": "elasticsearch_index_selector",
"type": "events",
"index": "kafka-logs-10",
"preserve_id": true
},
{
"_op": "elasticsearch_bulk",
"size": 10
}
]
}
25 changes: 25 additions & 0 deletions e2e/test/fixtures/jobs/kafka-sender.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "Kafka Sender",
"lifecycle": "once",
"workers": 1,
"analytics": true,
"assets": ["kafka", "elasticsearch"],
"operations": [
{
"_op": "elasticsearch_reader",
"index": "example-logs-10",
"type": "events",
"size": 10,
"date_field_name": "created",
"preserve_id": true
},
{
"_op": "teraslice_kafka_sender",
"connection": "default",
"topic": "example-logs-10",
"size": 10,
"timestamp_field": "created",
"_encoding": "json"
}
]
}
1 change: 1 addition & 0 deletions e2e/test/global.setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ function generateTestData() {
name: `Generate: ${indexName}`,
lifecycle: 'once',
workers: 1,
assets: ['elasticsearch'],
operations: [
{
_op: 'elasticsearch_data_generator',
Expand Down
25 changes: 24 additions & 1 deletion e2e/test/wait.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ function forWorkersJoined(jobId, workerCount, iterations) {
});
}

function waitForClusterState(timeoutMs = 60000) {
function waitForClusterState(timeoutMs = 120000) {
const endAt = Date.now() + timeoutMs;
const { cluster } = misc.teraslice();
function _try() {
Expand Down Expand Up @@ -161,6 +161,28 @@ function waitForJobStatus(job, status) {
});
}

/**
 * Poll the doc count of an elasticsearch index until it reaches
 * `expected`, giving up after the time budget is exhausted.
 *
 * @param {string} index the index to poll via misc.indexStats
 * @param {number} expected the minimum doc count to wait for
 * @param {number} [remainingMs=30000] total time budget in milliseconds
 * @returns {Promise<number>} the observed count (>= expected)
 * @throws if the budget runs out before the count is reached
 */
async function waitForIndexCount(index, expected, remainingMs = 30 * 1000) {
    let budgetMs = remainingMs;

    while (budgetMs > 0) {
        const startedAt = Date.now();

        try {
            const { count } = await misc.indexStats(index);
            if (count >= expected) {
                return count;
            }
        } catch (err) {
            // the index probably doesn't exist yet — keep polling
        }

        await Promise.delay(100);
        budgetMs -= Date.now() - startedAt;
    }

    throw new Error(`Timeout waiting for ${index} to have count of ${expected}`);
}

module.exports = {
forValue,
forLength,
Expand All @@ -169,5 +191,6 @@ module.exports = {
scaleWorkersAndWait,
forWorkersJoined,
waitForJobStatus,
waitForIndexCount,
waitForClusterState
};
2 changes: 1 addition & 1 deletion packages/docker-compose-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"url": "https://github.com/terascope/teraslice/issues"
},
"dependencies": {
"bluebird": "^3.5.2",
"bluebird": "^3.5.3",
"debug": "^4.1.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
"dependencies": {
"@terascope/error-parser": "^1.0.1",
"bluebird": "^3.5.2",
"bluebird": "^3.5.3",
"lodash": "^4.17.11",
"uuid": "^3.3.2"
},
Expand Down
4 changes: 3 additions & 1 deletion packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/job-components",
"version": "0.9.0",
"version": "0.9.1",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -41,6 +41,8 @@
"convict": "^4.4.0",
"datemath-parser": "^1.0.6",
"debug": "^4.1.0",
"is-plain-object": "^2.0.4",
"kind-of": "^6.0.2",
"lodash.clonedeep": "^4.5.0",
"uuid": "^3.3.2"
},
Expand Down
18 changes: 11 additions & 7 deletions packages/job-components/src/operations/data-entity.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { fastAssign, fastMap, isFunction, isPlainObject, parseJSON } from '../utils';
import kindOf from 'kind-of';
import { DataEncoding } from '../interfaces';

// WeakMaps are used as a memory efficient reference to private data
Expand All @@ -14,9 +15,6 @@ export default class DataEntity {
* This will detect if passed an already converted input and return it.
*/
static make(input: DataInput, metadata?: object): DataEntity {
if (DataEntity.isDataEntity(input)) {
return input;
}
return new DataEntity(input, metadata);
}

Expand Down Expand Up @@ -58,7 +56,9 @@ export default class DataEntity {
static isDataEntity(input: any): input is DataEntity {
if (input == null) return false;
if (input instanceof DataEntity) return true;
return isFunction(input.getMetadata) && isFunction(input.setMetadata);
return isFunction(input.getMetadata)
&& isFunction(input.setMetadata)
&& isFunction(input.toBuffer);
}

/**
Expand Down Expand Up @@ -88,12 +88,16 @@ export default class DataEntity {
[prop: string]: any;

constructor(data: object, metadata?: object) {
_metadata.set(this, fastAssign({ createdAt: Date.now() }, metadata));

if (data == null) return;

if (DataEntity.isDataEntity(data)) return data;

if (!isPlainObject(data)) {
throw new Error(`Invalid data source, must be an object, got ${typeof data}`);
throw new Error(`Invalid data source, must be an object, got "${kindOf(data)}"`);
}

_metadata.set(this, fastAssign({ createdAt: Date.now() }, metadata));

fastAssign(this, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ProcessorCore from '../core/processor-core';
import ConvictSchema from '../convict-schema';
import { ProcessorModule } from '../interfaces';
import { convertResult } from './shim-utils';
import { toString } from '../../utils';

export default function processorShim<S = any>(legacy: LegacyProcessor): ProcessorModule {
return {
Expand All @@ -17,8 +18,12 @@ export default function processorShim<S = any>(legacy: LegacyProcessor): Process
async handle(input: DataEntity[], sliceRequest: SliceRequest): Promise<DataEntity[]> {
if (this.processorFn != null) {
const result = await this.processorFn(input, this.logger, sliceRequest);
// @ts-ignore
return convertResult(result);
try {
// @ts-ignore
return convertResult(result);
} catch (err) {
throw new Error(`${this.opConfig._op} failed to convert result: ${toString(err)}`);
}
}

throw new Error('Processor has not been initialized');
Expand Down
10 changes: 7 additions & 3 deletions packages/job-components/src/operations/shims/reader-shim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import FetcherCore from '../core/fetcher-core';
import ParallelSlicer from '../parallel-slicer';
import ConvictSchema from '../convict-schema';
import { ReaderModule } from '../interfaces';
import { isInteger, isFunction } from '../../utils';
import { isInteger, isFunction, toString } from '../../utils';
import { convertResult } from './shim-utils';

export default function readerShim<S = any>(legacy: LegacyReader): ReaderModule {
Expand Down Expand Up @@ -64,8 +64,12 @@ export default function readerShim<S = any>(legacy: LegacyReader): ReaderModule
async handle(sliceRequest: SliceRequest): Promise<DataEntity[]> {
if (this.fetcherFn) {
const result = await this.fetcherFn(sliceRequest, this.logger);
// @ts-ignore
return convertResult(result);
try {
// @ts-ignore
return convertResult(result);
} catch (err) {
throw new Error(`${this.opConfig._op} failed to convert result: ${toString(err)}`);
}
}

throw new Error('Fetcher has not been initialized');
Expand Down
Loading

0 comments on commit 4cdc50d

Please sign in to comment.