Skip to content

Commit

Permalink
Add optional force argument to refresh_continuous_aggregate
Browse files Browse the repository at this point in the history
Continuous aggregates that are built on top of tiered hypertables cannot
access and materialize tiered data when `timescaledb.enable_tiered_reads`
GUC is disabled at the server level. And there is no way for a user to
manually force the refresh with tiered reads enabled. Here we add an
optional `force` parameter to the `refresh_continuous_aggregate`
procedure that would allow user to partially re-materialize cagg within
a time window that has been previously materialized.
  • Loading branch information
zilder committed Jan 9, 2025
1 parent a03b43a commit e977ffe
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 76 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7521
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7521 Add optional `force` argument to `refresh_continuous_aggregate`
3 changes: 2 additions & 1 deletion sql/ddl_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ AS '@MODULE_PATHNAME@', 'ts_tablespace_show' LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

11 changes: 11 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ CREATE FUNCTION @[email protected]_columnstore_stats (hypertable REGCLASS)
STABLE STRICT
AS 'SELECT * FROM @[email protected]_compression_stats($1)'
SET search_path TO pg_catalog, pg_temp;

-- Recreate `refresh_continuous_aggregate` procedure to add `force` argument
DROP PROCEDURE IF EXISTS @[email protected]_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any");

CREATE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

9 changes: 9 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,12 @@ DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_update_watermark(INTEGER);

-- Recreate `refresh_continuous_aggregate` procedure to remove the `force` argument
DROP PROCEDURE IF EXISTS @[email protected]_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any", force BOOLEAN);

CREATE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null);
policy_data.end_is_null,
false);

return true;
}
Expand Down
7 changes: 6 additions & 1 deletion tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,12 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
refresh_window.start = cagg_get_time_min(cagg);
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true);
continuous_agg_refresh_internal(cagg,
&refresh_window,
CAGG_REFRESH_CREATION,
true,
true,
false);
}

return DDL_DONE;
Expand Down
23 changes: 19 additions & 4 deletions tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ static Invalidation cut_cagg_invalidation_and_compute_remainder(
const CaggInvalidationState *state, const InternalTimeRange *refresh_window,
const Invalidation *mergedentry, const Invalidation *current_remainder);
static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window);
const InternalTimeRange *refresh_window,
const bool force);
static void invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg,
Oid dimtype, const CaggsInfo *all_caggs);
static void invalidation_state_cleanup(const CaggInvalidationState *state);
Expand Down Expand Up @@ -878,7 +879,7 @@ cut_cagg_invalidation_and_compute_remainder(const CaggInvalidationState *state,
*/
static void
clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window)
const InternalTimeRange *refresh_window, const bool force)
{
ScanIterator iterator;
int32 cagg_hyper_id = state->mat_hypertable_id;
Expand All @@ -892,6 +893,20 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,

MemoryContextReset(state->per_tuple_mctx);

/* Force refresh within the entire window */
if (force)
{
Invalidation logentry;

logentry.hyper_id = cagg_hyper_id;
logentry.lowest_modified_value = refresh_window->start;
logentry.greatest_modified_value = refresh_window->end;
logentry.is_modified = false;
ItemPointerSet(&logentry.tid, InvalidBlockNumber, 0);

save_invalidation_for_refresh(state, &logentry);
}

/* Process all invalidations for the continuous aggregate */
ts_scanner_foreach(&iterator)
{
Expand Down Expand Up @@ -981,7 +996,7 @@ InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx)
const CaggRefreshCallContext callctx, const bool force)
{
CaggInvalidationState state;
InvalidationStore *store = NULL;
Expand All @@ -991,7 +1006,7 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange

invalidation_state_init(&state, cagg, refresh_window->type, all_caggs_info);
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
clear_cagg_invalidations_for_refresh(&state, refresh_window);
clear_cagg_invalidations_for_refresh(&state, refresh_window, force);
count = tuplestore_tuple_count(state.invalidations);

if (count == 0)
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ extern InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx);
const CaggRefreshCallContext callctx, const bool force);

extern void invalidation_store_free(InvalidationStore *store);
20 changes: 14 additions & 6 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshC
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
int32 chunk_id);
int32 chunk_id, const bool force);
static void fill_bucket_offset_origin(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
NullableDatum *offset, NullableDatum *origin);
Expand Down Expand Up @@ -628,6 +628,7 @@ Datum
continuous_agg_refresh(PG_FUNCTION_ARGS)
{
Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool force = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
ContinuousAgg *cagg;
InternalTimeRange refresh_window = {
.type = InvalidOid,
Expand Down Expand Up @@ -659,7 +660,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
&refresh_window,
CAGG_REFRESH_WINDOW,
PG_ARGISNULL(1),
PG_ARGISNULL(2));
PG_ARGISNULL(2),
force);

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -703,7 +705,8 @@ continuous_agg_calculate_merged_refresh_window(const ContinuousAgg *cagg,
static bool
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx, int32 chunk_id)
const CaggRefreshCallContext callctx, int32 chunk_id,
const bool force)
{
InvalidationStore *invalidations;
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
Expand All @@ -727,7 +730,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
ts_guc_cagg_max_individual_materializations,
&do_merged_refresh,
&merged_refresh_window,
callctx);
callctx,
force);

if (invalidations != NULL || do_merged_refresh)
{
Expand Down Expand Up @@ -759,7 +763,7 @@ void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
const bool end_isnull, const bool force)
{
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window = *refresh_window_arg;
Expand Down Expand Up @@ -881,7 +885,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
if (!process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
callctx,
INVALID_CHUNK_ID,
force))
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/refresh.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ extern void continuous_agg_calculate_merged_refresh_window(
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
const bool start_isnull, const bool end_isnull);
const bool start_isnull, const bool end_isnull,
const bool force);
51 changes: 46 additions & 5 deletions tsl/test/expected/cagg_refresh.out
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,47 @@ psql:include/cagg_refresh_common.sql:105: ERROR: invalid time argument type "te
CALL refresh_continuous_aggregate('daily_temp', 0, '2020-05-01');
psql:include/cagg_refresh_common.sql:106: ERROR: invalid time argument type "integer"
\set ON_ERROR_STOP 1
-- Test forceful refreshment. Here we simulate the situation that we've seen
-- with tiered data when `timescaledb.enable_tiered_reads` were disabled on the
-- server level. In that case we would not see materialized tiered data and
-- we wouldn't be able to re-materialize the data using a normal refresh call
-- because it would skip previously materialized ranges, but it should be
-- possible with `force=>true` parameter. To simulate this use-case we clear
-- the materialization hypertable and forefully re-materialize it.
SELECT ht.schema_name || '.' || ht.table_name AS mat_ht, mat_hypertable_id FROM _timescaledb_catalog.continuous_agg cagg
JOIN _timescaledb_catalog.hypertable ht ON cagg.mat_hypertable_id = ht.id
WHERE user_view_name = 'daily_temp' \gset
-- Delete the data from the materialization hypertable
DELETE FROM :mat_ht;
-- Run regular refresh, it should not touch previously materialized range
CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00');
psql:include/cagg_refresh_common.sql:124: NOTICE: continuous aggregate "daily_temp" is already up-to-date
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
-----+--------+----------
(0 rows)

-- Run it again with force=>true, the data should be rematerialized
CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00', force=>true);
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
------------------------------+--------+------------------
Mon May 04 17:00:00 2020 PDT | 0 | 19.3846153846154
Mon May 04 17:00:00 2020 PDT | 1 | 16.5555555555556
Mon May 04 17:00:00 2020 PDT | 2 | 18.5714285714286
Mon May 04 17:00:00 2020 PDT | 3 | 23.5714285714286
Sun May 03 17:00:00 2020 PDT | 0 | 15.7647058823529
Sun May 03 17:00:00 2020 PDT | 1 | 24.3142857142857
Sun May 03 17:00:00 2020 PDT | 2 | 14.8205128205128
Sun May 03 17:00:00 2020 PDT | 3 | 18.1111111111111
Sat May 02 17:00:00 2020 PDT | 0 | 17
Sat May 02 17:00:00 2020 PDT | 1 | 18.75
Sat May 02 17:00:00 2020 PDT | 2 | 20
Sat May 02 17:00:00 2020 PDT | 3 | 21.5217391304348
(12 rows)

-- Test different time types
CREATE TABLE conditions_date (time date NOT NULL, device int, temp float);
SELECT create_hypertable('conditions_date', 'time');
Expand Down Expand Up @@ -268,7 +309,7 @@ AS
SELECT time_bucket(SMALLINT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_smallint c
GROUP BY 1,2 WITH NO DATA;
psql:include/cagg_refresh_common.sql:150: ERROR: custom time function required on hypertable "conditions_smallint"
psql:include/cagg_refresh_common.sql:173: ERROR: custom time function required on hypertable "conditions_smallint"
\set ON_ERROR_STOP 1
SELECT set_integer_now_func('conditions_smallint', 'smallint_now');
set_integer_now_func
Expand Down Expand Up @@ -423,7 +464,7 @@ AS
SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH DATA;
psql:include/cagg_refresh_common.sql:255: NOTICE: refreshing continuous aggregate "weekly_temp_with_data"
psql:include/cagg_refresh_common.sql:278: NOTICE: refreshing continuous aggregate "weekly_temp_with_data"
SELECT * FROM weekly_temp_without_data;
day | device | avg_temp
-----+--------+----------
Expand All @@ -445,7 +486,7 @@ SELECT * FROM weekly_temp_with_data ORDER BY 1,2;
\set ON_ERROR_STOP 0
-- REFRESH MATERIALIZED VIEW is blocked on continuous aggregates
REFRESH MATERIALIZED VIEW weekly_temp_without_data;
psql:include/cagg_refresh_common.sql:262: ERROR: operation not supported on continuous aggregate
psql:include/cagg_refresh_common.sql:285: ERROR: operation not supported on continuous aggregate
-- These should fail since we do not allow refreshing inside a
-- transaction, not even as part of CREATE MATERIALIZED VIEW.
DO LANGUAGE PLPGSQL $$ BEGIN
Expand All @@ -457,7 +498,7 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH DATA;
END $$;
psql:include/cagg_refresh_common.sql:274: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot be executed from a function
psql:include/cagg_refresh_common.sql:297: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot be executed from a function
BEGIN;
CREATE MATERIALIZED VIEW weekly_conditions
WITH (timescaledb.continuous,
Expand All @@ -466,7 +507,7 @@ AS
SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH DATA;
psql:include/cagg_refresh_common.sql:283: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot run inside a transaction block
psql:include/cagg_refresh_common.sql:306: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot run inside a transaction block
COMMIT;
\set ON_ERROR_STOP 1
-- This should not fail since we do not refresh the continuous
Expand Down
51 changes: 46 additions & 5 deletions tsl/test/expected/cagg_refresh_using_merge.out
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,47 @@ psql:include/cagg_refresh_common.sql:105: ERROR: invalid time argument type "te
CALL refresh_continuous_aggregate('daily_temp', 0, '2020-05-01');
psql:include/cagg_refresh_common.sql:106: ERROR: invalid time argument type "integer"
\set ON_ERROR_STOP 1
-- Test forceful refreshment. Here we simulate the situation that we've seen
-- with tiered data when `timescaledb.enable_tiered_reads` were disabled on the
-- server level. In that case we would not see materialized tiered data and
-- we wouldn't be able to re-materialize the data using a normal refresh call
-- because it would skip previously materialized ranges, but it should be
-- possible with `force=>true` parameter. To simulate this use-case we clear
-- the materialization hypertable and forefully re-materialize it.
SELECT ht.schema_name || '.' || ht.table_name AS mat_ht, mat_hypertable_id FROM _timescaledb_catalog.continuous_agg cagg
JOIN _timescaledb_catalog.hypertable ht ON cagg.mat_hypertable_id = ht.id
WHERE user_view_name = 'daily_temp' \gset
-- Delete the data from the materialization hypertable
DELETE FROM :mat_ht;
-- Run regular refresh, it should not touch previously materialized range
CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00');
psql:include/cagg_refresh_common.sql:124: NOTICE: continuous aggregate "daily_temp" is already up-to-date
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
-----+--------+----------
(0 rows)

-- Run it again with force=>true, the data should be rematerialized
CALL refresh_continuous_aggregate('daily_temp', '2020-05-02', '2020-05-05 17:00', force=>true);
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
------------------------------+--------+------------------
Mon May 04 17:00:00 2020 PDT | 0 | 19.3846153846154
Mon May 04 17:00:00 2020 PDT | 1 | 16.5555555555556
Mon May 04 17:00:00 2020 PDT | 2 | 18.5714285714286
Mon May 04 17:00:00 2020 PDT | 3 | 23.5714285714286
Sun May 03 17:00:00 2020 PDT | 0 | 15.7647058823529
Sun May 03 17:00:00 2020 PDT | 1 | 24.3142857142857
Sun May 03 17:00:00 2020 PDT | 2 | 14.8205128205128
Sun May 03 17:00:00 2020 PDT | 3 | 18.1111111111111
Sat May 02 17:00:00 2020 PDT | 0 | 17
Sat May 02 17:00:00 2020 PDT | 1 | 18.75
Sat May 02 17:00:00 2020 PDT | 2 | 20
Sat May 02 17:00:00 2020 PDT | 3 | 21.5217391304348
(12 rows)

-- Test different time types
CREATE TABLE conditions_date (time date NOT NULL, device int, temp float);
SELECT create_hypertable('conditions_date', 'time');
Expand Down Expand Up @@ -269,7 +310,7 @@ AS
SELECT time_bucket(SMALLINT '20', time) AS bucket, device, avg(temp) AS avg_temp
FROM conditions_smallint c
GROUP BY 1,2 WITH NO DATA;
psql:include/cagg_refresh_common.sql:150: ERROR: custom time function required on hypertable "conditions_smallint"
psql:include/cagg_refresh_common.sql:173: ERROR: custom time function required on hypertable "conditions_smallint"
\set ON_ERROR_STOP 1
SELECT set_integer_now_func('conditions_smallint', 'smallint_now');
set_integer_now_func
Expand Down Expand Up @@ -424,7 +465,7 @@ AS
SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH DATA;
psql:include/cagg_refresh_common.sql:255: NOTICE: refreshing continuous aggregate "weekly_temp_with_data"
psql:include/cagg_refresh_common.sql:278: NOTICE: refreshing continuous aggregate "weekly_temp_with_data"
SELECT * FROM weekly_temp_without_data;
day | device | avg_temp
-----+--------+----------
Expand All @@ -446,7 +487,7 @@ SELECT * FROM weekly_temp_with_data ORDER BY 1,2;
\set ON_ERROR_STOP 0
-- REFRESH MATERIALIZED VIEW is blocked on continuous aggregates
REFRESH MATERIALIZED VIEW weekly_temp_without_data;
psql:include/cagg_refresh_common.sql:262: ERROR: operation not supported on continuous aggregate
psql:include/cagg_refresh_common.sql:285: ERROR: operation not supported on continuous aggregate
-- These should fail since we do not allow refreshing inside a
-- transaction, not even as part of CREATE MATERIALIZED VIEW.
DO LANGUAGE PLPGSQL $$ BEGIN
Expand All @@ -458,7 +499,7 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH DATA;
END $$;
psql:include/cagg_refresh_common.sql:274: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot be executed from a function
psql:include/cagg_refresh_common.sql:297: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot be executed from a function
BEGIN;
CREATE MATERIALIZED VIEW weekly_conditions
WITH (timescaledb.continuous,
Expand All @@ -467,7 +508,7 @@ AS
SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2 WITH DATA;
psql:include/cagg_refresh_common.sql:283: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot run inside a transaction block
psql:include/cagg_refresh_common.sql:306: ERROR: CREATE MATERIALIZED VIEW ... WITH DATA cannot run inside a transaction block
COMMIT;
\set ON_ERROR_STOP 1
-- This should not fail since we do not refresh the continuous
Expand Down
Loading

0 comments on commit e977ffe

Please sign in to comment.