Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional force argument to refresh_continuous_aggregate #7521

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_7521
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7521 Add optional `force` argument to `refresh_continuous_aggregate`
3 changes: 2 additions & 1 deletion sql/ddl_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ AS '@MODULE_PATHNAME@', 'ts_tablespace_show' LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

11 changes: 11 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ CREATE FUNCTION @[email protected]_columnstore_stats (hypertable REGCLASS)
STABLE STRICT
AS 'SELECT * FROM @[email protected]_compression_stats($1)'
SET search_path TO pg_catalog, pg_temp;

-- Recreate `refresh_continuous_aggregate` procedure to add `force` argument
DROP PROCEDURE IF EXISTS @[email protected]_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any");

CREATE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';
mkindahl marked this conversation as resolved.
Show resolved Hide resolved

9 changes: 9 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,12 @@ DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_update_watermark(INTEGER);

-- Recreate `refresh_continuous_aggregate` procedure to remove the `force` argument
DROP PROCEDURE IF EXISTS @[email protected]_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any", force BOOLEAN);

CREATE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null);
policy_data.end_is_null,
false);

return true;
}
Expand Down
7 changes: 6 additions & 1 deletion tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,12 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
refresh_window.start = cagg_get_time_min(cagg);
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true);
continuous_agg_refresh_internal(cagg,
&refresh_window,
CAGG_REFRESH_CREATION,
true,
true,
false);
}

return DDL_DONE;
Expand Down
23 changes: 19 additions & 4 deletions tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ static Invalidation cut_cagg_invalidation_and_compute_remainder(
const CaggInvalidationState *state, const InternalTimeRange *refresh_window,
const Invalidation *mergedentry, const Invalidation *current_remainder);
static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window);
const InternalTimeRange *refresh_window,
bool force);
static void invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg,
Oid dimtype, const CaggsInfo *all_caggs);
static void invalidation_state_cleanup(const CaggInvalidationState *state);
Expand Down Expand Up @@ -878,7 +879,7 @@ cut_cagg_invalidation_and_compute_remainder(const CaggInvalidationState *state,
*/
static void
clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window)
const InternalTimeRange *refresh_window, bool force)
{
ScanIterator iterator;
int32 cagg_hyper_id = state->mat_hypertable_id;
Expand All @@ -892,6 +893,20 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,

MemoryContextReset(state->per_tuple_mctx);

/* Force refresh within the entire window */
if (force)
{
Invalidation logentry;

logentry.hyper_id = cagg_hyper_id;
logentry.lowest_modified_value = refresh_window->start;
logentry.greatest_modified_value = refresh_window->end;
logentry.is_modified = false;
ItemPointerSet(&logentry.tid, InvalidBlockNumber, 0);

save_invalidation_for_refresh(state, &logentry);
}

/* Process all invalidations for the continuous aggregate */
ts_scanner_foreach(&iterator)
{
Expand Down Expand Up @@ -981,7 +996,7 @@ InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx)
const CaggRefreshCallContext callctx, bool force)
{
CaggInvalidationState state;
InvalidationStore *store = NULL;
Expand All @@ -991,7 +1006,7 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange

invalidation_state_init(&state, cagg, refresh_window->type, all_caggs_info);
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
clear_cagg_invalidations_for_refresh(&state, refresh_window);
clear_cagg_invalidations_for_refresh(&state, refresh_window, force);
count = tuplestore_tuple_count(state.invalidations);

if (count == 0)
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ extern InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx);
const CaggRefreshCallContext callctx, bool force);

extern void invalidation_store_free(InvalidationStore *store);
20 changes: 14 additions & 6 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshC
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
int32 chunk_id);
int32 chunk_id, bool force);
static void fill_bucket_offset_origin(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
NullableDatum *offset, NullableDatum *origin);
Expand Down Expand Up @@ -628,6 +628,7 @@ Datum
continuous_agg_refresh(PG_FUNCTION_ARGS)
{
Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool force = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
ContinuousAgg *cagg;
InternalTimeRange refresh_window = {
.type = InvalidOid,
Expand Down Expand Up @@ -659,7 +660,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
&refresh_window,
CAGG_REFRESH_WINDOW,
PG_ARGISNULL(1),
PG_ARGISNULL(2));
PG_ARGISNULL(2),
force);

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -703,7 +705,8 @@ continuous_agg_calculate_merged_refresh_window(const ContinuousAgg *cagg,
static bool
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx, int32 chunk_id)
const CaggRefreshCallContext callctx, int32 chunk_id,
bool force)
{
InvalidationStore *invalidations;
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
Expand All @@ -727,7 +730,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
ts_guc_cagg_max_individual_materializations,
&do_merged_refresh,
&merged_refresh_window,
callctx);
callctx,
force);

if (invalidations != NULL || do_merged_refresh)
{
Expand Down Expand Up @@ -759,7 +763,7 @@ void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
const bool end_isnull, bool force)
{
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window = *refresh_window_arg;
Expand Down Expand Up @@ -881,7 +885,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
if (!process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
callctx,
INVALID_CHUNK_ID,
force))
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/refresh.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ extern void continuous_agg_calculate_merged_refresh_window(
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
const bool start_isnull, const bool end_isnull);
const bool start_isnull, const bool end_isnull,
bool force);
Loading
Loading