Use shard split copy code for blocking shard moves
The shard copy code that was created for shard splits has some
advantages over the old code: it uses binary COPY when possible, which
makes the copy faster. Shard moves that use `block_writes` now go
through this better copy logic.
JelteF committed Jul 29, 2022
1 parent ccc3b1b commit 7bed347
Showing 19 changed files with 414 additions and 231 deletions.
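For reference, a blocking shard move that exercises the changed code path below can be triggered like this (shard ID, hosts, and ports are placeholders):

-- move shard 201 to another worker while blocking writes; the data transfer
-- now goes through the shard-split copy logic (binary COPY when possible)
-- instead of worker_append_table_to_shard
SELECT master_move_shard_placement(201, 'localhost', 9701, 'localhost', 9702, 'block_writes');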
58 changes: 51 additions & 7 deletions src/backend/distributed/operations/repair_shards.c
@@ -20,6 +20,7 @@
#include "access/htup_details.h"
#include "catalog/pg_class.h"
#include "catalog/pg_enum.h"
#include "distributed/adaptive_executor.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@@ -38,6 +39,7 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
@@ -129,6 +131,7 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
int32 sourceNodePort);
static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval,
List *ddlCommandList);
static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode);


/* declarations for dynamic loading */
@@ -1180,6 +1183,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);

WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);

/* iterate through the colocated shards and copy each */
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
@@ -1199,23 +1205,48 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}

- ddlCommandList = NIL;

int taskId = 0;
List *copyTaskList = NIL;
foreach_ptr(shardInterval, shardIntervalList)
{
/*
* Skip copying data for partitioned tables, because they contain no
* data themselves. Their partitions do contain data, but those are
* different colocated shards that will be copied separately.
*/
if (!PartitionedTable(shardInterval->relationId))
{
- ddlCommandList = CopyShardContentsCommandList(shardInterval, sourceNodeName,
- sourceNodePort);
char *copyCommand = CreateShardCopyCommand(
shardInterval, targetNode);

Task *copyTask = CreateBasicTask(
INVALID_JOB_ID,
taskId,
READ_TASK,
copyCommand);

ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, sourceNode);

copyTask->taskPlacementList = list_make1(taskPlacement);

copyTaskList = lappend(copyTaskList, copyTask);
taskId++;
}
- ddlCommandList = list_concat(
- ddlCommandList,
}

ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);

foreach_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
- sourceNodePort));
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);

@@ -1278,6 +1309,19 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
}


static char *
CreateShardCopyCommand(ShardInterval *shard,
WorkerNode *targetNode)
{
char *shardName = ConstructQualifiedShardName(shard);
StringInfo query = makeStringInfo();
appendStringInfo(query, "SELECT pg_catalog.worker_copy(%s::regclass, %u);",
quote_literal_cstr(shardName),
targetNode->nodeId);
return query->data;
}


/*
* CopyPartitionShardsCommandList gets a shardInterval which is a shard that
* belongs to partitioned table (this is asserted).
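To make the new flow concrete: for a hypothetical shard public.t_201 being moved to the node with id 3, CreateShardCopyCommand above would build the following command, which the adaptive executor then runs as a read task on the source placement:

SELECT pg_catalog.worker_copy('public.t_201'::regclass, 3);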
70 changes: 70 additions & 0 deletions src/backend/distributed/operations/worker_copy_udf.c
@@ -0,0 +1,70 @@
/*-------------------------------------------------------------------------
*
* worker_copy_udf.c
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_shard_copy.h"

PG_FUNCTION_INFO_V1(worker_copy);

/*
* worker_copy copies a shard from this worker to another worker
*
* SQL signature:
*
* worker_copy(
* source_table regclass,
* target_node_id integer
* ) RETURNS VOID
*/
Datum
worker_copy(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
uint32_t targetNodeId = PG_GETARG_INT32(1);

if (IsCitusTable(relationId))
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("table %s is a Citus table, only copies of "
"shards or regular postgres tables are supported",
qualifiedRelationName)));
}

Oid schemaOid = get_rel_namespace(relationId);
char *relationSchemaName = get_namespace_name(schemaOid);
char *relationName = get_rel_name(relationId);
char *relationQualifiedName = quote_qualified_identifier(
relationSchemaName,
relationName);

EState *executor = CreateExecutorState();
DestReceiver *destReceiver = CreateShardCopyDestReceiver(
executor,
list_make2(relationSchemaName, relationName),
targetNodeId);

StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", relationQualifiedName);

ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
destReceiver);

FreeExecutorState(executor);

PG_RETURN_VOID();
}
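A hypothetical worker-side session illustrates the two cases the function distinguishes; the table names and node id are placeholders:

-- copying a shard or a regular (non-Citus) table to the node with id 2 works
SELECT pg_catalog.worker_copy('public.plain_table'::regclass, 2);
-- passing a Citus table is rejected
SELECT pg_catalog.worker_copy('public.distributed_table'::regclass, 2);
-- ERROR:  table public.distributed_table is a Citus table, only copies of
-- shards or regular postgres tables are supported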
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--11.0-3--11.1-1.sql
@@ -69,3 +69,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
#include "udfs/get_all_active_transactions/11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.1-1.sql"
#include "udfs/worker_split_copy/11.1-1.sql"
#include "udfs/worker_copy/11.1-1.sql"
@@ -73,6 +73,10 @@ DROP FUNCTION pg_catalog.worker_split_copy(
splitCopyInfos pg_catalog.split_copy_info[]);
DROP TYPE pg_catalog.split_copy_info;

DROP FUNCTION pg_catalog.worker_copy(
source_table regclass,
target_node_id integer);

DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
OUT global_pid int8);
8 changes: 8 additions & 0 deletions src/backend/distributed/sql/udfs/worker_copy/11.1-1.sql

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions src/backend/distributed/sql/udfs/worker_copy/latest.sql
@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_copy(
source_table regclass,
target_node_id integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_copy(regclass, integer)
IS 'Perform copy of a shard';
9 changes: 9 additions & 0 deletions src/backend/distributed/utils/reference_table_utils.c
@@ -207,6 +207,15 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
newWorkerNode,
transferMode);

/*
* The placement copy command uses distributed execution to copy
* the shard. This is allowed when indicating that the backend is a
* rebalancer backend.
*/
ExecuteCriticalRemoteCommand(connection,
"SET LOCAL application_name TO "
CITUS_REBALANCER_NAME);
ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
RemoteTransactionCommit(connection);
}
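Because the placement copy command is now executed through the distributed executor on the remote node, the connection first identifies itself as a rebalancer backend. Roughly, the sequence sent on that connection looks as follows, assuming CITUS_REBALANCER_NAME expands to citus_rebalancer and with the placement copy command only sketched:

BEGIN;
SET LOCAL application_name TO citus_rebalancer;
-- placement copy command built by CopyShardPlacementToWorkerNodeQuery above,
-- e.g. a citus_copy_shard_placement(...) call for the reference table shard
COMMIT;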
28 changes: 5 additions & 23 deletions src/test/regress/expected/failure_offline_move_shard_placement.out
@@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
- -- failure on blocking append_table_to_shard operation on target node
- SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()');
-- failure on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
mitmproxy
---------------------------------------------------------------------

@@ -101,8 +101,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill(
SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
- -- cancellation on blocking append_table_to_shard operation on target node
- SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')');
while executing command on localhost:xxxxx
-- cancellation on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------

@@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid ||

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
- -- failure on CopyData operation on source node
- SELECT citus.mitmproxy('conn.onCopyData().kill()');
- mitmproxy
- ---------------------------------------------------------------------
-
- (1 row)
-
- SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
- ERROR: could not copy table "t_200" from "localhost:xxxxx"
- CONTEXT: while executing command on localhost:xxxxx
- -- cancellation on CopyData operation on source node
- SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')');
- mitmproxy
- ---------------------------------------------------------------------
-
- (1 row)
-
- SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
- ERROR: canceling statement due to user request
CALL citus_cleanup_orphaned_shards();
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');
3 changes: 2 additions & 1 deletion src/test/regress/expected/multi_extension.out
@@ -1097,10 +1097,11 @@ SELECT * FROM multi_extension.print_extension_changes();
table columnar.stripe |
| function citus_locks() SETOF record
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
| function worker_copy(regclass,integer) void
| function worker_split_copy(bigint,split_copy_info[]) void
| type split_copy_info
| view citus_locks
- (26 rows)
(27 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version