Skip to content

Commit

Permalink
Add dump/restore support for Hypercore TAM
Browse files Browse the repository at this point in the history
Add support for dumping and restoring hypertables that have chunks
that use the Hypercore TAM.

Dumping a Hypercore table requires special consideration because its
data is internally stored in two separate relations: one for
compressed data and one for non-compressed data. The TAM returns data
from both relations, but they may be dumped as separate tables. This
risks dumping the compressed data twice: once via the TAM and once via
the compressed table in compressed format.

The `pg_dump` tool uses `COPY TO` to create dumps of each table, and,
to avoid data duplication when used on Hypercore tables, this change
introduces a GUC that allows selecting one of these two behaviors:

1. A `COPY TO` on a Hypercore table returns all data via the TAM,
   including data stored in the compressed relation. A `COPY TO` on
   the internal compressed relation returns no data.

2. A `COPY TO` on a Hypercore table returns only non-compressed data, while
   a `COPY TO` on the compressed relation returns compressed data. A
   `SELECT` still returns all the data as normal.

The second approach is the default because it is consistent with
compression when Hypercore TAM is not used. It will produce a
`pg_dump` archive that includes data in compressed form (if data was
compressed when dumped). Conversely, option (1) will produce an
archive that looks identical to a dump from a non-compressed table.

There are pros and cons of each dump format. A non-compressed archive
is a platform-agnostic logical dump that can be restored to any
platform and architecture, while a compressed archive includes data
that is compressed in a platform-dependent way and needs to be
restored to a compatible system.

A test is added that tests both these settings and corresponding
dumping and restoring.
  • Loading branch information
erimatnor committed Oct 18, 2024
1 parent efce9ce commit 304334f
Show file tree
Hide file tree
Showing 10 changed files with 647 additions and 7 deletions.
23 changes: 23 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ static const struct config_enum_entry transparent_decompression_options[] = {
{ NULL, 0, false }
};

/*
 * Accepted values for the "hypercore_copy_to_behavior" setting. Each entry
 * maps an option name to the corresponding HypercoreCopyToBehavior value.
 * The list ends with a NULL-named entry, like the other option tables in
 * this file.
 */
static const struct config_enum_entry hypercore_copy_to_options[] = {
{ "all_data", HYPERCORE_COPY_ALL_DATA, false },
{ "no_compressed_data", HYPERCORE_COPY_NO_COMPRESSED_DATA, false },
{ NULL, 0, false }
};

bool ts_guc_enable_deprecation_warnings = true;
bool ts_guc_enable_optimizations = true;
bool ts_guc_restoring = false;
Expand Down Expand Up @@ -147,6 +153,8 @@ bool ts_guc_enable_tss_callbacks = true;
TSDLLEXPORT bool ts_guc_enable_delete_after_compression = false;
TSDLLEXPORT bool ts_guc_enable_merge_on_cagg_refresh = false;
TSDLLEXPORT char *ts_guc_hypercore_indexam_whitelist;
/* Selects what COPY TO returns for Hypercore tables. The initial value leaves
 * out compressed data. */
TSDLLEXPORT HypercoreCopyToBehavior ts_guc_hypercore_copy_to_behavior =
HYPERCORE_COPY_NO_COMPRESSED_DATA;

/* default value of ts_guc_max_open_chunks_per_insert and
* ts_guc_max_cached_chunks_per_hypertable will be set as their respective boot-value when the
Expand All @@ -163,6 +171,7 @@ char *ts_last_tune_time = NULL;
char *ts_last_tune_version = NULL;

bool ts_guc_debug_require_batch_sorted_merge = false;

bool ts_guc_debug_allow_cagg_with_deprecated_funcs = false;

#ifdef TS_DEBUG
Expand Down Expand Up @@ -973,6 +982,20 @@ _guc_init(void)
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

/* Register the enum setting that selects how COPY TO treats compressed data
 * in Hypercore tables. It can be changed per session (PGC_USERSET) and
 * starts out as 'no_compressed_data'. */
DefineCustomEnumVariable(/* name= */ MAKE_EXTOPTION("hypercore_copy_to_behavior"),
/* short_desc= */ "The behavior of COPY TO on a hypercore table",
/* long_desc= */ "Set to 'all_data' to return both compressed and uncompressed data "
"via the Hypercore table when using COPY TO. Set to "
"'no_compressed_data' to skip compressed data.",
/* valueAddr= */ (int *) &ts_guc_hypercore_copy_to_behavior,
/* bootValue= */ HYPERCORE_COPY_NO_COMPRESSED_DATA,
/* options= */ hypercore_copy_to_options,
/* context= */ PGC_USERSET,
/* flags= */ 0,
/* check_hook= */ NULL,
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

#ifdef TS_DEBUG
DefineCustomBoolVariable(/* name= */ MAKE_EXTOPTION("shutdown_bgw_scheduler"),
/* short_desc= */ "immediately shutdown the bgw scheduler",
Expand Down
19 changes: 19 additions & 0 deletions src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ extern TSDLLEXPORT bool ts_guc_debug_require_batch_sorted_merge;
extern TSDLLEXPORT bool ts_guc_debug_allow_cagg_with_deprecated_funcs;
extern TSDLLEXPORT char *ts_guc_hypercore_indexam_whitelist;

/*
 * Defines the behavior of COPY TO when used on a Hypercore table.
 *
 * If set to HYPERCORE_COPY_ALL_DATA, all data is copied from a Hypercore
 * table, including compressed data (but in uncompressed form) from the
 * internal compressed relation. When doing a COPY TO on the internal
 * compressed relation, no data is returned.
 *
 * If set to HYPERCORE_COPY_NO_COMPRESSED_DATA, then only uncompressed data
 * is copied (if any). This behavior is compatible with compression without
 * hypercore.
 */
typedef enum HypercoreCopyToBehavior
{
/* COPY TO on the Hypercore table returns compressed and uncompressed data */
HYPERCORE_COPY_ALL_DATA,
/* COPY TO on the Hypercore table returns only uncompressed data */
HYPERCORE_COPY_NO_COMPRESSED_DATA,
} HypercoreCopyToBehavior;

extern TSDLLEXPORT HypercoreCopyToBehavior ts_guc_hypercore_copy_to_behavior;

void _guc_init(void);

typedef enum
Expand Down
3 changes: 2 additions & 1 deletion test/sql/utils/pg_dump_aux_dump.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
DUMPFILE=${DUMPFILE:-$1}
EXTRA_PGOPTIONS=${EXTRA_PGOPTIONS:-$2}
# Override PGOPTIONS to remove verbose output
PGOPTIONS='--client-min-messages=warning'
PGOPTIONS="--client-min-messages=warning $EXTRA_PGOPTIONS"

export PGOPTIONS

Expand Down
8 changes: 6 additions & 2 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,10 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples
TableScanDesc scan;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
Snapshot snapshot = GetLatestSnapshot();

scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, NULL);
/* If scan is using Hypercore, configure the scan to only return
* non-compressed data */
hypercore_scan_set_skip_compressed(scan);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
Expand Down Expand Up @@ -1206,11 +1209,12 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
}

snapshot = GetLatestSnapshot();
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. */

scan = table_beginscan(uncompressed_chunk_rel, snapshot, nsegbycols_nonnull, scankey);
/* Let Hypercore TAM know it should only return tuples from the
* non-compressed relation. */
hypercore_scan_set_skip_compressed(scan);

TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
Expand Down
47 changes: 43 additions & 4 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include "compression/compression.h"
#include "compression/create.h"
#include "debug_assert.h"
#include "extension.h"
#include "guc.h"
#include "hypercore_handler.h"
#include "relstats.h"
Expand All @@ -80,6 +81,20 @@ static void convert_to_hypercore_finish(Oid relid);
static List *partially_compressed_relids = NIL; /* Relids that needs to have
* updated status set at end of
* transaction */
/*
 * For COPY <hypercore_rel> TO commands, track the relid of the hypercore
 * being copied from. It is needed to filter out compressed data in the COPY
 * scan so that pg_dump does not dump compressed data twice: once in
 * uncompressed format via the hypercore rel and once in compressed format in
 * the internal compressed rel that gets dumped separately.
 *
 * The value is set through hypercore_skip_compressed_data_for_relation() and
 * is cleared again (set to InvalidOid) in hypercore_endscan().
 */
static Oid hypercore_skip_compressed_data_relid = InvalidOid;

/*
 * Remember "relid" as the hypercore relation whose compressed data should be
 * left out of scans. Scans compare their relation's Oid against this value
 * when deciding whether to skip compressed data.
 */
void
hypercore_skip_compressed_data_for_relation(Oid relid)
{
hypercore_skip_compressed_data_relid = relid;
}

static bool hypercore_truncate_compressed = true;

Expand Down Expand Up @@ -176,7 +191,7 @@ static HypercoreInfo *
lazy_build_hypercore_info_cache(Relation rel, bool create_chunk_constraints,
bool *compressed_relation_created)
{
Assert(OidIsValid(rel->rd_id) && !ts_is_hypertable(rel->rd_id));
Assert(OidIsValid(rel->rd_id) && (!ts_extension_is_loaded() || !ts_is_hypertable(rel->rd_id)));

HypercoreInfo *hsinfo;
CompressionSettings *settings;
Expand Down Expand Up @@ -491,6 +506,27 @@ get_scan_type(uint32 flags)
}
#endif

static inline bool
should_skip_compressed_data(const TableScanDesc scan)
{
/*
* Skip compressed data in a scan if any of these apply:
*
* 1. Transaparent decompression (DecompressChunk) is enabled for
* hypercore.
*
* 2. The scan was started with a flag indicating no compressed data
* should be returned.
*
* 3. A COPY <hypercore> TO <file> on the hypercore is executed and we
* want to ensure such commands issued by pg_dump doesn't lead to
* dumping compressed data twice.
*/
return (ts_guc_enable_transparent_decompression == 2) ||
RelationGetRelid(scan->rs_rd) == hypercore_skip_compressed_data_relid ||
(scan->rs_flags & SO_HYPERCORE_SKIP_COMPRESSED);
}

static TableScanDesc
hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey keys,
ParallelTableScanDesc parallel_scan, uint32 flags)
Expand Down Expand Up @@ -527,8 +563,7 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
HypercoreInfo *hsinfo = RelationGetHypercoreInfo(relation);
scan->compressed_rel = table_open(hsinfo->compressed_relid, AccessShareLock);

if ((ts_guc_enable_transparent_decompression == 2) ||
(flags & SO_HYPERCORE_SKIP_COMPRESSED))
if (should_skip_compressed_data(&scan->rs_base))
{
/*
* Don't read compressed data if transparent decompression is enabled
Expand All @@ -537,7 +572,7 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
* Transparent decompression reads compressed data itself, directly
* from the compressed chunk, so avoid reading it again here.
*/
scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
hypercore_scan_set_skip_compressed(&scan->rs_base);
}

initscan(scan, keys, nkeys);
Expand Down Expand Up @@ -634,6 +669,9 @@ hypercore_endscan(TableScanDesc sscan)
pfree(scan->rs_base.rs_key);

pfree(scan);

/* Clear the COPY TO filter state */
hypercore_skip_compressed_data_relid = InvalidOid;
}

static bool
Expand Down Expand Up @@ -3402,6 +3440,7 @@ hypercore_xact_event(XactEvent event, void *arg)
Ensure(OidIsValid(hsinfo->compressed_relid),
"hypercore \"%s\" has no compressed data relation",
get_rel_name(relid));

Chunk *chunk = ts_chunk_get_by_relid(relid, true);
ts_chunk_set_partial(chunk);
table_close(rel, NoLock);
Expand Down
1 change: 1 addition & 0 deletions tsl/src/hypercore/hypercore_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am);
extern Datum hypercore_handler(PG_FUNCTION_ARGS);
extern void hypercore_xact_event(XactEvent event, void *arg);
extern bool hypercore_set_truncate_compressed(bool onoff);
extern void hypercore_skip_compressed_data_for_relation(Oid relid);
extern void hypercore_scan_set_skip_compressed(TableScanDesc scan);

typedef struct ColumnCompressionSettings
Expand Down
75 changes: 75 additions & 0 deletions tsl/src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,84 @@

#include "compression/create.h"
#include "continuous_aggs/create.h"
#include "guc.h"
#include "hypercore/hypercore_handler.h"
#include "hypercore/utils.h"
#include "hypertable_cache.h"
#include "process_utility.h"
#include "ts_catalog/continuous_agg.h"

/*
 * Intercept COPY <relation> TO so that data in a Hypercore relation is not
 * emitted twice, e.g. by pg_dump: once through the Hypercore relation and
 * once through its internal compressed relation.
 *
 * Depending on the hypercore_copy_to_behavior setting:
 *
 * - HYPERCORE_COPY_NO_COMPRESSED_DATA: a COPY TO on a Hypercore relation is
 *   marked so that its scan skips compressed data.
 *
 * - HYPERCORE_COPY_ALL_DATA: a COPY TO on the internal compressed relation of
 *   a Hypercore relation is rewritten into a query that returns no rows.
 *
 * COPY FROM, COPY (query) TO and relations that cannot be resolved are left
 * untouched. The statement in "args" may be modified in place.
 *
 * Always returns DDL_CONTINUE.
 */
static DDLResult
process_copy(ProcessUtilityArgs *args)
{
	CopyStmt *stmt = castNode(CopyStmt, args->parsetree);

	/* Only COPY <relation> TO is of interest */
	if (!stmt->relation || stmt->is_from)
		return DDL_CONTINUE;

	Oid relid = RangeVarGetRelid(stmt->relation, NoLock, true);

	if (!OidIsValid(relid))
		return DDL_CONTINUE;

	Oid amoid = ts_get_rel_am(relid);

	if (ts_is_hypercore_am(amoid))
	{
		if (ts_guc_hypercore_copy_to_behavior == HYPERCORE_COPY_NO_COMPRESSED_DATA)
		{
			hypercore_skip_compressed_data_for_relation(relid);
			ereport(NOTICE,
					(errmsg("COPY: skipping compressed data for hypercore \"%s\"",
							get_rel_name(relid)),
					 errdetail(
						 "Use timescaledb.hypercore_copy_to_behavior to change this behavior.")));
		}
	}
	else if (ts_guc_hypercore_copy_to_behavior == HYPERCORE_COPY_ALL_DATA)
	{
		const Chunk *chunk = ts_chunk_get_by_relid(relid, false);

		if (!chunk)
			return DDL_CONTINUE;

		const Chunk *parent = ts_chunk_get_compressed_chunk_parent(chunk);

		/* Check that a parent exists before looking at its table_id to find
		 * out which access method it uses. */
		if (parent == NULL || !ts_is_hypercore_am(ts_get_rel_am(parent->table_id)))
			return DDL_CONTINUE;

		/* To avoid returning compressed data twice in a pg_dump, replace
		 * the 'COPY <relation> TO' with 'COPY (select where false) TO' so
		 * that the COPY on the internal compressed relation returns no
		 * data. The data is instead returned in uncompressed form via the
		 * parent hypercore relation. */
		SelectStmt *select = makeNode(SelectStmt);
		A_Const *aconst = makeNode(A_Const);
#if PG15_LT
		aconst->val.type = T_Integer;
		aconst->val.val.ival = 0;
#else
		aconst->val.boolval.boolval = false;
		aconst->val.boolval.type = T_Boolean;
#endif
		select->whereClause = (Node *) aconst;
		stmt->relation = NULL;
		stmt->attlist = NIL;
		stmt->query = (Node *) select;
		ereport(NOTICE,
				(errmsg("COPY: skipping data for internal compression relation \"%s\"",
						get_rel_name(chunk->table_id)),
				 errdetail("Use COPY TO on hypercore relation \"%s\" to return data in "
						   "uncompressed form"
						   " or use timescaledb.hypercore_copy_to_behavior "
						   "to change this behavior.",
						   get_rel_name(parent->table_id))));
	}

	return DDL_CONTINUE;
}

DDLResult
tsl_ddl_command_start(ProcessUtilityArgs *args)
{
Expand Down Expand Up @@ -98,6 +170,9 @@ tsl_ddl_command_start(ProcessUtilityArgs *args)
result = DDL_DONE;
break;
}
case T_CopyStmt:
result = process_copy(args);
break;
default:
break;
}
Expand Down
Loading

0 comments on commit 304334f

Please sign in to comment.