Commit
changed cache implementation, cache is more versatile in what can be stored, async iteration
jsnoble committed Jul 17, 2019
1 parent dc7afca commit f079be4
Showing 7 changed files with 156 additions and 85 deletions.
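For orientation, here is a minimal sketch of how the reworked cache is meant to be used after this change. The shapes are taken from the diffs below; importing CachedStateStorage and SetTuple from the package root and the toy Doc type are assumptions (the test file imports them from '../src'):

import { CachedStateStorage, SetTuple } from '@terascope/teraslice-state-storage';

// Toy record type for illustration; the cache is now generic over what it stores.
interface Doc { name: string; }

const cache = new CachedStateStorage<Doc>({ cache_size: 2 });

// set() takes an explicit key and value; it returns the evicted
// { key, value, evicted } entry when the LRU pushes something out, otherwise undefined.
const dropped = cache.set('key-1', { name: 'first' });
if (dropped) console.log('evicted', dropped.key);

// mset() now takes { key, data } tuples instead of DataEntities.
const tuples: SetTuple<Doc>[] = [
    { key: 'key-2', data: { name: 'second' } },
    { key: 'key-3', data: { name: 'third' } },
];
cache.mset(tuples);

// values() is asynchronous: it invokes the callback per cached entry,
// yielding to the event loop between entries.
(async () => {
    await cache.values((doc) => console.log(doc.name));
})();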
5 changes: 3 additions & 2 deletions packages/teraslice-state-storage/package.json
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-state-storage",
"version": "0.2.0",
"version": "0.3.0",
"description": "State storage operation api for teraslice",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-state-storage#readme",
"bugs": {
@@ -28,7 +28,8 @@
"@terascope/elasticsearch-api": "^2.0.5",
"@terascope/job-components": "^0.20.4",
"bluebird": "^3.5.5",
"lru-cache": "^5.1.1"
"lru-cache": "^5.1.1",
"mnemonist": "^0.29.0"
},
"devDependencies": {
"@types/lru-cache": "^5.1.0"
81 changes: 34 additions & 47 deletions packages/teraslice-state-storage/src/cached-state-storage/index.ts
@@ -1,76 +1,63 @@

import LRU from 'lru-cache';
import { DataEntity, TSError } from '@terascope/job-components';
import { CacheConfig, MGetCacheResponse } from '../interfaces';
import LRU from 'mnemonist/lru-cache';
import { promisify } from 'util';
import { CacheConfig, MGetCacheResponse, SetTuple, ValuesFn } from '../interfaces';

export default class CachedStateStorage {
const immediate = promisify(setImmediate);

export default class CachedStateStorage<T> {
protected IDField: string;
private cache: LRU<string, DataEntity>;
private cache: LRU<string, T>;

constructor(config: CacheConfig) {
this.IDField = '_key';
this.cache = new LRU({
max: config.cache_size,
maxAge: config.max_age
});
this.cache = new LRU(config.cache_size);
}

private getIdentifier(doc: DataEntity) {
const id = doc.getMetadata(this.IDField);
if (id === '' || id == null) {
throw new TSError(`There is no field "${this.IDField}" set in the metadata`, { context: { doc } });
}
return id;
get(key: string): T | undefined {
return this.cache.get(key);
}

get(doc: DataEntity): DataEntity<object>|undefined {
const identifier = this.getIdentifier(doc);
return this.cache.get(identifier);
}

mget(docArray: DataEntity[]): MGetCacheResponse {
return docArray.reduce((cachedState, doc) => {
const identifier = this.getIdentifier(doc);
const state = this.cache.get(identifier);
if (state) cachedState[identifier] = state;
mget(keyArray: string[]): MGetCacheResponse {
return keyArray.reduce((cachedState, key) => {
const state = this.cache.get(key);
if (state) cachedState[key] = state;
return cachedState;
}, {});
}

set(doc: DataEntity) {
const identifier = this.getIdentifier(doc);
this.cache.set(identifier, doc);
set(key: string, value: T) {
const results = this.cache.setpop(key, value);
if (results && results.evicted) return results;
return undefined;
}

mset(docArray: DataEntity[]) {
docArray.forEach(doc => this.set(doc));
}

delete(doc: DataEntity) {
const identifier = this.getIdentifier(doc);
this.cache.del(identifier);
}

mdelete(docArray: DataEntity[]) {
docArray.forEach(doc => this.delete(doc));
mset(docArray: SetTuple<T>[]) {
return docArray.map(doc => this.set(doc.key, doc.data)).filter(Boolean);
}

count() {
return this.cache.itemCount;
return this.cache.size;
}

values() {
return this.cache.values();
async values(fn: ValuesFn<T>) {
const iterator = this.cache.values();
// @ts-ignore
while (!iterator.done) {
const next = iterator.next();
const { done, value } = next;
if (!done) fn(value);
await immediate();
}
}

has(doc: DataEntity) {
const identifier = this.getIdentifier(doc);
return this.cache.has(identifier);
has(key: string) {
return this.cache.has(key);
}

initialize() {}

shutdown() {
this.cache.reset();
clear() {
this.cache.clear();
}
}
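The async values() loop above combines mnemonist's LRU iterator with a promisified setImmediate so that walking a large cache does not block the event loop. Below is a standalone sketch of the same idea using the standard iterator protocol; the eachValue helper name is mine, not part of the package:

import LRU from 'mnemonist/lru-cache';
import { promisify } from 'util';

const immediate = promisify(setImmediate);

// Visit every cached value, yielding back to the event loop between entries.
async function eachValue<T>(cache: LRU<string, T>, fn: (value: T) => void): Promise<void> {
    const iterator = cache.values();
    let step = iterator.next();
    while (!step.done) {
        fn(step.value);
        await immediate();
        step = iterator.next();
    }
}

(async () => {
    const cache = new LRU<string, number>(3);
    cache.set('a', 1);
    cache.set('b', 2);
    await eachValue(cache, (v) => console.log(v)); // logs 2 then 1: most recently used first
})();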
@@ -15,7 +15,8 @@ export default class ESCachedStateStorage {
private persist: boolean;
private persistField: string;
private es: Client;
public cache: CachedStateStorage;
private logger: Logger;
public cache: CachedStateStorage<DataEntity>;

constructor(client: Client, logger: Logger, config: ESStateStorageConfig) {
this.index = config.index;
@@ -27,6 +28,7 @@
this.persist = config.persist;
this.persistField = config.persist_field || this.IDField;
this.cache = new CachedStateStorage(config);
this.logger = logger;
this.es = esApi(client, logger);
}

@@ -103,7 +105,8 @@
}

async get(doc: DataEntity) {
let cached = this.cache.get(doc);
const identifier = this.getIdentifier(doc);
let cached = this.cache.get(identifier);
if (!cached) {
cached = await this._esGet(doc);
}
@@ -115,11 +118,11 @@
const uniqDocs = this._dedupeDocs(docArray);
const savedDocs = {};
const unCachedDocKeys: string[] = [];

let droppedCachedKeys = 0;
// need to add valid docs to return object and find non-cached docs
uniqDocs.forEach((doc) => {
const key = this.getIdentifier(doc);
const cachedDoc = this.cache.get(doc);
const cachedDoc = this.cache.get(key);

if (cachedDoc) {
savedDocs[key] = cachedDoc;
@@ -144,26 +147,34 @@
results.forEach((doc: DataEntity) => {
const data = mapperFn(doc);
// update cache
this.set(data);
// TODO: need to deal with overflows here
const dropped = this.set(data);
if (dropped && dropped.evicted) droppedCachedKeys++;
// update the savedDocs object
savedDocs[this.getIdentifier(data)] = data;
});
});

if (droppedCachedKeys > 0) {
this.logger.info(`${droppedCachedKeys} keys have been evicted from the elasticsearch-state-storage cache`);
}

return savedDocs;
}

async set(doc: DataEntity) {
set(doc: DataEntity) {
// update cache; if persistence is needed use mset
return this.cache.set(doc);
const identifier = this.getIdentifier(doc);
return this.cache.set(identifier, doc);
}

async mset(docArray: DataEntity[], keyField?: string) {
const dedupedDocs = this._dedupeDocs(docArray, keyField);
const formattedDocs = dedupedDocs.map((doc) => ({ data: doc, key: this.getIdentifier(doc) }));
if (this.persist) {
return bPromise.all([this.cache.mset(dedupedDocs), this._esBulkUpdate(dedupedDocs)]);
return bPromise.all([this.cache.mset(formattedDocs), this._esBulkUpdate(dedupedDocs)]);
}
return this.cache.mset(dedupedDocs);
return this.cache.mset(formattedDocs);
}

count() {
@@ -175,7 +186,7 @@
}

async shutdown() {
this.cache.shutdown();
this.cache.clear();
}

}
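The eviction bookkeeping above (droppedCachedKeys) leans on mnemonist's setpop(), which CachedStateStorage.set() forwards. A small standalone sketch of that contract follows; the keys and values are toy data, and the return values noted in the comments follow mnemonist ^0.29 as pinned in package.json above:

import LRU from 'mnemonist/lru-cache';

const cache = new LRU<string, string>(2);

// Brand-new keys that fit in the cache: setpop returns null.
console.log(cache.setpop('a', 'first'));  // null
console.log(cache.setpop('b', 'second')); // null

// Capacity reached: the least recently used entry is dropped and reported
// with evicted: true, which is what set() returns and what mget() above
// counts and logs as evicted keys.
console.log(cache.setpop('c', 'third'));  // { evicted: true, key: 'a', value: 'first' }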
8 changes: 7 additions & 1 deletion packages/teraslice-state-storage/src/interfaces.ts
@@ -33,7 +33,6 @@ export interface ESQUery {

export interface CacheConfig {
cache_size: number;
max_age: number;
}

export interface MGetCacheResponse {
@@ -52,3 +51,10 @@ export interface MGetDoc {
found: boolean;
_source?: any;
}

export type ValuesFn<T> = (doc: T) => void;

export interface SetTuple<T> {
key: string;
data: T;
}
78 changes: 54 additions & 24 deletions packages/teraslice-state-storage/test/cache-state-storage-spec.ts
@@ -1,7 +1,7 @@

import 'jest-extended';
import { DataEntity } from '@terascope/job-components';
import { CachedStateStorage } from '../src';
import { CachedStateStorage, SetTuple } from '../src';

describe('Cache Storage State', () => {

@@ -21,54 +21,68 @@ describe('Cache Storage State', () => {
}
].map((obj, index) => DataEntity.make(obj, { [idField]: index + 1 }));

const formattedMSet: SetTuple<DataEntity>[] = docArray.map((obj) => ({ data: obj, key: obj.getMetadata(idField) }));
const formattedMGet = docArray.map(data => data.getMetadata(idField));

const config = {
id_field: idField,
cache_size: 100000,
max_age: 24 * 3600 * 1000
};

let cache: CachedStateStorage;
let cache: CachedStateStorage<DataEntity>;

beforeEach(() => {
cache = new CachedStateStorage(config);
});

afterEach(() => {
cache.shutdown();
cache.clear();
});

it('set should add items to the storage', () => {
cache.set(doc);
const key = doc.getMetadata(idField);
cache.set(key, doc);
expect(cache.count()).toBe(1);
});

it('get should return data from storage', () => {
cache.set(doc);
const cachedData = cache.get(doc);
const key = doc.getMetadata(idField);
cache.set(key, doc);
const cachedData = cache.get(key);

expect(cachedData).toEqual(doc);
expect(DataEntity.isDataEntity(cachedData)).toEqual(true);
});

it('get should return undefined if not stored', () => {
const cachedData = cache.get(doc);
const key = doc.getMetadata(idField);
const cachedData = cache.get(key);
expect(cachedData).toBeUndefined();
});

it('delete should delete item from storage', () => {
cache.set(doc);
cache.delete(doc);
expect(cache.get(doc)).toBeUndefined();
it('mset should add many items to storage', () => {
cache.mset(formattedMSet);
expect(cache.count()).toEqual(3);
});
// we are making this async because that's how the consumer will be using this
it('values asynchronously iterates over all values in the cache', async () => {
const results: DataEntity[] = [];
cache.mset(formattedMSet);

function mapper(data: DataEntity) {
results.push(data);
}

await cache.values(mapper);

it('mset should add many items to storage', () => {
cache.mset(docArray);
expect(cache.count()).toEqual(3);
expect(results).toBeArrayOfSize(3);
expect(results.reverse()).toEqual(docArray);
});

it('mget should return many items from storage', () => {
cache.mset(docArray);
const data = cache.mget(docArray);
cache.mset(formattedMSet);
const data = cache.mget(formattedMGet);
const keys = Object.keys(data);
expect(keys.length).toBe(3);

@@ -81,16 +95,32 @@
});
});

it('mdelete should delete many records from storage', () => {
cache.mset(docArray);
cache.mdelete(docArray);
it('clear should remove all cached data', () => {
cache.mset(formattedMSet);
expect(cache.count()).toBe(docArray.length);
cache.clear();
expect(cache.count()).toBe(0);
});

it('shutdown should remove all cached data', () => {
cache.mset(docArray);
expect(cache.count()).toBe(docArray.length);
cache.shutdown();
expect(cache.count()).toBe(0);
it('when the cache is full it returns the oldest value on set', async () => {
const smallSizeConfig = {
id_field: idField,
cache_size: 2,
};

const testCache = new CachedStateStorage(smallSizeConfig);
const set1 = testCache.set(formattedMSet[0].key, formattedMSet[0].data);
const set2 = testCache.set(formattedMSet[1].key, formattedMSet[1].data);

expect(set1).toBeUndefined();
expect(set2).toBeUndefined();
expect(testCache.count()).toEqual(2);

const set3 = testCache.set(formattedMSet[2].key, formattedMSet[2].data);

expect(testCache.count()).toEqual(2);
expect(set3!.key).toEqual(formattedMSet[0].key);
expect(set3!.value).toEqual(formattedMSet[0].data);
expect(set3!.evicted).toBeTrue();
});
});