diff --git a/packages/teraslice-state-storage/package.json b/packages/teraslice-state-storage/package.json index 67be5cf62b7..52578004740 100644 --- a/packages/teraslice-state-storage/package.json +++ b/packages/teraslice-state-storage/package.json @@ -1,6 +1,6 @@ { "name": "@terascope/teraslice-state-storage", - "version": "0.2.0", + "version": "0.3.0", "description": "State storage operation api for teraslice", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-state-storage#readme", "bugs": { @@ -28,7 +28,8 @@ "@terascope/elasticsearch-api": "^2.0.5", "@terascope/job-components": "^0.20.4", "bluebird": "^3.5.5", - "lru-cache": "^5.1.1" + "lru-cache": "^5.1.1", + "mnemonist": "^0.29.0" }, "devDependencies": { "@types/lru-cache": "^5.1.0" diff --git a/packages/teraslice-state-storage/src/cached-state-storage/index.ts b/packages/teraslice-state-storage/src/cached-state-storage/index.ts index c5cdee3955a..8d0a56ec8a2 100644 --- a/packages/teraslice-state-storage/src/cached-state-storage/index.ts +++ b/packages/teraslice-state-storage/src/cached-state-storage/index.ts @@ -1,76 +1,63 @@ -import LRU from 'lru-cache'; -import { DataEntity, TSError } from '@terascope/job-components'; -import { CacheConfig, MGetCacheResponse } from '../interfaces'; +import LRU from 'mnemonist/lru-cache'; +import { promisify } from 'util'; +import { CacheConfig, MGetCacheResponse, SetTuple, ValuesFn } from '../interfaces'; -export default class CachedStateStorage { +const immediate = promisify(setImmediate); + +export default class CachedStateStorage { protected IDField: string; - private cache: LRU; + private cache: LRU; constructor(config: CacheConfig) { this.IDField = '_key'; - this.cache = new LRU({ - max: config.cache_size, - maxAge: config.max_age - }); + this.cache = new LRU(config.cache_size); } - private getIdentifier(doc: DataEntity) { - const id = doc.getMetadata(this.IDField); - if (id === '' || id == null) { - throw new TSError(`There is no field "${this.IDField}" set in the metadata`, { context: { doc } }); - } - return id; + get(key: string): T | undefined { + return this.cache.get(key); } - get(doc: DataEntity): DataEntity|undefined { - const identifier = this.getIdentifier(doc); - return this.cache.get(identifier); - } - - mget(docArray: DataEntity[]): MGetCacheResponse { - return docArray.reduce((cachedState, doc) => { - const identifier = this.getIdentifier(doc); - const state = this.cache.get(identifier); - if (state) cachedState[identifier] = state; + mget(keyArray: string[]): MGetCacheResponse { + return keyArray.reduce((cachedState, key) => { + const state = this.cache.get(key); + if (state) cachedState[key] = state; return cachedState; }, {}); } - set(doc: DataEntity) { - const identifier = this.getIdentifier(doc); - this.cache.set(identifier, doc); + set(key: string, value: T) { + const results = this.cache.setpop(key, value); + if (results && results.evicted) return results; + return undefined; } - mset(docArray: DataEntity[]) { - docArray.forEach(doc => this.set(doc)); - } - - delete(doc: DataEntity) { - const identifier = this.getIdentifier(doc); - this.cache.del(identifier); - } - - mdelete(docArray: DataEntity[]) { - docArray.forEach(doc => this.delete(doc)); + mset(docArray: SetTuple[]) { + return docArray.map(doc => this.set(doc.key, doc.data)).filter(Boolean); } count() { - return this.cache.itemCount; + return this.cache.size; } - values() { - return this.cache.values(); + async values(fn: ValuesFn) { + const iterator = this.cache.values(); + // @ts-ignore + while (!iterator.done) { + const next = iterator.next(); + const { done, value } = next; + if (!done) fn(value); + await immediate(); + } } - has(doc: DataEntity) { - const identifier = this.getIdentifier(doc); - return this.cache.has(identifier); + has(key: string) { + return this.cache.has(key); } initialize() {} - shutdown() { - this.cache.reset(); + clear() { + this.cache.clear(); } } diff --git a/packages/teraslice-state-storage/src/elasticsearch-state-storage/index.ts b/packages/teraslice-state-storage/src/elasticsearch-state-storage/index.ts index 3b321d7112c..2381b399390 100644 --- a/packages/teraslice-state-storage/src/elasticsearch-state-storage/index.ts +++ b/packages/teraslice-state-storage/src/elasticsearch-state-storage/index.ts @@ -15,7 +15,8 @@ export default class ESCachedStateStorage { private persist: boolean; private persistField: string; private es: Client; - public cache: CachedStateStorage; + private logger: Logger; + public cache: CachedStateStorage; constructor(client: Client, logger: Logger, config: ESStateStorageConfig) { this.index = config.index; @@ -27,6 +28,7 @@ export default class ESCachedStateStorage { this.persist = config.persist; this.persistField = config.persist_field || this.IDField; this.cache = new CachedStateStorage(config); + this.logger = logger; this.es = esApi(client, logger); } @@ -103,7 +105,8 @@ export default class ESCachedStateStorage { } async get(doc: DataEntity) { - let cached = this.cache.get(doc); + const indentifier = this.getIdentifier(doc); + let cached = this.cache.get(indentifier); if (!cached) { cached = await this._esGet(doc); } @@ -115,11 +118,11 @@ export default class ESCachedStateStorage { const uniqDocs = this._dedupeDocs(docArray); const savedDocs = {}; const unCachedDocKeys: string[] = []; - + let droppedCachedKeys = 0; // need to add valid docs to return object and find non-cached docs uniqDocs.forEach((doc) => { const key = this.getIdentifier(doc); - const cachedDoc = this.cache.get(doc); + const cachedDoc = this.cache.get(key); if (cachedDoc) { savedDocs[key] = cachedDoc; @@ -144,26 +147,34 @@ export default class ESCachedStateStorage { results.forEach((doc: DataEntity) => { const data = mapperFn(doc); // update cache - this.set(data); + // TODO: need to deal with overflows here + const dropped = this.set(data); + if (dropped && dropped.evicted) droppedCachedKeys++; // updated savedDocs object savedDocs[this.getIdentifier(data)] = data; }); }); + if (droppedCachedKeys > 0) { + this.logger.info(`${droppedCachedKeys} keys have been evicted from elasticsearch-state-storgae cache`); + } + return savedDocs; } - async set(doc: DataEntity) { + set(doc: DataEntity) { // update cache, if persistance is needed use mset - return this.cache.set(doc); + const identifier = this.getIdentifier(doc); + return this.cache.set(identifier, doc); } async mset(docArray: DataEntity[], keyField?: string) { const dedupedDocs = this._dedupeDocs(docArray, keyField); + const formattedDocs = dedupedDocs.map((doc) => ({ data: doc, key: this.getIdentifier(doc) })); if (this.persist) { - return bPromise.all([this.cache.mset(dedupedDocs), this._esBulkUpdate(dedupedDocs)]); + return bPromise.all([this.cache.mset(formattedDocs), this._esBulkUpdate(dedupedDocs)]); } - return this.cache.mset(dedupedDocs); + return this.cache.mset(formattedDocs); } count() { @@ -175,7 +186,7 @@ export default class ESCachedStateStorage { } async shutdown() { - this.cache.shutdown(); + this.cache.clear(); } } diff --git a/packages/teraslice-state-storage/src/interfaces.ts b/packages/teraslice-state-storage/src/interfaces.ts index 8a1d4889ebd..100a5ce336a 100644 --- a/packages/teraslice-state-storage/src/interfaces.ts +++ b/packages/teraslice-state-storage/src/interfaces.ts @@ -33,7 +33,6 @@ export interface ESQUery { export interface CacheConfig { cache_size: number; - max_age: number; } export interface MGetCacheResponse { @@ -52,3 +51,10 @@ export interface MGetDoc { found: boolean; _source?: any; } + +export type ValuesFn = (doc: T) => void; + +export interface SetTuple { + key: string; + data: T; +} diff --git a/packages/teraslice-state-storage/test/cache-state-storage-spec.ts b/packages/teraslice-state-storage/test/cache-state-storage-spec.ts index c16c52fb690..c2f5c1d0f59 100644 --- a/packages/teraslice-state-storage/test/cache-state-storage-spec.ts +++ b/packages/teraslice-state-storage/test/cache-state-storage-spec.ts @@ -1,7 +1,7 @@ import 'jest-extended'; import { DataEntity } from '@terascope/job-components'; -import { CachedStateStorage } from '../src'; +import { CachedStateStorage, SetTuple } from '../src'; describe('Cache Storage State', () => { @@ -21,54 +21,68 @@ describe('Cache Storage State', () => { } ].map((obj, index) => DataEntity.make(obj, { [idField]: index + 1 })); + const formattedMSet: SetTuple[] = docArray.map((obj) => ({ data: obj, key: obj.getMetadata(idField) })); + const formattedMGet = docArray.map(data => data.getMetadata(idField)); + const config = { id_field: idField, cache_size: 100000, - max_age: 24 * 3600 * 1000 }; - let cache: CachedStateStorage; + let cache: CachedStateStorage; beforeEach(() => { cache = new CachedStateStorage(config); }); afterEach(() => { - cache.shutdown(); + cache.clear(); }); it('set should add items to the storage', () => { - cache.set(doc); + const key = doc.getMetadata(idField); + cache.set(key, doc); expect(cache.count()).toBe(1); }); it('get should return data from storage', () => { - cache.set(doc); - const cachedData = cache.get(doc); + const key = doc.getMetadata(idField); + cache.set(key, doc); + const cachedData = cache.get(key); expect(cachedData).toEqual(doc); expect(DataEntity.isDataEntity(cachedData)).toEqual(true); }); it('get should return undefined if not stored', () => { - const cachedData = cache.get(doc); + const key = doc.getMetadata(idField); + const cachedData = cache.get(key); expect(cachedData).toBeUndefined(); }); - it('delete should delete item from storage', () => { - cache.set(doc); - cache.delete(doc); - expect(cache.get(doc)).toBeUndefined(); + it('mset should add many items to storage', () => { + cache.mset(formattedMSet); + expect(cache.count()).toEqual(3); }); + // we are making this async becuase thats how the consumer will be using this + it('values returns an iterator to fetch all values from cache', async() => { + const results: DataEntity[] = []; + cache.mset(formattedMSet); + + function mapper(data: DataEntity) { + results.push(data); + } + + await cache.values(mapper); - it('mset should add many items to storage', () => { - cache.mset(docArray); expect(cache.count()).toEqual(3); + expect(results).toBeArrayOfSize(3); + expect(results.reverse()).toEqual(docArray); }); it('mget should return many items from storage', () => { - cache.mset(docArray); - const data = cache.mget(docArray); + cache.mset(formattedMSet); + const data = cache.mget(formattedMGet); const keys = Object.keys(data); expect(keys.length).toBe(3); @@ -81,16 +95,32 @@ describe('Cache Storage State', () => { }); }); - it('mdelete should delete many records from storage', () => { - cache.mset(docArray); - cache.mdelete(docArray); + it('clear should remove all cached data', () => { + cache.mset(formattedMSet); + expect(cache.count()).toBe(docArray.length); + cache.clear(); expect(cache.count()).toBe(0); }); - it('shutdown should remove all cached data', () => { - cache.mset(docArray); - expect(cache.count()).toBe(docArray.length); - cache.shutdown(); - expect(cache.count()).toBe(0); + it('when cache is to large it returns the oldest value on set', async() => { + const smallSizeConfig = { + id_field: idField, + cache_size: 2, + }; + + const testCache = new CachedStateStorage(smallSizeConfig); + const set1 = testCache.set(formattedMSet[0].key, formattedMSet[0].data); + const set2 = testCache.set(formattedMSet[1].key, formattedMSet[1].data); + + expect(set1).toBeUndefined(); + expect(set2).toBeUndefined(); + expect(testCache.count()).toEqual(2); + + const set3 = testCache.set(formattedMSet[2].key, formattedMSet[2].data); + + expect(testCache.count()).toEqual(2); + expect(set3!.key).toEqual(formattedMSet[0].key); + expect(set3!.value).toEqual(formattedMSet[0].data); + expect(set3!.evicted).toBeTrue(); }); }); diff --git a/packages/teraslice-state-storage/test/elasticsearch-state-storage-spec.ts b/packages/teraslice-state-storage/test/elasticsearch-state-storage-spec.ts index 8e444a4fd38..a7d7f972df9 100644 --- a/packages/teraslice-state-storage/test/elasticsearch-state-storage-spec.ts +++ b/packages/teraslice-state-storage/test/elasticsearch-state-storage-spec.ts @@ -108,7 +108,6 @@ describe('elasticsearch cached state storage', () => { source_fields: [], chunk_size: 10, cache_size: 100000, - max_age: 24 * 3600 * 1000, persist: false, persist_field: idField }; @@ -270,4 +269,29 @@ describe('elasticsearch cached state storage', () => { expect(doubleDocs.length).toBe(6); expect(deduped.length).toBe(3); }); + + it('should log if mget drops keys from cache', async () => { + const testConfig: ESStateStorageConfig = { + index: 'some_index', + type: 'sometype', + concurrency: 10, + source_fields: [], + chunk_size: 10, + cache_size: 2, + persist: false, + persist_field: idField + }; + let msg; + + const testLogger = { + info: (_msg:string) => msg = _msg + }; + // @ts-ignore + const testStateStorage = new ESCachedStateStorage(client, testLogger, testConfig); + // create bulk response + client.setMGetData({ docs: createMgetData(docArray.slice()) }); + // state response + await testStateStorage.mget(docArray); + expect(msg).toEqual('1 keys have been evicted from elasticsearch-state-storgae cache'); + }); }); diff --git a/yarn.lock b/yarn.lock index 87a1dae6c65..4bdbc432d74 100644 --- a/yarn.lock +++ b/yarn.lock @@ -11396,6 +11396,13 @@ mkdirp@0.5.1, mkdirp@0.5.x, mkdirp@0.x, mkdirp@^0.5.0, mkdirp@^0.5.1, mkdirp@~0. dependencies: minimist "0.0.8" +mnemonist@^0.29.0: + version "0.29.0" + resolved "https://registry.yarnpkg.com/mnemonist/-/mnemonist-0.29.0.tgz#46f4be7ca8decde47e1150c3b34a2762292a93fa" + integrity sha512-5dCXynwnyjBy/jEQ35185H+XxbiMJtcapd6lmHbVhjPkRtXP3BjA/GS1EJyoMQZOGQs65sBhX30wNAepl5twsQ== + dependencies: + obliterator "^1.5.0" + modify-values@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/modify-values/-/modify-values-1.0.1.tgz#b3939fa605546474e3e3e3c63d64bd43b4ee6022" @@ -12191,6 +12198,11 @@ object.values@^1.1.0: function-bind "^1.1.1" has "^1.0.3" +obliterator@^1.5.0: + version "1.5.0" + resolved "https://registry.yarnpkg.com/obliterator/-/obliterator-1.5.0.tgz#f3535e5be192473ef59efb2d30396738f7c645c6" + integrity sha512-dENe0UviDf8/auXn0bIBKwCcUr49khvSBWDLlszv/ZB2qz1VxWDmkNKFqO2nfmve7hQb/QIDY7+rc7K3LdJimQ== + obuf@^1.0.0, obuf@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"