Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(schema-compiler): fix FILTER_PARAMS flow in pre aggregations filtering #8761

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -2866,8 +2866,11 @@ export class BaseQuery {

newSubQueryForCube(cube, options) {
if (this.options.queryFactory) {
options.paramAllocator = null;
return this.options.queryFactory.createQuery(cube, this.compilers, this.subQueryOptions(options));
// When dealing with rollup joins, it's crucial to use the correct parameter allocator for the specific cube in use.
// By default, we'll use BaseQuery, but it's important to note that different databases (Oracle, PostgreSQL, MySQL, Druid, etc.)
// have unique parameter allocator symbols. Using the wrong allocator can break the query, especially when rollup joins involve
// different cubes that require different allocators.
return this.options.queryFactory.createQuery(cube, this.compilers, { ...this.subQueryOptions(options), paramAllocator: null });
}

return this.newSubQuery(options);
Expand Down Expand Up @@ -3522,7 +3525,7 @@ export class BaseQuery {
if (preAggregation.refreshKey) {
if (preAggregation.refreshKey.sql) {
return [
this.paramAllocator.buildSqlAndParams(
preAggregationQueryForSql.paramAllocator.buildSqlAndParams(
preAggregationQueryForSql.evaluateSql(cube, preAggregation.refreshKey.sql)
).concat({
external: false,
Expand Down
106 changes: 96 additions & 10 deletions packages/cubejs-schema-compiler/test/unit/pre-aggregations.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { prepareCompiler, prepareYamlCompiler } from './PrepareCompiler';
import { createECommerceSchema, createSchemaYaml } from './utils';
import { PostgresQuery } from '../../src';
import { PostgresQuery, queryClass, QueryFactory } from '../../src';

describe('pre-aggregations', () => {
it('rollupJoin scheduledRefresh', async () => {
Expand All @@ -9,36 +9,36 @@ describe('pre-aggregations', () => {
`
cube(\`Users\`, {
sql: \`SELECT * FROM public.users\`,

preAggregations: {
usersRollup: {
dimensions: [CUBE.id],
},
},

measures: {
count: {
type: \`count\`,
},
},

dimensions: {
id: {
sql: \`id\`,
type: \`string\`,
primaryKey: true,
},

name: {
sql: \`name\`,
type: \`string\`,
},
},
});

cube('Orders', {
sql: \`SELECT * FROM orders\`,

preAggregations: {
ordersRollup: {
measures: [CUBE.count],
Expand All @@ -52,20 +52,20 @@ describe('pre-aggregations', () => {
rollups: [Users.usersRollup, CUBE.ordersRollup],
},
},

joins: {
Users: {
relationship: \`belongsTo\`,
sql: \`\${CUBE.userId} = \${Users.id}\`,
},
},

measures: {
count: {
type: \`count\`,
},
},

dimensions: {
id: {
sql: \`id\`,
Expand Down Expand Up @@ -232,4 +232,90 @@ describe('pre-aggregations', () => {
expect(indexesSql[0].indexName).toEqual('orders_indexes_orders_by_day_with_day_by_status_regular_index');
expect(indexesSql[1].indexName).toEqual('orders_indexes_orders_by_day_with_day_by_status_agg_index');
});

it('pre-aggregation with FILTER_PARAMS', async () => {
const { compiler, cubeEvaluator, joinGraph } = prepareYamlCompiler(
createSchemaYaml({
cubes: [
{
name: 'orders',
sql_table: 'orders',
measures: [{
name: 'count',
type: 'count',
}],
dimensions: [
{
name: 'created_at',
sql: 'created_at',
type: 'time',
},
{
name: 'updated_at',
sql: '{created_at}',
type: 'time',
},
{
name: 'status',
sql: 'status',
type: 'string',
}
],
preAggregations: [
{
name: 'orders_by_day_with_day',
measures: ['count'],
dimensions: ['status'],
timeDimension: 'CUBE.created_at',
granularity: 'day',
partition_granularity: 'month',
build_range_start: {
sql: 'SELECT \'2022-01-01\'::timestamp',
},
build_range_end: {
sql: 'SELECT \'2024-01-01\'::timestamp'
},
refresh_key: {
every: '4 hours',
sql: `
SELECT max(created_at) as max_created_at
FROM orders
WHERE {FILTER_PARAMS.orders.created_at.filter('date(created_at)')}`,
},
},
]
}
]
})
);

await compiler.compile();

// It's important to provide a queryFactory, as it triggers flow
// with paramAllocator reset in BaseQuery->newSubQueryForCube()
const queryFactory = new QueryFactory(
{
orders: PostgresQuery
}
);

const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: [
'orders.count'
],
timeDimensions: [{
dimension: 'orders.created_at',
granularity: 'day',
dateRange: ['2023-01-01', '2023-01-10']
}],
dimensions: ['orders.status'],
queryFactory
});

const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
expect(preAggregationsDescription[0].loadSql[0].includes('WHERE ("orders".created_at >= $1::timestamptz AND "orders".created_at <= $2::timestamptz)')).toBeTruthy();
expect(preAggregationsDescription[0].loadSql[1]).toEqual(['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']);
expect(preAggregationsDescription[0].invalidateKeyQueries[0][0].includes('WHERE ((date(created_at) >= $1::timestamptz AND date(created_at) <= $2::timestamptz))')).toBeTruthy();
expect(preAggregationsDescription[0].invalidateKeyQueries[0][1]).toEqual(['__FROM_PARTITION_RANGE', '__TO_PARTITION_RANGE']);
});
});
Loading