Use shard split copy code for blocking shard moves #6098

Merged
merged 1 commit into main from copy-shard-move-new-api on Aug 1, 2022

Conversation

@JelteF (Contributor) commented on Jul 28, 2022

DESCRIPTION: Improve performance of blocking shard moves

The new shard copy code that was created for shard splits has some
advantages over the old shard copy code. The old code used
worker_append_table_to_shard, which wrote to disk twice, and it did not
use binary copy even when that was possible. Both of these issues are
fixed in the new copy code. This PR starts using this new copy logic for
shard moves as well, not just for shard splits.

On my local machine I created a single shard table like this.

```sql
set citus.shard_count = 1;
create table t(id bigint, a bigint);
select create_distributed_table('t', 'id');

INSERT into t(id, a) SELECT i, i from generate_series(1, 100000000) i;
```

I then turned `fsync` off to make sure I wasn't bottlenecked by disk.
Finally I moved this shard between nodes with `citus_move_shard_placement`
with `block_writes`.

Before this PR a move took ~127s; after this PR it took only ~38s. So for this
small test the move spent ~70% less time.

I also tried the same test with a table containing large strings:

```sql
set citus.shard_count = 1;
create table t(id bigint, a bigint, content text);
select create_distributed_table('t', 'id');

INSERT into t(id, a, content) SELECT i, i, 'aunethautnehoautnheaotnuhetnohueoutnehotnuhetncouhaeohuaeochgrhgd.athbetndairgexdbuhaobulrhdbaetoausnetohuracehousncaoehuesousnaceohuenacouhancoexdaseohusnaetobuetnoduhasneouhaceohusnaoetcuhmsnaetohuacoeuhebtokteaoshetouhsanetouhaoug.lcuahesonuthaseauhcoerhuaoecuh.lg;rcydabsnetabuesabhenth' from generate_series(1, 20000000) i;
```

The result was less impressive there, but still quite good:
~60s before versus ~37s after, i.e. ~38% less time.

@JelteF force-pushed the copy-shard-move-new-api branch 6 times, most recently from b2afc83 to 88e2d90 on July 28, 2022 15:34
src/test/regress/sql/ignoring_orphaned_shards.sql (review thread, outdated, resolved)
```c
 * the shard, this is all fine so we temporarily allow it.
 */
ExecuteCriticalRemoteCommand(connection,
                             "SET LOCAL citus.allow_nested_distributed_execution = true");
```
Member:

In the rebalancer we do this by setting the application_name:

```c
StringInfo setApplicationName = makeStringInfo();
appendStringInfo(setApplicationName, "SET application_name TO %s",
                 CITUS_REBALANCER_NAME);
```

Wondering whether we should do something similar here (maybe generalizing the application_name)?

JelteF (author):

I changed this to use the same application_name trick, but I don't think it's worth spending time making this more generic, since the rebalancer won't run commands like this for much longer anyway, because of Nils' background daemon changes.
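(A rough sketch of what the session setup on the remote copy connection would then look like; the exact application_name value is an assumption, not taken from this PR.)

```sql
-- Hypothetical: mark this connection as an internal Citus operation via
-- application_name instead of SET LOCAL citus.allow_nested_distributed_execution.
-- The value citus_rebalancer is assumed from the CITUS_REBALANCER_NAME constant above.
SET application_name TO citus_rebalancer;
```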

@JelteF force-pushed the copy-shard-move-new-api branch 6 times, most recently from 30c16c4 to 7bed347 on July 29, 2022 14:17
src/backend/distributed/operations/worker_copy_udf.c (review thread, outdated, resolved)
```c
Oid relationId = PG_GETARG_OID(0);
uint32_t targetNodeId = PG_GETARG_INT32(1);

if (IsCitusTable(relationId))
```
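(For context, a hedged sketch of how this UDF is invoked from SQL, assuming its SQL-level signature is `(regclass, integer)` as the C arguments above suggest; the shard relation name and node id are placeholders.)

```sql
-- Hypothetical invocation on the source worker; 't_102008' and target node id 2
-- are placeholders, not values taken from this PR.
SELECT worker_copy_table_to_node('t_102008'::regclass, 2);
```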
Member:

Do we need this check? We're reading from the pg_dist_partition metadata, which we determined might not be ideal.

JelteF (author):

After checking, I guess we don't need it. Even without it we will get this error:

```
+ERROR: cannot execute a distributed query from a query on a shard
+DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
```

Contributor:

@marcocitus, @JelteF, by any chance, do you remember why reading from the pg_dist_partition metadata "might not be ideal" and why you relied on `ERROR: cannot execute a distributed query from a query on a shard`?
#6795 shows that in some cases that error may not occur.
Maybe you should reconsider this decision and check for Citus tables in worker_copy_table_to_node?

Collaborator:

I'm guessing because this might happen in the context of a transaction that modifies pg_dist_partition, and if those happen over a different connection then they might not be visible here.

Contributor:

I'm not sure I understand what you mean. Could you be more specific about what problem might be caused by this check?

Based on your comment, I can only come up with the following scenario:
1. There is a Citus table.
2. On connection A this table is converted to a regular table.
3. On connection B we call worker_copy_table_to_node for this table, and it fails with an error because it doesn't see the changes and still sees the table as a Citus table.

In this case, why can't we wait until the changes become visible and retry worker_copy_table_to_node? It seems to me that as long as we see the table as a Citus table, we shouldn't call worker_copy_table_to_node for it.

Collaborator:

> In this case why can't we wait until the changes become visible

Because changes only become visible when the coordinator commits and finalizes the 2PC, and that won't happen until it's done waiting for worker_copy_table_to_node, because committing is always the last thing the coordinator does.

src/backend/distributed/operations/worker_copy_udf.c (review thread, outdated, resolved)
@JelteF force-pushed the copy-shard-move-new-api branch 5 times, most recently from 4e8b63d to 1922ddf on August 1, 2022 11:25
The new shard copy code that was created for shard splits has some
advantages over the old code. This one uses binary copy when possible to
make the copy faster. When doing a shard move using `block_writes` it
now uses this better copy logic.
@JelteF enabled auto-merge (squash) on August 1, 2022 17:02
@JelteF merged commit abffa6c into main on Aug 1, 2022
@JelteF deleted the copy-shard-move-new-api branch on August 1, 2022 17:10
yxu2162 pushed a commit that referenced this pull request Sep 15, 2022