Skip to content

Commit

Permalink
Add datasource field in accelerations cache (opensearch-project#1525)
Browse files Browse the repository at this point in the history
* add datasource in accelerations cache

Signed-off-by: Shenoy Pratik <[email protected]>

* fixed nits

Signed-off-by: Shenoy Pratik <[email protected]>

---------

Signed-off-by: Shenoy Pratik <[email protected]>
  • Loading branch information
ps48 authored Mar 13, 2024
1 parent d17cad3 commit b0f0d9b
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 65 deletions.
17 changes: 11 additions & 6 deletions common/types/data_connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,19 @@ export interface CachedColumn {

export interface CachedTable {
name: string;
columns: CachedColumn[];
columns?: CachedColumn[];
}

export interface CachedDatabase {
name: string;
tables: CachedTable[];
lastUpdated: string; // Assuming date string in UTC format
lastUpdated: string; // date string in UTC format
status: CachedDataSourceStatus;
}

export interface CachedDataSource {
name: string;
lastUpdated: string; // Assuming date string in UTC format
lastUpdated: string; // date string in UTC format
status: CachedDataSourceStatus;
databases: CachedDatabase[];
}
Expand All @@ -115,13 +115,18 @@ export interface CachedAccelerations {
status: string;
}

export interface AccelerationsCacheData {
version: string;
export interface CachedAcclerationByDataSource {
name: string;
accelerations: CachedAccelerations[];
lastUpdated: string; // Assuming date string in UTC format
lastUpdated: string; // date string in UTC format
status: CachedDataSourceStatus;
}

export interface AccelerationsCacheData {
version: string;
dataSources: CachedAcclerationByDataSource[];
}

export interface PollingSuccessResult {
schema: Array<{ name: string; type: string }>;
datarows: Array<Array<string | number | boolean>>;
Expand Down
181 changes: 147 additions & 34 deletions public/framework/catalog_cache/cache_loader.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import {
mockShowTablesPollingResult,
} from '../../../test/datasources';
import {
createLoadQuery,
updateAccelerationsToCache,
updateDatabasesToCache,
updateTablesToCache,
updateToCache,
} from './cache_loader';
import { CatalogCacheManager } from './cache_manager';

Expand Down Expand Up @@ -146,10 +148,7 @@ describe('loadCacheTests', () => {
dataSourceName,
expect.objectContaining({
name: databaseName,
tables: [
{ name: 'Table1', columns: [] },
{ name: 'Table2', columns: [] },
],
tables: [{ name: 'Table1' }, { name: 'Table2' }],
lastUpdated: expect.any(String),
status: CachedDataSourceStatus.Updated,
})
Expand All @@ -166,55 +165,169 @@ describe('loadCacheTests', () => {
it('should save empty accelerations cache and status failed when polling result is null', () => {
const pollingResult = null;

updateAccelerationsToCache(pollingResult);
updateAccelerationsToCache('sampleDS', pollingResult);

// Verify that saveAccelerationsCache is called with the correct parameters
expect(CatalogCacheManager.saveAccelerationsCache).toHaveBeenCalledWith({
version: CATALOG_CACHE_VERSION,
accelerations: [],
lastUpdated: expect.any(String),
status: CachedDataSourceStatus.Failed,
dataSources: [
{
name: 'sampleDS',
accelerations: [],
lastUpdated: expect.any(String),
status: CachedDataSourceStatus.Failed,
},
],
});
});

it('should save new accelerations cache when polling result is not null', () => {
updateAccelerationsToCache(mockShowIndexesPollingResult);
updateAccelerationsToCache('sampleDS', mockShowIndexesPollingResult);

// Verify that saveAccelerationsCache is called with the correct parameters
expect(CatalogCacheManager.saveAccelerationsCache).toHaveBeenCalledWith({
version: CATALOG_CACHE_VERSION,
accelerations: [
{
flintIndexName: 'flint_mys3_default_http_logs_skipping_index',
type: 'skipping',
database: 'default',
table: 'http_logs',
indexName: 'skipping_index',
autoRefresh: false,
status: 'Active',
},
dataSources: [
{
flintIndexName: 'flint_mys3_default_http_logs_status_clientip_and_day_index',
type: 'covering',
database: 'default',
table: 'http_logs',
indexName: 'status_clientip_and_day',
autoRefresh: true,
status: 'Active',
name: 'sampleDS',
accelerations: [
{
flintIndexName: 'flint_mys3_default_http_logs_skipping_index',
type: 'skipping',
database: 'default',
table: 'http_logs',
indexName: 'skipping_index',
autoRefresh: false,
status: 'Active',
},
{
flintIndexName: 'flint_mys3_default_http_logs_status_clientip_and_day_index',
type: 'covering',
database: 'default',
table: 'http_logs',
indexName: 'status_clientip_and_day',
autoRefresh: true,
status: 'Active',
},
{
flintIndexName: 'flint_mys3_default_http_count_view',
type: 'materialized',
database: 'default',
table: '',
indexName: 'http_count_view',
autoRefresh: true,
status: 'Active',
},
],
lastUpdated: expect.any(String),
status: CachedDataSourceStatus.Updated,
},
],
});
});
});

describe('updateToCache', () => {
it('should call updateDatabasesToCache when loadCacheType is "databases"', () => {
const loadCacheType = 'databases';
const dataSourceName = 'TestDataSource';

updateToCache(mockShowDatabasesPollingResult, loadCacheType, dataSourceName);

// Verify that addOrUpdateDataSource is called
expect(CatalogCacheManager.addOrUpdateDataSource).toHaveBeenCalled();
expect(CatalogCacheManager.updateDatabase).not.toHaveBeenCalled();
expect(CatalogCacheManager.saveAccelerationsCache).not.toHaveBeenCalled();
});

it('should call updateTablesToCache when loadCacheType is "tables"', () => {
const loadCacheType = 'tables';
const dataSourceName = 'TestDataSource';
const databaseName = 'TestDatabase';

CatalogCacheManager.addOrUpdateDataSource({
databases: [
{
flintIndexName: 'flint_mys3_default_http_count_view',
type: 'materialized',
database: 'default',
table: '',
indexName: 'http_count_view',
autoRefresh: true,
status: 'Active',
name: databaseName,
lastUpdated: '',
status: CachedDataSourceStatus.Empty,
tables: [],
},
],
lastUpdated: expect.any(String),
name: dataSourceName,
lastUpdated: new Date().toUTCString(),
status: CachedDataSourceStatus.Updated,
});

updateToCache(mockShowTablesPollingResult, loadCacheType, dataSourceName, databaseName);

// Verify that updateDatabase is called
expect(CatalogCacheManager.addOrUpdateDataSource).toHaveBeenCalled();
expect(CatalogCacheManager.updateDatabase).toHaveBeenCalled();
expect(CatalogCacheManager.saveAccelerationsCache).not.toHaveBeenCalled();
});

it('should call updateAccelerationsToCache when loadCacheType is "accelerations"', () => {
const loadCacheType = 'accelerations';
const dataSourceName = 'TestDataSource';

updateToCache(mockShowIndexesPollingResult, loadCacheType, dataSourceName);

// Verify that saveAccelerationsCache is called
expect(CatalogCacheManager.addOrUpdateDataSource).not.toHaveBeenCalled();
expect(CatalogCacheManager.updateDatabase).not.toHaveBeenCalled();
expect(CatalogCacheManager.saveAccelerationsCache).toHaveBeenCalled();
});

it('should not call any update function when loadCacheType is not recognized', () => {
const pollResults = {};
const loadCacheType = '';
const dataSourceName = 'TestDataSource';

updateToCache(pollResults, loadCacheType, dataSourceName);

// Verify that no update function is called
expect(CatalogCacheManager.addOrUpdateDataSource).not.toHaveBeenCalled();
expect(CatalogCacheManager.updateDatabase).not.toHaveBeenCalled();
expect(CatalogCacheManager.saveAccelerationsCache).not.toHaveBeenCalled();
});
});

describe('createLoadQuery', () => {
it('should create a query for loading databases', () => {
const loadCacheType = 'databases';
const dataSourceName = 'example';
const expectedQuery = 'SHOW SCHEMAS IN `example`';
expect(createLoadQuery(loadCacheType, dataSourceName)).toEqual(expectedQuery);
});

it('should create a query for loading tables', () => {
const loadCacheType = 'tables';
const dataSourceName = 'example';
const databaseName = 'test';
const expectedQuery = 'SHOW TABLES IN `example`.`test`';
expect(createLoadQuery(loadCacheType, dataSourceName, databaseName)).toEqual(expectedQuery);
});

it('should create a query for loading accelerations', () => {
const loadCacheType = 'accelerations';
const dataSourceName = 'example';
const expectedQuery = 'SHOW FLINT INDEX in `example`';
expect(createLoadQuery(loadCacheType, dataSourceName)).toEqual(expectedQuery);
});

it('should return an empty string for unknown loadCacheType', () => {
const loadCacheType = 'unknownType';
const dataSourceName = 'example';
expect(createLoadQuery(loadCacheType, dataSourceName)).toEqual('');
});

it('should properly handle backticks in database name', () => {
const loadCacheType = 'tables';
const dataSourceName = 'example';
const databaseName = '`sample`';
const expectedQuery = 'SHOW TABLES IN `example`.`sample`';
expect(createLoadQuery(loadCacheType, dataSourceName, databaseName)).toEqual(expectedQuery);
});
});
});
24 changes: 12 additions & 12 deletions public/framework/catalog_cache/cache_loader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
*/

import { useEffect, useState } from 'react';
import {
ASYNC_POLLING_INTERVAL,
CATALOG_CACHE_VERSION,
} from '../../../common/constants/data_sources';
import { ASYNC_POLLING_INTERVAL } from '../../../common/constants/data_sources';
import {
AsyncPollingResult,
CachedDataSourceStatus,
Expand Down Expand Up @@ -80,7 +77,6 @@ export const updateTablesToCache = (
const combinedData = combineSchemaAndDatarows(pollingResult.schema, pollingResult.datarows);
const newTables = combinedData.map((row: any) => ({
name: row.tableName,
columns: [],
}));

CatalogCacheManager.updateDatabase(dataSourceName, {
Expand All @@ -91,12 +87,15 @@ export const updateTablesToCache = (
});
};

export const updateAccelerationsToCache = (pollingResult: AsyncPollingResult) => {
export const updateAccelerationsToCache = (
dataSourceName: string,
pollingResult: AsyncPollingResult
) => {
const currentTime = new Date().toUTCString();

if (!pollingResult) {
CatalogCacheManager.saveAccelerationsCache({
version: CATALOG_CACHE_VERSION,
CatalogCacheManager.addOrUpdateAccelerationsByDataSource({
name: dataSourceName,
accelerations: [],
lastUpdated: currentTime,
status: CachedDataSourceStatus.Failed,
Expand All @@ -116,8 +115,8 @@ export const updateAccelerationsToCache = (pollingResult: AsyncPollingResult) =>
status: row.status,
}));

CatalogCacheManager.saveAccelerationsCache({
version: CATALOG_CACHE_VERSION,
CatalogCacheManager.addOrUpdateAccelerationsByDataSource({
name: dataSourceName,
accelerations: newAccelerations,
lastUpdated: currentTime,
status: CachedDataSourceStatus.Updated,
Expand All @@ -138,7 +137,7 @@ export const updateToCache = (
updateTablesToCache(dataSourceName, databaseName!, pollResults);
break;
case 'accelerations':
updateAccelerationsToCache(pollResults);
updateAccelerationsToCache(dataSourceName, pollResults);
break;
default:
break;
Expand Down Expand Up @@ -189,6 +188,7 @@ export const useLoadToCache = (loadCacheType: LoadCacheType) => {
}, ASYNC_POLLING_INTERVAL);

const startLoading = (dataSourceName: string, databaseName?: string) => {
setLoadStatus(DirectQueryLoadingStatus.SCHEDULED);
setCurrentDataSourceName(dataSourceName);
setCurrentDatabaseName(databaseName);

Expand Down Expand Up @@ -272,7 +272,7 @@ export const useLoadTablesToCache = () => {
return { loadStatus, startLoading, stopLoading };
};

export const useAccelerationsToCache = () => {
export const useLoadAccelerationsToCache = () => {
const { loadStatus, startLoading, stopLoading } = useLoadToCache('accelerations');
return { loadStatus, startLoading, stopLoading };
};
Loading

0 comments on commit b0f0d9b

Please sign in to comment.