From fb6af6d7a047db00a670d802efba5a1c49edc4ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 17 Oct 2024 14:57:27 +0200 Subject: [PATCH] Fix issues with Hypercore TAM recompression A truncate on a hypercore TAM table is executed across both compressed and non-compressed data. This caused an issue when recompressing because it tries to truncate also the compressed data. Fix this issue by introducing a flag that allows truncating only the non-compressed data. Another issue related to cache invalidation is also fixed. Since a recompression sometimes creates a new compressed relation, and the compressed relid is cached in the Hypercore TAM's relcache entry, the cache needs to be invalidated during recompression. However, this wasn't done previously leading to an error. This is fixed by adding a relcache invalidation during recompression. Finally, compression using an index scan is disabled for Hypercore TAM since the index covers also compressed data (in the recompression case). While the index could be used when compressing the first time (when only non-compressed data is indexed), it is still disabled completely for Hypercore TAM given that index scans are not used by default anyway. Tests are added to cover all of the issues described above. 
--- tsl/src/compression/api.c | 22 +++++ tsl/src/compression/compression.c | 26 +++++- tsl/src/hypercore/hypercore_handler.c | 36 +++++--- tsl/src/hypercore/hypercore_handler.h | 3 + tsl/test/expected/hypercore.out | 119 +++++++++++++++++++++++++ tsl/test/expected/hypercore_create.out | 4 +- tsl/test/sql/hypercore.sql | 61 +++++++++++++ 7 files changed, 254 insertions(+), 17 deletions(-) diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index dcb5d769077..257e71b36c4 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -457,6 +457,20 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid) (errmsg("new compressed chunk \"%s.%s\" created", NameStr(compress_ht_chunk->fd.schema_name), NameStr(compress_ht_chunk->fd.table_name)))); + + /* Since a new compressed relation was created it is necessary to + * invalidate the relcache entry for the chunk because Hypercore TAM + * caches information about the compressed relation in the + * relcache. */ + if (ts_is_hypercore_am(cxt.srcht_chunk->amoid)) + { + /* Tell other backends */ + CacheInvalidateRelcacheByRelid(cxt.srcht_chunk->table_id); + + /* Immediately invalidate our own cache */ + RelationCacheInvalidateEntry(cxt.srcht_chunk->table_id); + } + EventTriggerAlterTableEnd(); } else @@ -844,11 +858,19 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, enum UseAccessMethod use TS_FALLTHROUGH; case USE_AM_NULL: Assert(rel_is_hypercore); + /* Don't forward the truncate to the compressed data during recompression */ + bool truncate_compressed = hypercore_set_truncate_compressed(false); relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress); + hypercore_set_truncate_compressed(truncate_compressed); break; case USE_AM_TRUE: if (rel_is_hypercore) + { + /* Don't forward the truncate to the compressed data during recompression */ + bool truncate_compressed = hypercore_set_truncate_compressed(false); relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, 
recompress); + hypercore_set_truncate_compressed(truncate_compressed); + } else { /* Convert to a compressed hypercore by simply calling ALTER TABLE diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 95b25e47269..fd1bc2f00eb 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -4,6 +4,7 @@ * LICENSE-TIMESCALE for a copy of the license. */ #include +#include #include #include #include @@ -27,6 +28,7 @@ #include "debug_assert.h" #include "debug_point.h" #include "guc.h" +#include "hypercore/hypercore_handler.h" #include "nodes/chunk_dispatch/chunk_insert_state.h" #include "segment_meta.h" #include "ts_catalog/array_utils.h" @@ -180,7 +182,12 @@ static void RelationDeleteAllRows(Relation rel, Snapshot snap) { TupleTableSlot *slot = table_slot_create(rel, NULL); - TableScanDesc scan = table_beginscan(rel, snap, 0, (ScanKey) NULL); + ScanKeyData scankey = { + /* Let compression TAM know it should only return tuples from the + * non-compressed relation. No actual scankey necessary */ + .sk_flags = SK_NO_COMPRESSED, + }; + TableScanDesc scan = table_beginscan(rel, snap, 0, &scankey); while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { @@ -291,8 +298,14 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options) * The following code is trying to find an existing index that * matches the configuration so that we can skip sequential scan and * tuplesort. + * + * Note that Hypercore TAM doesn't support (re-)compression via index at + * this point because the index covers also existing compressed tuples. It + * could be supported for initial compression when there is no compressed + * data, but for now just avoid it altogether since compression indexscan + * isn't enabled by default anyway. 
*/ - if (ts_guc_enable_compression_indexscan) + if (ts_guc_enable_compression_indexscan && !REL_IS_HYPERCORE(in_rel)) { List *in_rel_index_oids = RelationGetIndexList(in_rel); foreach (lc, in_rel_index_oids) @@ -442,6 +455,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options) { int64 nrows_processed = 0; + Assert(!REL_IS_HYPERCORE(in_rel)); elog(ts_guc_debug_compression_path_info ? INFO : DEBUG1, "using index \"%s\" to scan rows for compression", get_rel_name(matched_index_rel->rd_id)); @@ -562,9 +576,13 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel) Tuplesortstate *tuplesortstate; TableScanDesc scan; TupleTableSlot *slot; - + ScanKeyData scankey = { + /* Let compression TAM know it should only return tuples from the + * non-compressed relation. No actual scankey necessary */ + .sk_flags = SK_NO_COMPRESSED, + }; tuplesortstate = compression_create_tuplesort_state(settings, in_rel); - scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); + scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, &scankey); slot = table_slot_create(in_rel, NULL); while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) diff --git a/tsl/src/hypercore/hypercore_handler.c b/tsl/src/hypercore/hypercore_handler.c index 4b6f240a0ed..72b1ac4f67c 100644 --- a/tsl/src/hypercore/hypercore_handler.c +++ b/tsl/src/hypercore/hypercore_handler.c @@ -64,7 +64,6 @@ #include "debug_assert.h" #include "guc.h" #include "hypercore_handler.h" -#include "process_utility.h" #include "relstats.h" #include "trigger.h" #include "ts_catalog/array_utils.h" @@ -82,6 +81,16 @@ static List *partially_compressed_relids = NIL; /* Relids that needs to have * updated status set at end of * transaction */ +static bool hypercore_truncate_compressed = true; + +bool +hypercore_set_truncate_compressed(bool onoff) +{ + bool old_value = hypercore_truncate_compressed; + hypercore_truncate_compressed = onoff; + return old_value; +} + #define 
HYPERCORE_AM_INFO_SIZE(natts) \ (sizeof(HypercoreInfo) + (sizeof(ColumnCompressionSettings) * (natts))) @@ -540,12 +549,13 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key ParallelTableScanDesc cptscan = parallel_scan ? (ParallelTableScanDesc) &cpscan->cpscandesc : NULL; - scan->cscan_desc = scan->compressed_rel->rd_tableam->scan_begin(scan->compressed_rel, - snapshot, - scan->rs_base.rs_nkeys, - scan->rs_base.rs_key, - cptscan, - flags); + if (scan->compressed_rel) + scan->cscan_desc = scan->compressed_rel->rd_tableam->scan_begin(scan->compressed_rel, + snapshot, + scan->rs_base.rs_nkeys, + scan->rs_base.rs_key, + cptscan, + flags); return &scan->rs_base; } @@ -560,7 +570,11 @@ hypercore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_s scan->reset = true; scan->hs_scan_state = HYPERCORE_SCAN_START; - table_rescan(scan->cscan_desc, key); + if (key && key->sk_flags & SK_NO_COMPRESSED) + scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED; + + if (scan->cscan_desc) + table_rescan(scan->cscan_desc, key); Relation relation = scan->uscan_desc->rs_rd; const TableAmRoutine *oldtam = switch_to_heapam(relation); @@ -1643,7 +1657,7 @@ hypercore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapsh { TM_Result result = TM_Ok; - if (is_compressed_tid(tid)) + if (is_compressed_tid(tid) && hypercore_truncate_compressed) { HypercoreInfo *caminfo = RelationGetHypercoreInfo(relation); Relation crel = table_open(caminfo->compressed_relid, RowExclusiveLock); @@ -1827,7 +1841,7 @@ hypercore_relation_set_new_filelocator(Relation rel, const RelFileLocator *newrl * change the rel file number for it as well. This can happen if you, for * example, execute a transactional TRUNCATE. 
*/ Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel)); - if (OidIsValid(compressed_relid)) + if (OidIsValid(compressed_relid) && hypercore_truncate_compressed) { Relation compressed_rel = table_open(compressed_relid, AccessExclusiveLock); #if PG16_GE @@ -1847,7 +1861,7 @@ hypercore_relation_nontransactional_truncate(Relation rel) rel->rd_tableam = oldtam; Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel)); - if (OidIsValid(compressed_relid)) + if (OidIsValid(compressed_relid) && hypercore_truncate_compressed) { Relation crel = table_open(compressed_relid, AccessShareLock); crel->rd_tableam->relation_nontransactional_truncate(crel); diff --git a/tsl/src/hypercore/hypercore_handler.h b/tsl/src/hypercore/hypercore_handler.h index 3cc5d5a68fe..e0b0704e1ec 100644 --- a/tsl/src/hypercore/hypercore_handler.h +++ b/tsl/src/hypercore/hypercore_handler.h @@ -25,6 +25,7 @@ extern void hypercore_alter_access_method_begin(Oid relid, bool to_other_am); extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am); extern Datum hypercore_handler(PG_FUNCTION_ARGS); extern void hypercore_xact_event(XactEvent event, void *arg); +extern bool hypercore_set_truncate_compressed(bool onoff); typedef struct ColumnCompressionSettings { @@ -56,4 +57,6 @@ typedef struct HypercoreInfo ColumnCompressionSettings columns[FLEXIBLE_ARRAY_MEMBER]; } HypercoreInfo; +#define REL_IS_HYPERCORE(rel) ((rel)->rd_tableam == hypercore_routine()) + extern HypercoreInfo *RelationGetHypercoreInfo(Relation rel); diff --git a/tsl/test/expected/hypercore.out b/tsl/test/expected/hypercore.out index 03e5983b7dd..c20d8eed4a4 100644 --- a/tsl/test/expected/hypercore.out +++ b/tsl/test/expected/hypercore.out @@ -592,3 +592,122 @@ SELECT count(*) FROM :chunk; (1 row) drop table readings; +--------------------------------------------- +-- Test recompression via compress_chunk() -- +--------------------------------------------- +show 
timescaledb.enable_transparent_decompression; + timescaledb.enable_transparent_decompression +---------------------------------------------- + off +(1 row) + +create table recompress (time timestamptz, value int); +select create_hypertable('recompress', 'time', create_default_indexes => false); +NOTICE: adding not-null constraint to column "time" + create_hypertable +------------------------- + (3,public,recompress,t) +(1 row) + +insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2); +select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk + from timescaledb_information.chunks + where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass + order by unique_chunk asc + limit 1 \gset +alter table recompress set (timescaledb.compress_orderby='time'); +WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. 
Please make sure you are not missing any indexes +NOTICE: default segment by for hypertable "recompress" is set to "" +alter table :unique_chunk set access method hypercore; +-- Should already be compressed +select compress_chunk(:'unique_chunk'); +NOTICE: chunk "_hyper_3_34_chunk" is already compressed + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +-- Insert something to compress +insert into recompress values ('2024-01-01 03:00', 3); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +-- Make sure we see the data after recompression and everything is +-- compressed +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 +(3 rows) + +-- Add a time index to test recompression with index scan. Index scans +-- during compression is actually disabled for Hypercore TAM since the +-- index covers also compressed data, so this is only a check that the +-- GUC can be set without negative consequences. 
+create index on recompress (time); +set timescaledb.enable_compression_indexscan=true; +-- Insert another value to compress +insert into recompress values ('2024-01-02 04:00', 4); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 + t | Tue Jan 02 04:00:00 2024 PST | 4 +(4 rows) + +-- Test using delete instead of truncate when compressing +set timescaledb.enable_delete_after_compression=true; +-- Insert another value to compress +insert into recompress values ('2024-01-02 05:00', 5); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 + t | Tue Jan 02 04:00:00 2024 PST | 4 + t | Tue Jan 02 05:00:00 2024 PST | 5 +(5 rows) + +-- Add a segmentby key to test segmentwise recompression +-- Insert another value to compress that goes into same segment +alter table :unique_chunk set access method heap; +alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value'); +alter table :unique_chunk set access method hypercore; +insert into recompress values ('2024-01-02 06:00', 5); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +select 
_timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 + t | Tue Jan 02 04:00:00 2024 PST | 4 + t | Tue Jan 02 05:00:00 2024 PST | 5 + t | Tue Jan 02 06:00:00 2024 PST | 5 +(6 rows) + diff --git a/tsl/test/expected/hypercore_create.out b/tsl/test/expected/hypercore_create.out index 9e9dd944c2f..f8c548b8576 100644 --- a/tsl/test/expected/hypercore_create.out +++ b/tsl/test/expected/hypercore_create.out @@ -438,7 +438,7 @@ select * from compressed_rel_size_stats order by rel; _timescaledb_internal._hyper_1_7_chunk | hypercore | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_9_chunk | hypercore | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_11_chunk | hypercore | test2 | 373 | 10 | 10 - _timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 0 | 0 | 0 + _timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_17_chunk | hypercore | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_18_chunk | hypercore | test3 | 1 | 1 | 1 (9 rows) @@ -476,7 +476,7 @@ select * from compressed_rel_size_stats order by rel; _timescaledb_internal._hyper_1_7_chunk | heap | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_9_chunk | heap | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_11_chunk | heap | test2 | 373 | 10 | 10 - _timescaledb_internal._hyper_4_13_chunk | heap | test3 | 0 | 0 | 0 + _timescaledb_internal._hyper_4_13_chunk | heap | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_17_chunk | heap | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_18_chunk | heap | test3 | 1 | 1 | 1 (9 rows) diff --git a/tsl/test/sql/hypercore.sql b/tsl/test/sql/hypercore.sql index 3eba3ae24f8..e8f5978d5d3 100644 --- a/tsl/test/sql/hypercore.sql +++ b/tsl/test/sql/hypercore.sql @@ -310,3 
+310,64 @@ SELECT sum(_ts_meta_count) FROM :cchunk; SELECT count(*) FROM :chunk; drop table readings; + +--------------------------------------------- +-- Test recompression via compress_chunk() -- +--------------------------------------------- +show timescaledb.enable_transparent_decompression; + +create table recompress (time timestamptz, value int); +select create_hypertable('recompress', 'time', create_default_indexes => false); +insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2); + +select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk + from timescaledb_information.chunks + where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass + order by unique_chunk asc + limit 1 \gset + +alter table recompress set (timescaledb.compress_orderby='time'); +alter table :unique_chunk set access method hypercore; + +-- Should already be compressed +select compress_chunk(:'unique_chunk'); + +-- Insert something to compress +insert into recompress values ('2024-01-01 03:00', 3); + +select compress_chunk(:'unique_chunk'); + +-- Make sure we see the data after recompression and everything is +-- compressed +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-- Add a time index to test recompression with index scan. Index scans +-- during compression is actually disabled for Hypercore TAM since the +-- index covers also compressed data, so this is only a check that the +-- GUC can be set without negative consequences. 
+create index on recompress (time); +set timescaledb.enable_compression_indexscan=true; + +-- Insert another value to compress +insert into recompress values ('2024-01-02 04:00', 4); +select compress_chunk(:'unique_chunk'); +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-- Test using delete instead of truncate when compressing +set timescaledb.enable_delete_after_compression=true; + +-- Insert another value to compress +insert into recompress values ('2024-01-02 05:00', 5); + +select compress_chunk(:'unique_chunk'); +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-- Add a segmentby key to test segmentwise recompression +-- Insert another value to compress that goes into same segment +alter table :unique_chunk set access method heap; +alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value'); +alter table :unique_chunk set access method hypercore; +insert into recompress values ('2024-01-02 06:00', 5); + +select compress_chunk(:'unique_chunk'); +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;