Skip to content

Commit

Permalink
DataEntity serialization/deserialization (#883)
Browse files Browse the repository at this point in the history
* update dependencies

* Add fromBuffer support to DataEntity

* Add DataEntity->toBuffer support

* Add benchmark suite and ensure toBuffer is not enumerable

* bump job-components to 0.9.0

* update lodash types
  • Loading branch information
peterdemartini authored and kstaken committed Nov 6, 2018
1 parent 9e7d8ee commit f2990d2
Show file tree
Hide file tree
Showing 18 changed files with 300 additions and 42 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"@types/convict": "^4.2.0",
"@types/debug": "^0.0.31",
"@types/fs-extra": "^5.0.4",
"@types/lodash": "^4.14.117",
"@types/lodash": "^4.14.118",
"@types/lodash.clonedeep": "^4.5.4",
"@types/nanoid": "^1.2.0",
"@types/node": "^10.12.2",
Expand Down
32 changes: 32 additions & 0 deletions packages/job-components/bench/data-encoding-suite.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict';

const { Suite } = require('./helpers');
const { DataEntity } = require('../dist');

const data = JSON.stringify({
id: Math.random(),
hello: 'sir',
hi: 'dude',
howdy: 'there'
});

const dataBuf = Buffer.from(data);

module.exports = () => Suite('DataEncoding')
.add('without DataEntities', {
fn() {
const obj = JSON.parse(dataBuf);
Buffer.from(JSON.stringify(Object.assign({}, obj)));
}
})
.add('with DataEntities', {
fn() {
const dataEntity = DataEntity.fromBuffer(dataBuf);
dataEntity.toBuffer();
}
})
.run({
async: true,
initCount: 2,
maxTime: 5,
});
2 changes: 1 addition & 1 deletion packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/job-components",
"version": "0.8.2",
"version": "0.9.0",
"publishConfig": {
"access": "public"
},
Expand Down
14 changes: 13 additions & 1 deletion packages/job-components/src/interfaces/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@
* OpConfig is the configuration that user specifies
* for a Operation.
* The only required property is `_op` since that is used
* to find the operation
* to find the operation.
* Encoding defaults to "JSON" when DataEntity.fromBuffer() is called
*/
export interface OpConfig {
    // name of the operation; used to locate the operation implementation (required)
    _op: string;
    // encoding used when (de)serializing records; defaults to DataEncoding.JSON
    _encoding?: DataEncoding;
}

/**
 * An enum of available encoding formats
 */
export enum DataEncoding {
    JSON = 'json',
}

/**
 * A list of supported encoding formats.
 * Derived from the enum itself so adding a new DataEncoding member
 * automatically keeps this list (and any convict format checks using it) in sync.
 */
export const dataEncodings: DataEncoding[] = Object.values(DataEncoding);

export enum LifeCycle {
Once = 'once',
Persistent = 'persistent',
Expand Down
7 changes: 6 additions & 1 deletion packages/job-components/src/job-schemas.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

import { Context } from './interfaces';
import { Context, DataEncoding, dataEncodings } from './interfaces';
import convict from 'convict';
import { flatten } from './utils';
import os from 'os';
Expand Down Expand Up @@ -190,4 +190,9 @@ export const opSchema: convict.Schema<any> = {
doc: 'Name of operation, it must reflect the name of the file',
format: 'required_String',
},
_encoding: {
doc: 'Used to specify the encoding type of the data',
default: DataEncoding.JSON,
format: dataEncodings,
}
};
37 changes: 36 additions & 1 deletion packages/job-components/src/operations/data-entity.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { fastAssign, fastMap, isFunction, isPlainObject } from '../utils';
import { fastAssign, fastMap, isFunction, isPlainObject, parseJSON } from '../utils';
import { DataEncoding } from '../interfaces';

// WeakMaps are used as a memory efficient reference to private data
const _metadata = new WeakMap();
Expand All @@ -19,6 +20,21 @@ export default class DataEntity {
return new DataEntity(input, metadata);
}

/**
 * A utility for safely converting a buffer to a DataEntity.
 * @param input A buffer to parse to JSON
 * @param opConfig The operation config used to get the encoding type of the buffer, defaults to "json"
 * @param metadata Optionally add any metadata
 */
static fromBuffer(input: Buffer, opConfig: EncodingConfig = {}, metadata?: object): DataEntity {
    // tolerate an explicit null config; only an undefined _encoding falls back to JSON
    const config = opConfig || {};
    const encoding = config._encoding === undefined ? DataEncoding.JSON : config._encoding;
    if (encoding !== DataEncoding.JSON) {
        throw new Error(`Unsupported encoding type, got "${encoding}"`);
    }
    return new DataEntity(parseJSON(input), metadata);
}

/**
* A utility for safely converting an input of an object,
* or an array of objects, to an array of DataEntities.
Expand Down Expand Up @@ -99,6 +115,25 @@ export default class DataEntity {
metadata[key] = value;
_metadata.set(this, metadata);
}

/**
 * Convert the DataEntity to an encoded buffer
 * @param config The operation config used to get the encoding type of the buffer, defaults to "json"
 */
toBuffer(config: EncodingConfig = {}): Buffer {
    // guard against an explicit null, mirroring fromBuffer(); the default
    // parameter only covers undefined, not null
    const { _encoding = DataEncoding.JSON } = config || {};
    if (_encoding === DataEncoding.JSON) {
        return Buffer.from(JSON.stringify(this));
    }

    throw new Error(`Unsupported encoding type, got "${_encoding}"`);
}
}

/** An encoding-focused interface: the subset of OpConfig needed to choose a serialization format */
export interface EncodingConfig {
    // name of the operation this config belongs to
    _op?: string;
    // encoding format; treated as DataEncoding.JSON when omitted
    _encoding?: DataEncoding;
}

/** Accepted input for DataEntity factory helpers: a plain object or an existing DataEntity */
export type DataInput = object|DataEntity;
Expand Down
25 changes: 25 additions & 0 deletions packages/job-components/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,31 @@ export function isString(val: any): val is string {
return typeof val === 'string' ? true : false;
}

/**
 * Safely convert any input to a string.
 * Uses the value's own toString() when available, otherwise falls back to
 * JSON.stringify. Falls back further to String() when JSON.stringify does
 * not produce a string (e.g. for undefined), so the declared return type
 * of `string` always holds.
 */
export function toString(val: any): string {
    if (val && typeof val.toString === 'function') {
        return val.toString();
    }

    return JSON.stringify(val) || String(val);
}

/**
 * A utility for safely deserializing a JSON buffer (or string) to an object.
 * @throws TypeError when the input is neither a Buffer nor a string
 * @throws Error when the contents are not valid JSON
 */
export function parseJSON<T = object>(buf: Buffer|string): T {
    const acceptable = Buffer.isBuffer(buf) || typeof buf === 'string';
    if (!acceptable) {
        throw new TypeError(`Failure to serialize non-buffer, got "${typeof buf}"`);
    }

    // decode explicitly instead of relying on JSON.parse's implicit coercion
    const json = Buffer.isBuffer(buf) ? buf.toString('utf8') : buf;

    try {
        return JSON.parse(json);
    } catch (err) {
        throw new Error(`Failure to parse buffer, ${String(err)}`);
    }
}

/** A simplified implemation of lodash isInteger */
export function isInteger(val: any): val is number {
if (typeof val !== 'number') return false;
Expand Down
45 changes: 45 additions & 0 deletions packages/job-components/test/config-validators-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,57 @@ describe('When using native clustering', () => {
const config = validateOpConfig(schema, op);
expect(config as object).toEqual({
_op: 'some-op',
_encoding: 'json',
example: 'example',
formatted_value: 'hi',
test: true,
});
});

// validateOpConfig should pass through an explicitly requested encoding unchanged
it('should handle a custom encoding', () => {
    const op = {
        _op: 'some-op',
        _encoding: 'json',
        example: 'example',
        formatted_value: 'hi',
    };

    const config = validateOpConfig(schema, op);
    expect(config as object).toEqual({
        _op: 'some-op',
        _encoding: 'json',
        example: 'example',
        formatted_value: 'hi',
        test: true,
    });
});

// an encoding value outside the supported list must be rejected by validation
it('should handle an invalid encoding', () => {
    const op = {
        _op: 'some-op',
        _encoding: 'uh-oh',
        example: 'example',
        formatted_value: 'hi',
    };

    expect(() => {
        validateOpConfig(schema, op);
    }).toThrow();
});

// _encoding must be a string; a numeric value should fail validation
it('should handle a non-string encoding', () => {
    const op = {
        _op: 'some-op',
        _encoding: 123,
        example: 'example',
        formatted_value: 'hi',
    };

    expect(() => {
        validateOpConfig(schema, op);
    }).toThrow();
});

it('should fail when given invalid input', () => {
const op = {
_op: 'some-op',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ describe('Convict Schema', () => {
example: 'hi'
})).toEqual({
_op: 'hello',
example: 'hi'
_encoding: 'json',
example: 'hi',
});
});

Expand Down
79 changes: 78 additions & 1 deletion packages/job-components/test/operations/data-entity-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'jest-extended'; // require for type definitions
import { DataEntity } from '../../src';
import { DataEntity, DataEncoding } from '../../src';
import { parseJSON } from '../../src/utils';

describe('DataEntity', () => {
describe('when constructed with an object', () => {
Expand Down Expand Up @@ -33,17 +34,20 @@ describe('DataEntity', () => {
const keys = Object.keys(dataEntity);
expect(keys).not.toInclude('getMetadata');
expect(keys).not.toInclude('setMetadata');
expect(keys).not.toInclude('toBuffer');

for (const prop in dataEntity) {
expect(prop).not.toEqual('getMetadata');
expect(prop).not.toEqual('setMetadata');
expect(prop).not.toEqual('toBuffer');
}
});

it('should only convert non-metadata properties with stringified', () => {
const object = JSON.parse(JSON.stringify(dataEntity));
expect(object).not.toHaveProperty('getMetadata');
expect(object).not.toHaveProperty('setMetadata');
expect(object).not.toHaveProperty('toBuffer');

expect(object).toHaveProperty('teal', 'neal');
expect(object).toHaveProperty('blue', 'green');
Expand Down Expand Up @@ -79,6 +83,35 @@ describe('DataEntity', () => {
});
});

describe('->toBuffer', () => {
    it('should be convertable to a buffer', () => {
        const dataEntity = new DataEntity({ foo: 'bar' }, { hello: 'there' });
        const buf = dataEntity.toBuffer({ _encoding: DataEncoding.JSON });
        expect(Buffer.isBuffer(buf)).toBeTrue();
        const obj = parseJSON(buf);

        // only the record's own properties are serialized; metadata is excluded
        expect(obj).toEqual({ foo: 'bar' });
    });

    // omitting the config entirely should default the encoding to JSON
    it('should be able to handle no config', () => {
        const dataEntity = new DataEntity({ foo: 'bar' }, { hello: 'there' });
        const buf = dataEntity.toBuffer();
        expect(Buffer.isBuffer(buf)).toBeTrue();
        const obj = parseJSON(buf);

        expect(obj).toEqual({ foo: 'bar' });
    });

    it('should fail if given an invalid encoding', () => {
        const dataEntity = new DataEntity({ foo: 'bar' }, { hello: 'there' });

        expect(() => {
            // @ts-ignore
            dataEntity.toBuffer({ _encoding: 'baz' });
        }).toThrowError('Unsupported encoding type, got "baz"');
    });
});

describe('#make', () => {
describe('when wrapped', () => {
it('should return a single data entity', () => {
Expand Down Expand Up @@ -228,4 +261,48 @@ describe('DataEntity', () => {
expect(DataEntity.getMetadata(null, 'hi')).toBeNil();
});
});

describe('#fromBuffer', () => {
    it('should be able to create a DataEntity from a buffer', () => {
        const buf = Buffer.from(JSON.stringify({ foo: 'bar' }));
        const entity = DataEntity.fromBuffer(buf, {
            _op: 'baz',
            _encoding: DataEncoding.JSON,
        }, {
            howdy: 'there'
        });

        // parsed properties land on the entity; the third argument becomes metadata
        expect(entity.foo).toEqual('bar');
        expect(entity.getMetadata('howdy')).toEqual('there');
    });

    // omitting the config should default the encoding to JSON
    it('should be able handle no config', () => {
        const buf = Buffer.from(JSON.stringify({ foo: 'bar' }));
        const entity = DataEntity.fromBuffer(buf);

        expect(entity.foo).toEqual('bar');
    });

    // malformed JSON in the buffer should surface as a thrown error
    it('should throw an error if given invalid buffer', () => {
        const buf = Buffer.from('hello:there');
        expect(() => {
            DataEntity.fromBuffer(buf, {
                _op: 'test',
                _encoding: DataEncoding.JSON,
            });
        }).toThrow();
    });

    it('should throw an error if given an unsupported encoding', () => {
        const buf = Buffer.from(JSON.stringify({ hi: 'there' }));
        expect(() => {
            // @ts-ignore
            DataEntity.fromBuffer(buf, {
                _op: 'test',
                _encoding: 'crazy',
            });
        }).toThrowError('Unsupported encoding type, got "crazy"');
    });

});
});
Loading

0 comments on commit f2990d2

Please sign in to comment.