Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with Hypercore TAM recompression #7364

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,20 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
(errmsg("new compressed chunk \"%s.%s\" created",
NameStr(compress_ht_chunk->fd.schema_name),
NameStr(compress_ht_chunk->fd.table_name))));

/* Since a new compressed relation was created it is necessary to
* invalidate the relcache entry for the chunk because Hypercore TAM
* caches information about the compressed relation in the
* relcache. */
if (ts_is_hypercore_am(cxt.srcht_chunk->amoid))
{
/* Tell other backends */
CacheInvalidateRelcacheByRelid(cxt.srcht_chunk->table_id);

/* Immediately invalidate our own cache */
RelationCacheInvalidateEntry(cxt.srcht_chunk->table_id);
}

EventTriggerAlterTableEnd();
}
else
Expand Down Expand Up @@ -844,11 +858,19 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, enum UseAccessMethod use
TS_FALLTHROUGH;
case USE_AM_NULL:
Assert(rel_is_hypercore);
/* Don't forward the truncate to the compressed data during recompression */
bool truncate_compressed = hypercore_set_truncate_compressed(false);
relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
hypercore_set_truncate_compressed(truncate_compressed);
break;
case USE_AM_TRUE:
if (rel_is_hypercore)
{
/* Don't forward the truncate to the compressed data during recompression */
bool truncate_compressed = hypercore_set_truncate_compressed(false);
relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
hypercore_set_truncate_compressed(truncate_compressed);
}
else
{
/* Convert to a compressed hypercore by simply calling ALTER TABLE
Expand Down
26 changes: 22 additions & 4 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/skey.h>
#include <catalog/heap.h>
#include <catalog/pg_am.h>
#include <common/base64.h>
Expand All @@ -27,6 +28,7 @@
#include "debug_assert.h"
#include "debug_point.h"
#include "guc.h"
#include "hypercore/hypercore_handler.h"
#include "nodes/chunk_dispatch/chunk_insert_state.h"
#include "segment_meta.h"
#include "ts_catalog/array_utils.h"
Expand Down Expand Up @@ -180,7 +182,12 @@ static void
RelationDeleteAllRows(Relation rel, Snapshot snap)
{
TupleTableSlot *slot = table_slot_create(rel, NULL);
TableScanDesc scan = table_beginscan(rel, snap, 0, (ScanKey) NULL);
ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};
TableScanDesc scan = table_beginscan(rel, snap, 0, &scankey);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
Expand Down Expand Up @@ -291,8 +298,14 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
* The following code is trying to find an existing index that
* matches the configuration so that we can skip sequential scan and
* tuplesort.
*
* Note that Hypercore TAM doesn't support (re-)compression via index at
* this point because the index covers also existing compressed tuples. It
* could be supported for initial compression when there is no compressed
* data, but for now just avoid it altogether since compression indexscan
* isn't enabled by default anyway.
*/
if (ts_guc_enable_compression_indexscan)
if (ts_guc_enable_compression_indexscan && !REL_IS_HYPERCORE(in_rel))
{
List *in_rel_index_oids = RelationGetIndexList(in_rel);
foreach (lc, in_rel_index_oids)
Expand Down Expand Up @@ -442,6 +455,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
{
int64 nrows_processed = 0;

Assert(!REL_IS_HYPERCORE(in_rel));
elog(ts_guc_debug_compression_path_info ? INFO : DEBUG1,
"using index \"%s\" to scan rows for compression",
get_rel_name(matched_index_rel->rd_id));
Expand Down Expand Up @@ -562,9 +576,13 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
Tuplesortstate *tuplesortstate;
TableScanDesc scan;
TupleTableSlot *slot;

ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};
tuplesortstate = compression_create_tuplesort_state(settings, in_rel);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, &scankey);
slot = table_slot_create(in_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
Expand Down
23 changes: 18 additions & 5 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
#include "debug_assert.h"
#include "guc.h"
#include "hypercore_handler.h"
#include "process_utility.h"
#include "relstats.h"
#include "trigger.h"
#include "ts_catalog/array_utils.h"
Expand All @@ -82,6 +81,16 @@
* updated status set at end of
* transaction */

/* Controls whether operations on a hypercore relation (truncate/delete) are
 * also forwarded to its internal compressed relation. Temporarily cleared
 * during recompression so the compressed data survives — presumably restored
 * by the caller via the value returned below. */
static bool hypercore_truncate_compressed = true;

/*
 * Set the truncate-compressed flag and report its prior state.
 *
 * Returns the previous value so callers can save and later restore the
 * setting around a recompression, e.g.:
 *
 *     bool saved = hypercore_set_truncate_compressed(false);
 *     ... recompress ...
 *     hypercore_set_truncate_compressed(saved);
 */
bool
hypercore_set_truncate_compressed(bool onoff)
{
	const bool previous = hypercore_truncate_compressed;

	hypercore_truncate_compressed = onoff;
	return previous;
}

#define HYPERCORE_AM_INFO_SIZE(natts) \
(sizeof(HypercoreInfo) + (sizeof(ColumnCompressionSettings) * (natts)))

Expand Down Expand Up @@ -560,7 +569,11 @@
scan->reset = true;
scan->hs_scan_state = HYPERCORE_SCAN_START;

table_rescan(scan->cscan_desc, key);
if (key && key->sk_flags & SK_NO_COMPRESSED)
scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;

Check warning on line 573 in tsl/src/hypercore/hypercore_handler.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/hypercore/hypercore_handler.c#L573

Added line #L573 was not covered by tests

if (scan->cscan_desc)
table_rescan(scan->cscan_desc, key);

Relation relation = scan->uscan_desc->rs_rd;
const TableAmRoutine *oldtam = switch_to_heapam(relation);
Expand Down Expand Up @@ -1643,7 +1656,7 @@
{
TM_Result result = TM_Ok;

if (is_compressed_tid(tid))
if (is_compressed_tid(tid) && hypercore_truncate_compressed)
{
HypercoreInfo *caminfo = RelationGetHypercoreInfo(relation);
Relation crel = table_open(caminfo->compressed_relid, RowExclusiveLock);
Expand Down Expand Up @@ -1827,7 +1840,7 @@
* change the rel file number for it as well. This can happen if you, for
* example, execute a transactional TRUNCATE. */
Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel));
if (OidIsValid(compressed_relid))
if (OidIsValid(compressed_relid) && hypercore_truncate_compressed)
{
Relation compressed_rel = table_open(compressed_relid, AccessExclusiveLock);
#if PG16_GE
Expand All @@ -1847,7 +1860,7 @@
rel->rd_tableam = oldtam;

Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel));
if (OidIsValid(compressed_relid))
if (OidIsValid(compressed_relid) && hypercore_truncate_compressed)
{
Relation crel = table_open(compressed_relid, AccessShareLock);
crel->rd_tableam->relation_nontransactional_truncate(crel);
Expand Down
3 changes: 3 additions & 0 deletions tsl/src/hypercore/hypercore_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern void hypercore_alter_access_method_begin(Oid relid, bool to_other_am);
extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am);
extern Datum hypercore_handler(PG_FUNCTION_ARGS);
extern void hypercore_xact_event(XactEvent event, void *arg);
extern bool hypercore_set_truncate_compressed(bool onoff);

typedef struct ColumnCompressionSettings
{
Expand Down Expand Up @@ -56,4 +57,6 @@ typedef struct HypercoreInfo
ColumnCompressionSettings columns[FLEXIBLE_ARRAY_MEMBER];
} HypercoreInfo;

#define REL_IS_HYPERCORE(rel) ((rel)->rd_tableam == hypercore_routine())

extern HypercoreInfo *RelationGetHypercoreInfo(Relation rel);
119 changes: 119 additions & 0 deletions tsl/test/expected/hypercore.out
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,122 @@ SELECT count(*) FROM :chunk;
(1 row)

drop table readings;
---------------------------------------------
-- Test recompression via compress_chunk() --
---------------------------------------------
show timescaledb.enable_transparent_decompression;
timescaledb.enable_transparent_decompression
----------------------------------------------
off
(1 row)

create table recompress (time timestamptz, value int);
select create_hypertable('recompress', 'time', create_default_indexes => false);
NOTICE: adding not-null constraint to column "time"
create_hypertable
-------------------------
(3,public,recompress,t)
(1 row)

insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2);
select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk
from timescaledb_information.chunks
where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass
order by unique_chunk asc
limit 1 \gset
alter table recompress set (timescaledb.compress_orderby='time');
WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes
NOTICE: default segment by for hypertable "recompress" is set to ""
alter table :unique_chunk set access method hypercore;
-- Should already be compressed
select compress_chunk(:'unique_chunk');
NOTICE: chunk "_hyper_3_34_chunk" is already compressed
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

-- Insert something to compress
insert into recompress values ('2024-01-01 03:00', 3);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

-- Make sure we see the data after recompression and everything is
-- compressed
select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
(3 rows)

-- Add a time index to test recompression with index scan. Index scans
-- during compression is actually disabled for Hypercore TAM since the
-- index covers also compressed data, so this is only a check that the
-- GUC can be set without negative consequences.
create index on recompress (time);
set timescaledb.enable_compression_indexscan=true;
-- Insert another value to compress
insert into recompress values ('2024-01-02 04:00', 4);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
t | Tue Jan 02 04:00:00 2024 PST | 4
(4 rows)

-- Test using delete instead of truncate when compressing
set timescaledb.enable_delete_after_compression=true;
-- Insert another value to compress
insert into recompress values ('2024-01-02 05:00', 5);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
t | Tue Jan 02 04:00:00 2024 PST | 4
t | Tue Jan 02 05:00:00 2024 PST | 5
(5 rows)

-- Add a segmentby key to test segmentwise recompression
-- Insert another value to compress that goes into same segment
alter table :unique_chunk set access method heap;
alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value');
alter table :unique_chunk set access method hypercore;
insert into recompress values ('2024-01-02 06:00', 5);
select compress_chunk(:'unique_chunk');
compress_chunk
-----------------------------------------
_timescaledb_internal._hyper_3_34_chunk
(1 row)

select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
is_compressed_tid | time | value
-------------------+------------------------------+-------
t | Mon Jan 01 01:00:00 2024 PST | 1
t | Mon Jan 01 02:00:00 2024 PST | 2
t | Mon Jan 01 03:00:00 2024 PST | 3
t | Tue Jan 02 04:00:00 2024 PST | 4
t | Tue Jan 02 05:00:00 2024 PST | 5
t | Tue Jan 02 06:00:00 2024 PST | 5
(6 rows)

4 changes: 2 additions & 2 deletions tsl/test/expected/hypercore_create.out
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ select * from compressed_rel_size_stats order by rel;
_timescaledb_internal._hyper_1_7_chunk | hypercore | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_9_chunk | hypercore | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_11_chunk | hypercore | test2 | 373 | 10 | 10
_timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 0 | 0 | 0
_timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_17_chunk | hypercore | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_18_chunk | hypercore | test3 | 1 | 1 | 1
(9 rows)
Expand Down Expand Up @@ -476,7 +476,7 @@ select * from compressed_rel_size_stats order by rel;
_timescaledb_internal._hyper_1_7_chunk | heap | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_9_chunk | heap | test2 | 2016 | 10 | 10
_timescaledb_internal._hyper_1_11_chunk | heap | test2 | 373 | 10 | 10
_timescaledb_internal._hyper_4_13_chunk | heap | test3 | 0 | 0 | 0
_timescaledb_internal._hyper_4_13_chunk | heap | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_17_chunk | heap | test3 | 1 | 1 | 1
_timescaledb_internal._hyper_4_18_chunk | heap | test3 | 1 | 1 | 1
(9 rows)
Expand Down
61 changes: 61 additions & 0 deletions tsl/test/sql/hypercore.sql
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,64 @@ SELECT sum(_ts_meta_count) FROM :cchunk;
SELECT count(*) FROM :chunk;

drop table readings;

---------------------------------------------
-- Test recompression via compress_chunk() --
---------------------------------------------
show timescaledb.enable_transparent_decompression;

create table recompress (time timestamptz, value int);
select create_hypertable('recompress', 'time', create_default_indexes => false);
insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2);

select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk
from timescaledb_information.chunks
where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass
order by unique_chunk asc
limit 1 \gset

alter table recompress set (timescaledb.compress_orderby='time');
alter table :unique_chunk set access method hypercore;

-- Should already be compressed
select compress_chunk(:'unique_chunk');

-- Insert something to compress
insert into recompress values ('2024-01-01 03:00', 3);

select compress_chunk(:'unique_chunk');

-- Make sure we see the data after recompression and everything is
-- compressed
select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;

-- Add a time index to test recompression with index scan. Index scans
-- during compression is actually disabled for Hypercore TAM since the
-- index covers also compressed data, so this is only a check that the
-- GUC can be set without negative consequences.
create index on recompress (time);
set timescaledb.enable_compression_indexscan=true;

-- Insert another value to compress
insert into recompress values ('2024-01-02 04:00', 4);
select compress_chunk(:'unique_chunk');
select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;

-- Test using delete instead of truncate when compressing
set timescaledb.enable_delete_after_compression=true;

-- Insert another value to compress
insert into recompress values ('2024-01-02 05:00', 5);

select compress_chunk(:'unique_chunk');
select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;

-- Add a segmentby key to test segmentwise recompression
-- Insert another value to compress that goes into same segment
alter table :unique_chunk set access method heap;
alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value');
alter table :unique_chunk set access method hypercore;
insert into recompress values ('2024-01-02 06:00', 5);

select compress_chunk(:'unique_chunk');
select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time;
Loading