diff --git a/e2e/config/teraslice-master.yaml b/e2e/config/teraslice-master.yaml index 0f2fe6e59b6..1b1fa61900e 100644 --- a/e2e/config/teraslice-master.yaml +++ b/e2e/config/teraslice-master.yaml @@ -9,6 +9,13 @@ terafoundation: default: host: - "elasticsearch:49200" + # *********************** + # Kafka Configuration + # *********************** + kafka: + default: + brokers: + - "kafka:49092" teraslice: worker_disconnect_timeout: 120000 diff --git a/e2e/config/teraslice-worker.yaml b/e2e/config/teraslice-worker.yaml index 28cf9b8514b..e2f6191d1df 100644 --- a/e2e/config/teraslice-worker.yaml +++ b/e2e/config/teraslice-worker.yaml @@ -9,7 +9,13 @@ terafoundation: default: host: - "elasticsearch:49200" - + # *********************** + # Kafka Configuration + # *********************** + kafka: + default: + brokers: + - "kafka:49092" teraslice: worker_disconnect_timeout: 120000 node_disconnect_timeout: 120000 diff --git a/e2e/docker-compose.yml b/e2e/docker-compose.yml index 1177a6dbbce..7400e4a3bba 100644 --- a/e2e/docker-compose.yml +++ b/e2e/docker-compose.yml @@ -63,6 +63,20 @@ services: memlock: soft: -1 hard: -1 + kafka: + image: terascope/kafka-zookeeper:v1.0.0 + ports: + - "42181:42181" + - "49092:49092" + networks: + - cluster + environment: + - "ADVERTISED_HOST=kafka" + - "ADVERTISED_PORT=49092" + - "ZOOKEEPER_PORT=42181" + volumes: + - kafka-data:/kafka + - zookeeper-data:/zookeeper volumes: teraslice-assets: driver_opts: @@ -72,5 +86,13 @@ volumes: driver_opts: type: tmpfs device: tmpfs + kafka-data: + driver_opts: + type: tmpfs + device: tmpfs + zookeeper-data: + driver_opts: + type: tmpfs + device: tmpfs networks: cluster: diff --git a/e2e/package.json b/e2e/package.json index ef0e24f6097..a7268fb4ed2 100644 --- a/e2e/package.json +++ b/e2e/package.json @@ -28,7 +28,7 @@ "devDependencies": { "@terascope/docker-compose-js": "^1.0.0", "@terascope/fetch-github-release": "^0.4.1", - "bluebird": "^3.5.2", + "bluebird": "^3.5.3", "bunyan": "^1.8.12", "elasticsearch": "^15.1.1", "jest": "^23.6.0", diff --git a/e2e/test/cases/assets/simple-spec.js b/e2e/test/cases/assets/simple-spec.js index 5fdd0a30207..8049f919661 100644 --- a/e2e/test/cases/assets/simple-spec.js +++ b/e2e/test/cases/assets/simple-spec.js @@ -32,7 +32,7 @@ describe('Asset Tests', () => { .then((result) => { // NOTE: In this case, the asset is referenced by the ID // assigned by teraslice and not it's name. - jobSpec.assets = [JSON.parse(result)._id]; + jobSpec.assets = [JSON.parse(result)._id, 'elasticsearch']; return teraslice.jobs.submit(jobSpec) .then(job => waitForJobStatus(job, 'running') .then(() => wait.forWorkersJoined(job.id(), workers, 20)) @@ -128,7 +128,7 @@ describe('Asset Tests', () => { it('can directly ask for the new asset to be used', async () => { const jobSpec = misc.newJob('generator-asset'); - jobSpec.assets = ['ex1:0.1.1']; + jobSpec.assets = ['ex1:0.1.1', 'elasticsearch']; const { workers } = jobSpec; const assetResponse = await teraslice.assets.get('ex1/0.1.1'); diff --git a/e2e/test/cases/kafka/kafka-spec.js b/e2e/test/cases/kafka/kafka-spec.js new file mode 100644 index 00000000000..7ffb7b715f7 --- /dev/null +++ b/e2e/test/cases/kafka/kafka-spec.js @@ -0,0 +1,50 @@ +'use strict'; + +const uuidv4 = require('uuid/v4'); +const signale = require('signale'); +const misc = require('../../misc'); +const wait = require('../../wait'); +const { resetState } = require('../../helpers'); + +const { waitForJobStatus, waitForIndexCount } = wait; + +describe('Kafka Tests', () => { + beforeAll(() => resetState()); + + const teraslice = misc.teraslice(); + + it('should be able to read and write from kafka', async () => { + const topic = uuidv4(); + const groupId = uuidv4(); + + const senderSpec = misc.newJob('kafka-sender'); + const readerSpec = misc.newJob('kafka-reader'); + + senderSpec.operations[1].topic = topic; + + readerSpec.operations[0].topic = topic; + readerSpec.operations[0].group = groupId; + const { index } = readerSpec.operations[1]; + + const sender = await teraslice.jobs.submit(senderSpec); + + const [reader] = await Promise.all([ + teraslice.jobs.submit(readerSpec), + waitForJobStatus(sender, 'completed'), + ]); + + await waitForIndexCount(index, 10); + await reader.stop(); + + await waitForJobStatus(reader, 'stopped'); + + let count = 0; + try { + ({ count } = await misc.indexStats(index)); + } catch (err) { + signale.error(err); + } + + expect(count).toBe(10); + }); +}); diff --git a/e2e/test/download-assets.js b/e2e/test/download-assets.js index d146501f181..5dd33e45ac6 100644 --- a/e2e/test/download-assets.js +++ b/e2e/test/download-assets.js @@ -52,7 +52,8 @@ async function downloadAssets() { if (!shouldDownload) return; const bundles = [ - 'elasticsearch-assets' + 'elasticsearch-assets', + 'kafka-assets' ]; const promises = bundles.map(async (repo) => { diff --git a/e2e/test/fixtures/jobs/generator-asset.json b/e2e/test/fixtures/jobs/generator-asset.json index 1990f45fa14..79cd726f016 100644 --- a/e2e/test/fixtures/jobs/generator-asset.json +++ b/e2e/test/fixtures/jobs/generator-asset.json @@ -3,7 +3,7 @@ "slicers": 1, "lifecycle": "persistent", "workers": 3, - "assets": ["ex1"], + "assets": ["ex1", "elasticsearch"], "analytics": false, "operations": [ { diff --git a/e2e/test/fixtures/jobs/kafka-reader.json b/e2e/test/fixtures/jobs/kafka-reader.json new file mode 100644 index 00000000000..91b4468694c --- /dev/null +++ b/e2e/test/fixtures/jobs/kafka-reader.json @@ -0,0 +1,28 @@ +{ + "name": "Kafka Reader", + "lifecycle": "persistent", + "workers": 1, + "analytics": true, + "assets": ["kafka", "elasticsearch"], + "operations": [ + { + "_op": "teraslice_kafka_reader", + "connection": "default", + "topic": "example-logs-10", + "group": "example-kafka-group", + "size": 10, + "wait": 5000, + "_encoding": "json" + }, + { + "_op": "elasticsearch_index_selector", + "type": "events", + "index": "kafka-logs-10", + "preserve_id": true + }, + { + "_op": "elasticsearch_bulk", + "size": 10 + } + ] +} diff --git a/e2e/test/fixtures/jobs/kafka-sender.json b/e2e/test/fixtures/jobs/kafka-sender.json new file mode 100644 index 00000000000..212894e8700 --- /dev/null +++ b/e2e/test/fixtures/jobs/kafka-sender.json @@ -0,0 +1,25 @@ +{ + "name": "Kafka Sender", + "lifecycle": "once", + "workers": 1, + "analytics": true, + "assets": ["kafka", "elasticsearch"], + "operations": [ + { + "_op": "elasticsearch_reader", + "index": "example-logs-10", + "type": "events", + "size": 10, + "date_field_name": "created", + "preserve_id": true + }, + { + "_op": "teraslice_kafka_sender", + "connection": "default", + "topic": "example-logs-10", + "size": 10, + "timestamp_field": "created", + "_encoding": "json" + } + ] +} diff --git a/e2e/test/global.setup.js b/e2e/test/global.setup.js index 208ca8930e0..05b57470441 100644 --- a/e2e/test/global.setup.js +++ b/e2e/test/global.setup.js @@ -149,6 +149,7 @@ function generateTestData() { name: `Generate: ${indexName}`, lifecycle: 'once', workers: 1, + assets: ['elasticsearch'], operations: [ { _op: 'elasticsearch_data_generator', diff --git a/e2e/test/wait.js b/e2e/test/wait.js index 3940690312f..ea87d4357c5 100644 --- a/e2e/test/wait.js +++ b/e2e/test/wait.js @@ -97,7 +97,7 @@ function forWorkersJoined(jobId, workerCount, iterations) { }); } -function waitForClusterState(timeoutMs = 60000) { +function waitForClusterState(timeoutMs = 120000) { const endAt = Date.now() + timeoutMs; const { cluster } = misc.teraslice(); function _try() { @@ -161,6 +161,28 @@ function waitForJobStatus(job, status) { }); } +async function waitForIndexCount(index, expected, remainingMs = 30 * 1000) { + if (remainingMs <= 0) { + throw new Error(`Timeout waiting for ${index} to have count of ${expected}`); + } + + const start = Date.now(); + let count = 0; + + try { + ({ count } = await misc.indexStats(index)); + if (count >= expected) { + return count; + } + } catch (err) { + // it probably okay + } + + await Promise.delay(100); + const elapsed = Date.now() - start; + return waitForIndexCount(index, expected, remainingMs - elapsed); +} + module.exports = { forValue, forLength, @@ -169,5 +191,6 @@ module.exports = { scaleWorkersAndWait, forWorkersJoined, waitForJobStatus, + waitForIndexCount, waitForClusterState }; diff --git a/packages/docker-compose-js/package.json b/packages/docker-compose-js/package.json index 4dc363aef63..442f9d88ac4 100644 --- a/packages/docker-compose-js/package.json +++ b/packages/docker-compose-js/package.json @@ -26,7 +26,7 @@ "url": "https://github.com/terascope/teraslice/issues" }, "dependencies": { - "bluebird": "^3.5.2", + "bluebird": "^3.5.3", "debug": "^4.1.0" }, "devDependencies": { diff --git a/packages/elasticsearch-api/package.json b/packages/elasticsearch-api/package.json index 048b235222a..a2e3d90b782 100644 --- a/packages/elasticsearch-api/package.json +++ b/packages/elasticsearch-api/package.json @@ -22,7 +22,7 @@ }, "dependencies": { "@terascope/error-parser": "^1.0.1", - "bluebird": "^3.5.2", + "bluebird": "^3.5.3", "lodash": "^4.17.11", "uuid": "^3.3.2" }, diff --git a/packages/job-components/package.json b/packages/job-components/package.json index 030510ba644..df02cbcdd88 100644 --- a/packages/job-components/package.json +++ b/packages/job-components/package.json @@ -1,6 +1,6 @@ { "name": "@terascope/job-components", - "version": "0.9.0", + "version": "0.9.1", "publishConfig": { "access": "public" }, @@ -41,6 +41,8 @@ "convict": "^4.4.0", "datemath-parser": "^1.0.6", "debug": "^4.1.0", + "is-plain-object": "^2.0.4", + "kind-of": "^6.0.2", "lodash.clonedeep": "^4.5.0", "uuid": "^3.3.2" }, diff --git a/packages/job-components/src/operations/data-entity.ts b/packages/job-components/src/operations/data-entity.ts index 317c3008843..025b5a41652 100644 --- a/packages/job-components/src/operations/data-entity.ts +++ b/packages/job-components/src/operations/data-entity.ts @@ -1,4 +1,5 @@ import { fastAssign, fastMap, isFunction, isPlainObject, parseJSON } from '../utils'; +import kindOf from 'kind-of'; import { DataEncoding } from '../interfaces'; // WeakMaps are used as a memory efficient reference to private data @@ -14,9 +15,6 @@ export default class DataEntity { * This will detect if passed an already converted input and return it. */ static make(input: DataInput, metadata?: object): DataEntity { - if (DataEntity.isDataEntity(input)) { - return input; - } return new DataEntity(input, metadata); } @@ -58,7 +56,9 @@ export default class DataEntity { static isDataEntity(input: any): input is DataEntity { if (input == null) return false; if (input instanceof DataEntity) return true; - return isFunction(input.getMetadata) && isFunction(input.setMetadata); + return isFunction(input.getMetadata) + && isFunction(input.setMetadata) + && isFunction(input.toBuffer); } /** @@ -88,12 +88,16 @@ export default class DataEntity { [prop: string]: any; constructor(data: object, metadata?: object) { + _metadata.set(this, fastAssign({ createdAt: Date.now() }, metadata)); + + if (data == null) return; + + if (DataEntity.isDataEntity(data)) return data; + if (!isPlainObject(data)) { - throw new Error(`Invalid data source, must be an object, got ${typeof data}`); + throw new Error(`Invalid data source, must be an object, got "${kindOf(data)}"`); } - _metadata.set(this, fastAssign({ createdAt: Date.now() }, metadata)); - fastAssign(this, data); } diff --git a/packages/job-components/src/operations/shims/processor-shim.ts b/packages/job-components/src/operations/shims/processor-shim.ts index 28296d8bfe8..f3ef29150d5 100644 --- a/packages/job-components/src/operations/shims/processor-shim.ts +++ b/packages/job-components/src/operations/shims/processor-shim.ts @@ -4,6 +4,7 @@ import ProcessorCore from '../core/processor-core'; import ConvictSchema from '../convict-schema'; import { ProcessorModule } from '../interfaces'; import { convertResult } from './shim-utils'; +import { toString } from '../../utils'; export default function processorShim(legacy: LegacyProcessor): ProcessorModule { return { @@ -17,8 +18,12 @@ export default function processorShim(legacy: LegacyProcessor): Process async handle(input: DataEntity[], sliceRequest: SliceRequest): Promise { if (this.processorFn != null) { const result = await this.processorFn(input, this.logger, sliceRequest); - // @ts-ignore - return convertResult(result); + try { + // @ts-ignore + return convertResult(result); + } catch (err) { + throw new Error(`${this.opConfig._op} failed to convert result: ${toString(err)}`); + } } throw new Error('Processor has not been initialized'); diff --git a/packages/job-components/src/operations/shims/reader-shim.ts b/packages/job-components/src/operations/shims/reader-shim.ts index ae34249a5ba..35a4b69cb35 100644 --- a/packages/job-components/src/operations/shims/reader-shim.ts +++ b/packages/job-components/src/operations/shims/reader-shim.ts @@ -4,7 +4,7 @@ import FetcherCore from '../core/fetcher-core'; import ParallelSlicer from '../parallel-slicer'; import ConvictSchema from '../convict-schema'; import { ReaderModule } from '../interfaces'; -import { isInteger, isFunction } from '../../utils'; +import { isInteger, isFunction, toString } from '../../utils'; import { convertResult } from './shim-utils'; export default function readerShim(legacy: LegacyReader): ReaderModule { @@ -64,8 +64,12 @@ export default function readerShim(legacy: LegacyReader): ReaderModule async handle(sliceRequest: SliceRequest): Promise { if (this.fetcherFn) { const result = await this.fetcherFn(sliceRequest, this.logger); - // @ts-ignore - return convertResult(result); + try { + // @ts-ignore + return convertResult(result); + } catch (err) { + throw new Error(`${this.opConfig._op} failed to convert result: ${toString(err)}`); + } } throw new Error('Fetcher has not been initialized'); diff --git a/packages/job-components/src/operations/shims/shim-utils.ts b/packages/job-components/src/operations/shims/shim-utils.ts index dd7d427a2f9..d55a46d67af 100644 --- a/packages/job-components/src/operations/shims/shim-utils.ts +++ b/packages/job-components/src/operations/shims/shim-utils.ts @@ -19,8 +19,8 @@ export function convertResult(input: DataInput[]|Buffer[]|string[]): DataEntity[ // @ts-ignore if (Array.isArray(first)) return input; - if (isPlainObject(first)) return DataEntity.makeArray(input); if (Buffer.isBuffer(first) || isString(first)) return deprecateType(input); + if (isPlainObject(first)) return DataEntity.makeArray(input); throw new Error('Invalid return type for processor'); } diff --git a/packages/job-components/src/utils.ts b/packages/job-components/src/utils.ts index 1cb877a62d7..8308e7d66d5 100644 --- a/packages/job-components/src/utils.ts +++ b/packages/job-components/src/utils.ts @@ -1,3 +1,6 @@ +import isPlainObject from 'is-plain-object'; +import kindOf from 'kind-of'; + /** A simplified implemation of lodash isString */ export function isString(val: any): val is string { return typeof val === 'string' ? true : false; @@ -17,7 +20,7 @@ export function toString(val: any): string { */ export function parseJSON(buf: Buffer|string): T { if (!Buffer.isBuffer(buf) && !isString(buf)) { - throw new TypeError(`Failure to serialize non-buffer, got "${typeof buf}"`); + throw new TypeError(`Failure to serialize non-buffer, got "${kindOf(buf)}"`); } try { @@ -34,14 +37,7 @@ export function isInteger(val: any): val is number { return Number.isInteger(val); } -/** A simplified implemation of lodash isPlainObject */ -export function isPlainObject(input: any): input is object { - if (input == null) return false; - if (Array.isArray(input)) return false; - if (Buffer.isBuffer(input)) return false; - if (typeof input !== 'object') return false; - return true; -} +export { isPlainObject }; /** A simplified implemation of lodash castArray */ export function castArray(input: any): T[] { diff --git a/packages/job-components/test/operations/data-entity-spec.ts b/packages/job-components/test/operations/data-entity-spec.ts index 5d7d27970bb..b9d957dc8cd 100644 --- a/packages/job-components/test/operations/data-entity-spec.ts +++ b/packages/job-components/test/operations/data-entity-spec.ts @@ -83,6 +83,38 @@ describe('DataEntity', () => { }); }); + describe('when constructed with a non-object', () => { + it('should do nothing when called with null', () => { + expect(() => { + // @ts-ignore + new DataEntity(null); + }).not.toThrow(); + }); + + it('should do nothing with called with undefined', () => { + expect(() => { + // @ts-ignore + new DataEntity(); + }).not.toThrow(); + }); + + it('should throw an error when called with an Array', () => { + const arr = [{ hello: true }]; + expect(() => { + // @ts-ignore + new DataEntity(arr); + }).toThrowError('Invalid data source, must be an object, got "array"'); + }); + + it('should throw an error when called with a Buffer', () => { + const buf = Buffer.from(JSON.stringify({ hello:true })); + expect(() => { + // @ts-ignore + new DataEntity(buf); + }).toThrowError('Invalid data source, must be an object, got "buffer"'); + }); + }); + describe('->toBuffer', () => { it('should be convertable to a buffer', () => { const dataEntity = new DataEntity({ foo: 'bar' }, { hello: 'there' }); @@ -195,6 +227,9 @@ describe('DataEntity', () => { }, setMetadata() { + }, + toBuffer() { + } }; expect(DataEntity.isDataEntity(fakeDataEntity)).toBeTrue(); @@ -237,6 +272,9 @@ describe('DataEntity', () => { }, setMetadata() { + }, + toBuffer() { + } }]; expect(DataEntity.isDataEntityArray(fakeDataEntities)).toBeTrue(); diff --git a/packages/job-components/test/operations/shims/shim-utils-spec.ts b/packages/job-components/test/operations/shims/shim-utils-spec.ts index 8d01f1f5939..354b8ac926d 100644 --- a/packages/job-components/test/operations/shims/shim-utils-spec.ts +++ b/packages/job-components/test/operations/shims/shim-utils-spec.ts @@ -47,10 +47,13 @@ describe('Shim Utils', () => { it('should handle an array of Objects', () => { const data = { hello: true }; - const result = convertResult([data]); + // @ts-ignore + const result = convertResult([data, data, null]); - expect(result).toBeArrayOfSize(1); + expect(result).toBeArrayOfSize(3); expect(result[0]).toEqual(data); + expect(result[1]).toEqual(data); + expect(result[2]).toEqual({}); }); it('should handle null', () => { diff --git a/packages/job-components/test/utils-spec.ts b/packages/job-components/test/utils-spec.ts index 95c2cc6775c..772f5127720 100644 --- a/packages/job-components/test/utils-spec.ts +++ b/packages/job-components/test/utils-spec.ts @@ -1,4 +1,5 @@ import 'jest-extended'; +import { DataEntity } from '../src'; import { waterfall, isPlainObject, parseJSON } from '../src/utils'; describe('Utils', () => { @@ -45,13 +46,21 @@ describe('Utils', () => { } it('should correctly detect the an object type', () => { + // @ts-ignore + expect(isPlainObject()).toBeFalse(); expect(isPlainObject(null)).toBeFalse(); expect(isPlainObject(true)).toBeFalse(); expect(isPlainObject([])).toBeFalse(); + expect(isPlainObject([{ hello: true }])).toBeFalse(); expect(isPlainObject('some-string')).toBeFalse(); expect(isPlainObject(Buffer.from('some-string'))).toBeFalse(); - expect(isPlainObject(new TestObj())).toBeTrue(); + expect(isPlainObject(new TestObj())).toBeFalse(); + expect(isPlainObject(new DataEntity({}))).toBeFalse(); + expect(isPlainObject(Promise.resolve())).toBeFalse(); + expect(isPlainObject(Object.create({}))).toBeTrue(); + expect(isPlainObject(Object.create({ hello: true }))).toBeTrue(); expect(isPlainObject({})).toBeTrue(); + expect(isPlainObject({ hello: true })).toBeTrue(); }); }); diff --git a/packages/teraslice-cli/package.json b/packages/teraslice-cli/package.json index 78694c32b6f..fd5f9c2e2f0 100755 --- a/packages/teraslice-cli/package.json +++ b/packages/teraslice-cli/package.json @@ -28,7 +28,7 @@ }, "dependencies": { "archiver": "^2.1.1", - "bluebird": "^3.5.1", + "bluebird": "^3.5.3", "chalk": "^2.3.2", "easy-table": "^1.1.1", "eslint": "^5.5.0", diff --git a/packages/teraslice-client-js/package.json b/packages/teraslice-client-js/package.json index f575365781f..4c86f324e97 100644 --- a/packages/teraslice-client-js/package.json +++ b/packages/teraslice-client-js/package.json @@ -28,7 +28,7 @@ ], "dependencies": { "auto-bind": "^1.2.1", - "bluebird": "^3.5.2", + "bluebird": "^3.5.3", "lodash": "^4.17.11", "request": "^2.88.0", "request-promise": "^4.2.2" diff --git a/packages/teraslice-messaging/package.json b/packages/teraslice-messaging/package.json index b320a072c69..08610a16667 100644 --- a/packages/teraslice-messaging/package.json +++ b/packages/teraslice-messaging/package.json @@ -37,7 +37,7 @@ }, "dependencies": { "@terascope/queue": "^1.1.4", - "bluebird": "^3.5.2", + "bluebird": "^3.5.3", "debug": "^4.1.0", "nanoid": "^2.0.0", "p-event": "^2.1.0", diff --git a/packages/teraslice-op-test-harness/package.json b/packages/teraslice-op-test-harness/package.json index 573149bd917..ef0b9d41b33 100644 --- a/packages/teraslice-op-test-harness/package.json +++ b/packages/teraslice-op-test-harness/package.json @@ -22,8 +22,8 @@ "url": "https://github.com/terascope/teraslice/issues" }, "dependencies": { - "@terascope/job-components": "^0.9.0", - "bluebird": "^3.5.2", + "@terascope/job-components": "^0.9.1", + "bluebird": "^3.5.3", "lodash": "^4.17.11" }, "devDependencies": { diff --git a/packages/teraslice/lib/cluster/storage/assets.js b/packages/teraslice/lib/cluster/storage/assets.js index f978c52a1ae..60810c7c8e5 100644 --- a/packages/teraslice/lib/cluster/storage/assets.js +++ b/packages/teraslice/lib/cluster/storage/assets.js @@ -212,7 +212,7 @@ module.exports = function module(context) { const assets = await findAssetsToAutoload(autoloadDir); - for (const asset of assets) { + const promises = assets.map(async (asset) => { logger.info(`autoloading asset ${asset}...`); const assetPath = path.join(autoloadDir, asset); try { @@ -224,7 +224,9 @@ module.exports = function module(context) { throw err; } } - } + }); + + await Promise.all(promises); } diff --git a/packages/teraslice/lib/processors/stdout/processor.js b/packages/teraslice/lib/processors/stdout/processor.js index 21e020ad4f4..6e8da39b40b 100644 --- a/packages/teraslice/lib/processors/stdout/processor.js +++ b/packages/teraslice/lib/processors/stdout/processor.js @@ -1,13 +1,16 @@ 'use strict'; +/* eslint-disable no-console */ + +const _ = require('lodash'); const { BatchProcessor } = require('@terascope/job-components'); class Stdout extends BatchProcessor { async onBatch(data) { if (this.opConfig.limit === 0) { - console.log(data); // eslint-disable-line + console.log(data); } else { - console.log(_.take(data, opConfig.limit)); // eslint-disable-line + console.log(_.take(data, this.opConfig.limit)); } return data; } diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 02360090e07..89baa5f12f7 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -1,6 +1,6 @@ { "name": "teraslice", - "version": "0.43.1", + "version": "0.43.2", "description": "Slice and dice your Elasticsearch data", "bin": "service.js", "main": "index.js", @@ -36,12 +36,12 @@ "dependencies": { "@terascope/elasticsearch-api": "^1.1.2", "@terascope/error-parser": "^1.0.1", - "@terascope/job-components": "^0.9.0", + "@terascope/job-components": "^0.9.1", "@terascope/queue": "^1.1.4", "@terascope/teraslice-messaging": "^0.2.4", "async-mutex": "^0.1.3", "barbe": "^3.0.14", - "bluebird": "^3.5.2", + "bluebird": "^3.5.3", "bluebird-retry": "^0.11.0", "body-parser": "^1.18.2", "convict": "^4.4.0", diff --git a/types/kind-of/index.d.ts b/types/kind-of/index.d.ts new file mode 100644 index 00000000000..f1b925ca141 --- /dev/null +++ b/types/kind-of/index.d.ts @@ -0,0 +1,3 @@ +/** Declaration file generated by dts-gen */ + +export default function kindOf(input: any): string; diff --git a/yarn.lock b/yarn.lock index b84fe6bff51..106cc6fedb3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1417,6 +1417,11 @@ bluebird@^3.5.0, bluebird@^3.5.1, bluebird@^3.5.2, bluebird@~3.5.0: resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.5.2.tgz#1be0908e054a751754549c270489c1505d4ab15a" integrity sha512-dhHTWMI7kMx5whMQntl7Vr9C6BvV10lFXDAasnqnrMYhXVCzzk6IO9Fo2L75jXHT07WrOngL1WDXOp+yYS91Yg== +bluebird@^3.5.3: + version "3.5.3" + resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.5.3.tgz#7d01c6f9616c9a51ab0f8c549a79dfe6ec33efa7" + integrity sha512-/qKPUQlaW1OyR51WeCPBvRnAlnZFUJkCSG5HzGnuIqhgyJtF+T94lFnn33eiazjRm2LAHVy2guNnaq48X9SJuw== + body-parser@1.18.3, body-parser@^1.18.2: version "1.18.3" resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.18.3.tgz#5b292198ffdd553b3a0f20ded0592b956955c8b4"