Use shard split copy code for blocking shard moves #6098

Merged 1 commit on Aug 1, 2022
64 changes: 57 additions & 7 deletions src/backend/distributed/operations/repair_shards.c
@@ -20,6 +20,7 @@
#include "access/htup_details.h"
#include "catalog/pg_class.h"
#include "catalog/pg_enum.h"
#include "distributed/adaptive_executor.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@@ -38,6 +39,7 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
@@ -129,6 +131,7 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
int32 sourceNodePort);
static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval,
List *ddlCommandList);
static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode);


/* declarations for dynamic loading */
@@ -1180,6 +1183,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);

WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);

/* iterate through the colocated shards and copy each */
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
@@ -1199,23 +1205,48 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}

int taskId = 0;
List *copyTaskList = NIL;
foreach_ptr(shardInterval, shardIntervalList)
{
/*
* Skip copying data for partitioned tables, because they contain no
* data themselves. Their partitions do contain data, but those are
* different colocated shards that will be copied separately.
*/
if (!PartitionedTable(shardInterval->relationId))
{
char *copyCommand = CreateShardCopyCommand(
shardInterval, targetNode);

Task *copyTask = CreateBasicTask(
INVALID_JOB_ID,
taskId,
READ_TASK,
copyCommand);

ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, sourceNode);

copyTask->taskPlacementList = list_make1(taskPlacement);

copyTaskList = lappend(copyTaskList, copyTask);
taskId++;
}
}

ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);

foreach_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);

@@ -1278,6 +1309,25 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
}


/*
* CreateShardCopyCommand constructs the command to copy a shard to another
* worker node. This command needs to be run on the node from which you want
* to copy the shard.
*/
static char *
CreateShardCopyCommand(ShardInterval *shard,
WorkerNode *targetNode)
{
char *shardName = ConstructQualifiedShardName(shard);
StringInfo query = makeStringInfo();
appendStringInfo(query,
"SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u);",
quote_literal_cstr(shardName),
targetNode->nodeId);
return query->data;
}
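
For illustration, assuming a hypothetical shard named public.t_102008 and a target node whose id is 3, the command produced here would be:

SELECT pg_catalog.worker_copy_table_to_node('public.t_102008'::regclass, 3);

The command is wrapped in a Task whose placement is the source node, so the source worker pushes the shard's rows directly to the target node.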


/*
* CopyPartitionShardsCommandList gets a shardInterval which is a shard that
* belongs to partitioned table (this is asserted).
65 changes: 65 additions & 0 deletions src/backend/distributed/operations/worker_copy_table_to_node_udf.c
@@ -0,0 +1,65 @@
/*-------------------------------------------------------------------------
*
* worker_copy_table_to_node_udf.c
*
* This file implements the worker_copy_table_to_node UDF. This UDF can be
* used to copy the data in a shard (or other table) from one worker node to
* another.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_shard_copy.h"

PG_FUNCTION_INFO_V1(worker_copy_table_to_node);

/*
* worker_copy_table_to_node copies a shard from this worker to another worker
*
* SQL signature:
*
* worker_copy_table_to_node(
* source_table regclass,
* target_node_id integer
* ) RETURNS VOID
*/
Datum
worker_copy_table_to_node(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
uint32_t targetNodeId = PG_GETARG_INT32(1);

Oid schemaOid = get_rel_namespace(relationId);
char *relationSchemaName = get_namespace_name(schemaOid);
char *relationName = get_rel_name(relationId);
char *relationQualifiedName = quote_qualified_identifier(
relationSchemaName,
relationName);

EState *executor = CreateExecutorState();
DestReceiver *destReceiver = CreateShardCopyDestReceiver(
executor,
list_make2(relationSchemaName, relationName),
targetNodeId);

StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", relationQualifiedName);

ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
destReceiver);

FreeExecutorState(executor);

PG_RETURN_VOID();
}
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--11.0-3--11.1-1.sql
@@ -69,3 +69,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
#include "udfs/get_all_active_transactions/11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.1-1.sql"
#include "udfs/worker_split_copy/11.1-1.sql"
#include "udfs/worker_copy_table_to_node/11.1-1.sql"
@@ -73,6 +73,10 @@ DROP FUNCTION pg_catalog.worker_split_copy(
splitCopyInfos pg_catalog.split_copy_info[]);
DROP TYPE pg_catalog.split_copy_info;

DROP FUNCTION pg_catalog.worker_copy_table_to_node(
source_table regclass,
target_node_id integer);

DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
OUT global_pid int8);

@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_copy_table_to_node(
source_table regclass,
target_node_id integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_copy_table_to_node$$;
COMMENT ON FUNCTION pg_catalog.worker_copy_table_to_node(regclass, integer)
IS 'Perform copy of a shard';
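
As a rough usage sketch (the table name and node id are made up), this UDF is invoked on the worker that currently holds the data and pushes the rows to the node with the given id:

SELECT pg_catalog.worker_copy_table_to_node('public.dist_table_102040'::regclass, 2);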
9 changes: 9 additions & 0 deletions src/backend/distributed/utils/reference_table_utils.c
@@ -207,6 +207,15 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
newWorkerNode,
transferMode);

/*
* The placement copy command uses distributed execution to copy
* the shard. This is allowed when indicating that the backend is a
* rebalancer backend.
*/
ExecuteCriticalRemoteCommand(connection,
"SET LOCAL application_name TO "
CITUS_REBALANCER_NAME);
ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
RemoteTransactionCommit(connection);
}
28 changes: 5 additions & 23 deletions src/test/regress/expected/failure_offline_move_shard_placement.out
@@ -91,8 +91,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard_offline.t").

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
-- failure on blocking append_table_to_shard operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill()');
-- failure on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
mitmproxy
---------------------------------------------------------------------

@@ -101,8 +101,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").kill(
SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on blocking append_table_to_shard operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="worker_append_table_to_shard").cancel(' || :pid || ')');
while executing command on localhost:xxxxx
-- cancellation on blocking COPY operation on target node
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------

@@ -129,25 +130,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT").cancel(' || :pid ||

SELECT master_move_shard_placement(201, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port, 'block_writes');
ERROR: canceling statement due to user request
-- failure on CopyData operation on source node
SELECT citus.mitmproxy('conn.onCopyData().kill()');
mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
ERROR: could not copy table "t_200" from "localhost:xxxxx"
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on CopyData operation on source node
SELECT citus.mitmproxy('conn.onCopyData().cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT master_move_shard_placement(200, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port, 'block_writes');
ERROR: canceling statement due to user request
CALL citus_cleanup_orphaned_shards();
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');
3 changes: 2 additions & 1 deletion src/test/regress/expected/multi_extension.out
@@ -1097,10 +1097,11 @@ SELECT * FROM multi_extension.print_extension_changes();
table columnar.stripe |
| function citus_locks() SETOF record
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
| function worker_copy_table_to_node(regclass,integer) void
| function worker_split_copy(bigint,split_copy_info[]) void
| type split_copy_info
| view citus_locks
(26 rows)
(27 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version