Skip to content

Commit

Permalink
Merge pull request #1236 from terascope/storage-cache
Browse files Browse the repository at this point in the history
changed cache implementation, cache is more versatile in what can be s…
  • Loading branch information
jsnoble authored Jul 17, 2019
2 parents dc7afca + f079be4 commit 1c02e2d
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 85 deletions.
5 changes: 3 additions & 2 deletions packages/teraslice-state-storage/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-state-storage",
"version": "0.2.0",
"version": "0.3.0",
"description": "State storage operation api for teraslice",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-state-storage#readme",
"bugs": {
Expand Down Expand Up @@ -28,7 +28,8 @@
"@terascope/elasticsearch-api": "^2.0.5",
"@terascope/job-components": "^0.20.4",
"bluebird": "^3.5.5",
"lru-cache": "^5.1.1"
"lru-cache": "^5.1.1",
"mnemonist": "^0.29.0"
},
"devDependencies": {
"@types/lru-cache": "^5.1.0"
Expand Down
81 changes: 34 additions & 47 deletions packages/teraslice-state-storage/src/cached-state-storage/index.ts
Original file line number Diff line number Diff line change
@@ -1,76 +1,63 @@

import LRU from 'lru-cache';
import { DataEntity, TSError } from '@terascope/job-components';
import { CacheConfig, MGetCacheResponse } from '../interfaces';
import LRU from 'mnemonist/lru-cache';
import { promisify } from 'util';
import { CacheConfig, MGetCacheResponse, SetTuple, ValuesFn } from '../interfaces';

export default class CachedStateStorage {
const immediate = promisify(setImmediate);

export default class CachedStateStorage<T> {
protected IDField: string;
private cache: LRU<string, DataEntity>;
private cache: LRU<string, T>;

constructor(config: CacheConfig) {
this.IDField = '_key';
this.cache = new LRU({
max: config.cache_size,
maxAge: config.max_age
});
this.cache = new LRU(config.cache_size);
}

private getIdentifier(doc: DataEntity) {
const id = doc.getMetadata(this.IDField);
if (id === '' || id == null) {
throw new TSError(`There is no field "${this.IDField}" set in the metadata`, { context: { doc } });
}
return id;
get(key: string): T | undefined {
return this.cache.get(key);
}

get(doc: DataEntity): DataEntity<object>|undefined {
const identifier = this.getIdentifier(doc);
return this.cache.get(identifier);
}

mget(docArray: DataEntity[]): MGetCacheResponse {
return docArray.reduce((cachedState, doc) => {
const identifier = this.getIdentifier(doc);
const state = this.cache.get(identifier);
if (state) cachedState[identifier] = state;
mget(keyArray: string[]): MGetCacheResponse {
return keyArray.reduce((cachedState, key) => {
const state = this.cache.get(key);
if (state) cachedState[key] = state;
return cachedState;
}, {});
}

set(doc: DataEntity) {
const identifier = this.getIdentifier(doc);
this.cache.set(identifier, doc);
set(key: string, value: T) {
const results = this.cache.setpop(key, value);
if (results && results.evicted) return results;
return undefined;
}

mset(docArray: DataEntity[]) {
docArray.forEach(doc => this.set(doc));
}

delete(doc: DataEntity) {
const identifier = this.getIdentifier(doc);
this.cache.del(identifier);
}

mdelete(docArray: DataEntity[]) {
docArray.forEach(doc => this.delete(doc));
mset(docArray: SetTuple<T>[]) {
return docArray.map(doc => this.set(doc.key, doc.data)).filter(Boolean);
}

count() {
return this.cache.itemCount;
return this.cache.size;
}

values() {
return this.cache.values();
async values(fn: ValuesFn<T>) {
const iterator = this.cache.values();
// @ts-ignore
while (!iterator.done) {
const next = iterator.next();
const { done, value } = next;
if (!done) fn(value);
await immediate();
}
}

has(doc: DataEntity) {
const identifier = this.getIdentifier(doc);
return this.cache.has(identifier);
has(key: string) {
return this.cache.has(key);
}

initialize() {}

shutdown() {
this.cache.reset();
clear() {
this.cache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export default class ESCachedStateStorage {
private persist: boolean;
private persistField: string;
private es: Client;
public cache: CachedStateStorage;
private logger: Logger;
public cache: CachedStateStorage<DataEntity>;

constructor(client: Client, logger: Logger, config: ESStateStorageConfig) {
this.index = config.index;
Expand All @@ -27,6 +28,7 @@ export default class ESCachedStateStorage {
this.persist = config.persist;
this.persistField = config.persist_field || this.IDField;
this.cache = new CachedStateStorage(config);
this.logger = logger;
this.es = esApi(client, logger);
}

Expand Down Expand Up @@ -103,7 +105,8 @@ export default class ESCachedStateStorage {
}

async get(doc: DataEntity) {
let cached = this.cache.get(doc);
const identifier = this.getIdentifier(doc);
let cached = this.cache.get(identifier);
if (!cached) {
cached = await this._esGet(doc);
}
Expand All @@ -115,11 +118,11 @@ export default class ESCachedStateStorage {
const uniqDocs = this._dedupeDocs(docArray);
const savedDocs = {};
const unCachedDocKeys: string[] = [];

let droppedCachedKeys = 0;
// need to add valid docs to return object and find non-cached docs
uniqDocs.forEach((doc) => {
const key = this.getIdentifier(doc);
const cachedDoc = this.cache.get(doc);
const cachedDoc = this.cache.get(key);

if (cachedDoc) {
savedDocs[key] = cachedDoc;
Expand All @@ -144,26 +147,34 @@ export default class ESCachedStateStorage {
results.forEach((doc: DataEntity) => {
const data = mapperFn(doc);
// update cache
this.set(data);
// TODO: need to deal with overflows here
const dropped = this.set(data);
if (dropped && dropped.evicted) droppedCachedKeys++;
// updated savedDocs object
savedDocs[this.getIdentifier(data)] = data;
});
});

if (droppedCachedKeys > 0) {
this.logger.info(`${droppedCachedKeys} keys have been evicted from elasticsearch-state-storage cache`);
}

return savedDocs;
}

async set(doc: DataEntity) {
set(doc: DataEntity) {
// update cache, if persistence is needed use mset
return this.cache.set(doc);
const identifier = this.getIdentifier(doc);
return this.cache.set(identifier, doc);
}

async mset(docArray: DataEntity[], keyField?: string) {
const dedupedDocs = this._dedupeDocs(docArray, keyField);
const formattedDocs = dedupedDocs.map((doc) => ({ data: doc, key: this.getIdentifier(doc) }));
if (this.persist) {
return bPromise.all([this.cache.mset(dedupedDocs), this._esBulkUpdate(dedupedDocs)]);
return bPromise.all([this.cache.mset(formattedDocs), this._esBulkUpdate(dedupedDocs)]);
}
return this.cache.mset(dedupedDocs);
return this.cache.mset(formattedDocs);
}

count() {
Expand All @@ -175,7 +186,7 @@ export default class ESCachedStateStorage {
}

async shutdown() {
this.cache.shutdown();
this.cache.clear();
}

}
8 changes: 7 additions & 1 deletion packages/teraslice-state-storage/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export interface ESQUery {

export interface CacheConfig {
cache_size: number;
max_age: number;
}

export interface MGetCacheResponse {
Expand All @@ -52,3 +51,10 @@ export interface MGetDoc {
found: boolean;
_source?: any;
}

export type ValuesFn<T> = (doc: T) => void;

export interface SetTuple<T> {
key: string;
data: T;
}
78 changes: 54 additions & 24 deletions packages/teraslice-state-storage/test/cache-state-storage-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import 'jest-extended';
import { DataEntity } from '@terascope/job-components';
import { CachedStateStorage } from '../src';
import { CachedStateStorage, SetTuple } from '../src';

describe('Cache Storage State', () => {

Expand All @@ -21,54 +21,68 @@ describe('Cache Storage State', () => {
}
].map((obj, index) => DataEntity.make(obj, { [idField]: index + 1 }));

const formattedMSet: SetTuple<DataEntity>[] = docArray.map((obj) => ({ data: obj, key: obj.getMetadata(idField) }));
const formattedMGet = docArray.map(data => data.getMetadata(idField));

const config = {
id_field: idField,
cache_size: 100000,
max_age: 24 * 3600 * 1000
};

let cache: CachedStateStorage;
let cache: CachedStateStorage<DataEntity>;

beforeEach(() => {
cache = new CachedStateStorage(config);
});

afterEach(() => {
cache.shutdown();
cache.clear();
});

it('set should add items to the storage', () => {
cache.set(doc);
const key = doc.getMetadata(idField);
cache.set(key, doc);
expect(cache.count()).toBe(1);
});

it('get should return data from storage', () => {
cache.set(doc);
const cachedData = cache.get(doc);
const key = doc.getMetadata(idField);
cache.set(key, doc);
const cachedData = cache.get(key);

expect(cachedData).toEqual(doc);
expect(DataEntity.isDataEntity(cachedData)).toEqual(true);
});

it('get should return undefined if not stored', () => {
const cachedData = cache.get(doc);
const key = doc.getMetadata(idField);
const cachedData = cache.get(key);
expect(cachedData).toBeUndefined();
});

it('delete should delete item from storage', () => {
cache.set(doc);
cache.delete(doc);
expect(cache.get(doc)).toBeUndefined();
it('mset should add many items to storage', () => {
cache.mset(formattedMSet);
expect(cache.count()).toEqual(3);
});
// we are making this async because that's how the consumer will be using this
it('values returns an iterator to fetch all values from cache', async() => {
const results: DataEntity[] = [];
cache.mset(formattedMSet);

function mapper(data: DataEntity) {
results.push(data);
}

await cache.values(mapper);

it('mset should add many items to storage', () => {
cache.mset(docArray);
expect(cache.count()).toEqual(3);
expect(results).toBeArrayOfSize(3);
expect(results.reverse()).toEqual(docArray);
});

it('mget should return many items from storage', () => {
cache.mset(docArray);
const data = cache.mget(docArray);
cache.mset(formattedMSet);
const data = cache.mget(formattedMGet);
const keys = Object.keys(data);
expect(keys.length).toBe(3);

Expand All @@ -81,16 +95,32 @@ describe('Cache Storage State', () => {
});
});

it('mdelete should delete many records from storage', () => {
cache.mset(docArray);
cache.mdelete(docArray);
it('clear should remove all cached data', () => {
cache.mset(formattedMSet);
expect(cache.count()).toBe(docArray.length);
cache.clear();
expect(cache.count()).toBe(0);
});

it('shutdown should remove all cached data', () => {
cache.mset(docArray);
expect(cache.count()).toBe(docArray.length);
cache.shutdown();
expect(cache.count()).toBe(0);
it('when cache is too large it returns the oldest value on set', async() => {
const smallSizeConfig = {
id_field: idField,
cache_size: 2,
};

const testCache = new CachedStateStorage(smallSizeConfig);
const set1 = testCache.set(formattedMSet[0].key, formattedMSet[0].data);
const set2 = testCache.set(formattedMSet[1].key, formattedMSet[1].data);

expect(set1).toBeUndefined();
expect(set2).toBeUndefined();
expect(testCache.count()).toEqual(2);

const set3 = testCache.set(formattedMSet[2].key, formattedMSet[2].data);

expect(testCache.count()).toEqual(2);
expect(set3!.key).toEqual(formattedMSet[0].key);
expect(set3!.value).toEqual(formattedMSet[0].data);
expect(set3!.evicted).toBeTrue();
});
});
Loading

0 comments on commit 1c02e2d

Please sign in to comment.