Skip to content

Commit

Permalink
Make set_chunk_time_interval CAGGs aware (#5852)
Browse files Browse the repository at this point in the history
This patch adds support to pass continuous aggregate names to the
`set_chunk_time_interval` function to align it with functions, such as
`show_chunks`, `drop_chunks`, and others.

It reuses the previously existing function to find a hypertable or
resolve a continuous aggregate to its underlying hypertable found in
chunk.c. It, however, moves the function to hypertable.c and exports it
from here. There is some discussion if this functionality should stay in
chunk.c, though, it feels wrong in that file now that it is exported.
  • Loading branch information
noctarius authored Jul 12, 2023
1 parent 88aaf23 commit 963d4ee
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 75 deletions.
1 change: 1 addition & 0 deletions .unreleased/PR_5852
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #5852 Make set_chunk_time_interval CAGGs aware
76 changes: 2 additions & 74 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ static int chunk_cmp(const void *ch1, const void *ch2);
static int chunk_point_find_chunk_id(const Hypertable *ht, const Point *p);
static void init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
const char *table_name);
static Hypertable *find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid, bool allow_matht);
static Chunk *get_chunks_in_time_range(Hypertable *ht, int64 older_than, int64 newer_than,
const char *caller_name, MemoryContext mctx,
uint64 *num_chunks_returned, ScanTupLock *tuplock);
Expand Down Expand Up @@ -2187,7 +2186,7 @@ ts_chunk_show_chunks(PG_FUNCTION_ARGS)
Oid time_type;

hcache = ts_hypertable_cache_pin();
ht = find_hypertable_from_table_or_cagg(hcache, relid, true);
ht = ts_resolve_hypertable_from_table_or_cagg(hcache, relid, true);
Assert(ht != NULL);
time_dim = hyperspace_get_open_dimension(ht->space, 0);

Expand Down Expand Up @@ -4083,77 +4082,6 @@ list_return_srf(FunctionCallInfo fcinfo)
SRF_RETURN_DONE(funcctx);
}

/*
* Find either the hypertable or the materialized hypertable, if the relid is
* a continuous aggregate, for the relid.
*
* If allow_matht is false, relid should be a cagg or a hypertable.
* If allow_matht is true, materialized hypertable is also permitted as relid
*/
static Hypertable *
find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid, bool allow_matht)
{
const char *rel_name;
Hypertable *ht;

rel_name = get_rel_name(relid);

if (!rel_name)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("invalid hypertable or continuous aggregate")));

ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);

if (ht)
{
const ContinuousAggHypertableStatus status = ts_continuous_agg_hypertable_status(ht->fd.id);
switch (status)
{
case HypertableIsMaterialization:
case HypertableIsMaterializationAndRaw:
if (!allow_matht)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("operation not supported on materialized hypertable"),
errhint("Try the operation on the continuous aggregate instead."),
errdetail("Hypertable \"%s\" is a materialized hypertable.",
rel_name)));
}
break;

default:
break;
}
}
else
{
ContinuousAgg *const cagg = ts_continuous_agg_find_by_relid(relid);

if (!cagg)
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("\"%s\" is not a hypertable or a continuous aggregate", rel_name),
errhint("The operation is only possible on a hypertable or continuous"
" aggregate.")));

ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);

if (!ht)
ereport(ERROR,
(errcode(ERRCODE_TS_INTERNAL_ERROR),
errmsg("no materialized table for continuous aggregate"),
errdetail("Continuous aggregate \"%s\" had a materialized hypertable"
" with id %d but it was not found in the hypertable "
"catalog.",
rel_name,
cagg->data.mat_hypertable_id)));
}

return ht;
}

Datum
ts_chunk_drop_single_chunk(PG_FUNCTION_ARGS)
{
Expand Down Expand Up @@ -4224,7 +4152,7 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
* that does not refer to a hypertable or a continuous aggregate, or a
* relid that does not refer to anything at all. */
hcache = ts_hypertable_cache_pin();
ht = find_hypertable_from_table_or_cagg(hcache, relid, false);
ht = ts_resolve_hypertable_from_table_or_cagg(hcache, relid, false);
Assert(ht != NULL);
time_dim = hyperspace_get_open_dimension(ht->space, 0);

Expand Down
2 changes: 1 addition & 1 deletion src/dimension.c
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ ts_dimension_set_interval(PG_FUNCTION_ARGS)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("hypertable cannot be NULL")));

ht = ts_hypertable_cache_get_entry(hcache, table_relid, CACHE_FLAG_NONE);
ht = ts_resolve_hypertable_from_table_or_cagg(hcache, table_relid, true);
ts_hypertable_permissions_check(table_relid, GetUserId());

if (PG_ARGISNULL(1))
Expand Down
71 changes: 71 additions & 0 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,77 @@ ts_hypertable_from_tupleinfo(const TupleInfo *ti)
return h;
}

/*
* Find either the hypertable or the materialized hypertable, if the relid is
* a continuous aggregate, for the relid.
*
* If allow_matht is false, relid should be a cagg or a hypertable.
* If allow_matht is true, materialized hypertable is also permitted as relid
*/
Hypertable *
ts_resolve_hypertable_from_table_or_cagg(Cache *hcache, Oid relid, bool allow_matht)
{
const char *rel_name;
Hypertable *ht;

rel_name = get_rel_name(relid);

if (!rel_name)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("invalid hypertable or continuous aggregate")));

ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);

if (ht)
{
const ContinuousAggHypertableStatus status = ts_continuous_agg_hypertable_status(ht->fd.id);
switch (status)
{
case HypertableIsMaterialization:
case HypertableIsMaterializationAndRaw:
if (!allow_matht)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("operation not supported on materialized hypertable"),
errhint("Try the operation on the continuous aggregate instead."),
errdetail("Hypertable \"%s\" is a materialized hypertable.",
rel_name)));
}
break;

default:
break;
}
}
else
{
ContinuousAgg *const cagg = ts_continuous_agg_find_by_relid(relid);

if (!cagg)
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("\"%s\" is not a hypertable or a continuous aggregate", rel_name),
errhint("The operation is only possible on a hypertable or continuous"
" aggregate.")));

ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);

if (!ht)
ereport(ERROR,
(errcode(ERRCODE_TS_INTERNAL_ERROR),
errmsg("no materialized table for continuous aggregate"),
errdetail("Continuous aggregate \"%s\" had a materialized hypertable"
" with id %d but it was not found in the hypertable "
"catalog.",
rel_name,
cagg->data.mat_hypertable_id)));
}

return ht;
}

static ScanTupleResult
hypertable_tuple_get_relid(TupleInfo *ti, void *data)
{
Expand Down
3 changes: 3 additions & 0 deletions src/hypertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "chunk_adaptive.h"
#include "dimension.h"
#include "export.h"
#include "hypertable_cache.h"
#include "scanner.h"
#include "scan_iterator.h"
#include "ts_catalog/tablespace.h"
Expand Down Expand Up @@ -109,6 +110,8 @@ extern TSDLLEXPORT Oid ts_hypertable_permissions_check(Oid hypertable_oid, Oid u

extern TSDLLEXPORT void ts_hypertable_permissions_check_by_id(int32 hypertable_id);
extern Hypertable *ts_hypertable_from_tupleinfo(const TupleInfo *ti);
extern Hypertable *ts_resolve_hypertable_from_table_or_cagg(Cache *hcache, Oid relid,
bool allow_matht);
extern int ts_hypertable_scan_with_memory_context(const char *schema, const char *table,
tuple_found_func tuple_found, void *data,
LOCKMODE lockmode, bool tuplock,
Expand Down
26 changes: 26 additions & 0 deletions tsl/test/expected/cagg_ddl.out
Original file line number Diff line number Diff line change
Expand Up @@ -2013,3 +2013,29 @@ SELECT * FROM transactions_montly ORDER BY bucket;
Wed Oct 31 17:00:00 2018 PDT | 70 | 10 | 10
(2 rows)

-- Check set_chunk_time_interval on continuous aggregate
CREATE MATERIALIZED VIEW cagg_set_chunk_time_interval
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 month', time) AS bucket,
SUM(fiat_value),
MAX(fiat_value),
MIN(fiat_value)
FROM transactions
GROUP BY 1
WITH NO DATA;
SELECT set_chunk_time_interval('cagg_set_chunk_time_interval', chunk_time_interval => interval '1 month');
set_chunk_time_interval
-------------------------

(1 row)

CALL refresh_continuous_aggregate('cagg_set_chunk_time_interval', NULL, NULL);
SELECT _timescaledb_internal.to_interval(d.interval_length) = interval '1 month'
FROM _timescaledb_catalog.dimension d
RIGHT JOIN _timescaledb_catalog.continuous_agg ca ON ca.user_view_name = 'cagg_set_chunk_time_interval'
WHERE d.hypertable_id = ca.mat_hypertable_id;
?column?
----------
t
(1 row)

26 changes: 26 additions & 0 deletions tsl/test/expected/cagg_ddl_dist_ht.out
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,32 @@ SELECT * FROM transactions_montly ORDER BY bucket;
Wed Oct 31 17:00:00 2018 PDT | 70 | 10 | 10
(2 rows)

-- Check set_chunk_time_interval on continuous aggregate
CREATE MATERIALIZED VIEW cagg_set_chunk_time_interval
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 month', time) AS bucket,
SUM(fiat_value),
MAX(fiat_value),
MIN(fiat_value)
FROM transactions
GROUP BY 1
WITH NO DATA;
SELECT set_chunk_time_interval('cagg_set_chunk_time_interval', chunk_time_interval => interval '1 month');
set_chunk_time_interval
-------------------------

(1 row)

CALL refresh_continuous_aggregate('cagg_set_chunk_time_interval', NULL, NULL);
SELECT _timescaledb_internal.to_interval(d.interval_length) = interval '1 month'
FROM _timescaledb_catalog.dimension d
RIGHT JOIN _timescaledb_catalog.continuous_agg ca ON ca.user_view_name = 'cagg_set_chunk_time_interval'
WHERE d.hypertable_id = ca.mat_hypertable_id;
?column?
----------
t
(1 row)

-- cleanup
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
DROP DATABASE :DATA_NODE_1;
Expand Down
19 changes: 19 additions & 0 deletions tsl/test/sql/include/cagg_ddl_common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1341,3 +1341,22 @@ SELECT * FROM transactions_montly ORDER BY bucket;
-- Full refresh the CAGG
CALL refresh_continuous_aggregate('transactions_montly', NULL, NULL);
SELECT * FROM transactions_montly ORDER BY bucket;

-- Check set_chunk_time_interval on continuous aggregate
CREATE MATERIALIZED VIEW cagg_set_chunk_time_interval
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 month', time) AS bucket,
SUM(fiat_value),
MAX(fiat_value),
MIN(fiat_value)
FROM transactions
GROUP BY 1
WITH NO DATA;

SELECT set_chunk_time_interval('cagg_set_chunk_time_interval', chunk_time_interval => interval '1 month');
CALL refresh_continuous_aggregate('cagg_set_chunk_time_interval', NULL, NULL);

SELECT _timescaledb_internal.to_interval(d.interval_length) = interval '1 month'
FROM _timescaledb_catalog.dimension d
RIGHT JOIN _timescaledb_catalog.continuous_agg ca ON ca.user_view_name = 'cagg_set_chunk_time_interval'
WHERE d.hypertable_id = ca.mat_hypertable_id;

0 comments on commit 963d4ee

Please sign in to comment.