Skip to content

Commit

Permalink
Updated HiveSyncTool to execute Snowflake commands directly instead of touching via Hive metastore
Browse files Browse the repository at this point in the history
  • Loading branch information
Waising Wong committed Jun 27, 2019
1 parent fb184e1 commit 04c5ec1
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public AddPartition(AlterPartitionEvent alterPartitionEvent,
* @param isCompact internal marker to check if this command was generated
* by compaction
*/
protected AddPartition(Table hiveTable,
Iterator<Partition> partitionsIterator,
SnowflakeConf snowflakeConf,
Configuration hiveConf,
boolean isCompact)
public AddPartition(Table hiveTable,
Iterator<Partition> partitionsIterator,
SnowflakeConf snowflakeConf,
Configuration hiveConf,
boolean isCompact)
{
super(hiveTable);
this.hiveTable = Preconditions.checkNotNull(hiveTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public CreateExternalTable(CreateTableEvent createTableEvent,
* @param hiveConf The Hive configuration
* @param canReplace Whether to replace existing resources or not
*/
protected CreateExternalTable(Table hiveTable,
SnowflakeConf snowflakeConf,
Configuration hiveConf,
boolean canReplace)
public CreateExternalTable(Table hiveTable,
SnowflakeConf snowflakeConf,
Configuration hiveConf,
boolean canReplace)
{
super(hiveTable);
this.hiveTable = Preconditions.checkNotNull(hiveTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

import com.google.common.base.Preconditions;
import net.snowflake.hivemetastoreconnector.SnowflakeConf;
import net.snowflake.hivemetastoreconnector.commands.AddPartition;
import net.snowflake.hivemetastoreconnector.commands.CreateExternalTable;
import net.snowflake.hivemetastoreconnector.commands.DropPartition;
import net.snowflake.hivemetastoreconnector.util.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand Down Expand Up @@ -50,7 +53,7 @@ public class HiveSyncTool
*/
public HiveSyncTool(HiveMetaStoreClient hmsClient)
{
this.hmsClient = hmsClient;
this.hmsClient = Preconditions.checkNotNull(hmsClient);
this.snowflakeConf = new SnowflakeConf();
}

Expand Down Expand Up @@ -89,22 +92,43 @@ public void sync() throws TException
{
Preconditions.checkNotNull(tableName);

// Do a touch on the table to fire listener events
// Add missing tables to Snowflake
Table hiveTable = hmsClient.getTable(databaseName, tableName);
hmsClient.alter_table(databaseName, tableName, hiveTable);
SnowflakeClient.generateAndExecuteSnowflakeStatements(
new CreateExternalTable(hiveTable,
snowflakeConf,
new Configuration(), // We won't need Hive configs
false // Don't replace
),
snowflakeConf);

if (!hiveTable.getPartitionKeys().isEmpty())
{
// Drop extra partitions
dropExtraPartitionsFromSnowflake(databaseName, hiveTable);
}

// Do a touch on all partitions to fire listener events
List<Partition> partitions = hmsClient.listPartitions(
databaseName, tableName, (short) -1 /* all partitions */);
log.info(String.format("Syncing %s partitions for table %s.%s",
partitions.size(), tableName, databaseName));
hmsClient.alter_partitions(databaseName, tableName, partitions);
// Add the partitions
List<Partition> partitions = hmsClient.listPartitions(
databaseName, tableName, (short) -1 /* all partitions */);
log.info(String.format("Syncing %s partitions for table %s.%s",
partitions.size(), tableName, databaseName));
if (partitions.isEmpty())
{
log.info(String.format("No need to add partitions for table %s",
tableName));
}
else
{
SnowflakeClient.generateAndExecuteSnowflakeStatements(
new AddPartition(hiveTable,
partitions.iterator(),
snowflakeConf,
new Configuration(), // We won't need Hive configs
false // Not compact
),
snowflakeConf);
}
}
}
}
log.info("Sync complete");
Expand Down Expand Up @@ -158,6 +182,13 @@ private void dropExtraPartitionsFromSnowflake(String databaseName,
|| !hivePartitionRegex.matcher(location).matches())
.collect(Collectors.toList());

if (extraPartitions.isEmpty())
{
log.info(String.format("No need to drop partitions for table %s",
hiveTable.getTableName()));
return;
}

// Drop partitions that aren't in Hive
log.info(String.format("Dropping %s partition locations",
extraPartitions.size()));
Expand Down
64 changes: 27 additions & 37 deletions src/test/java/HiveSyncToolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
*/
import com.google.common.collect.ImmutableList;
import net.snowflake.hivemetastoreconnector.SnowflakeConf;
import net.snowflake.hivemetastoreconnector.commands.DropPartition;
import net.snowflake.hivemetastoreconnector.commands.Command;
import net.snowflake.hivemetastoreconnector.core.HiveSyncTool;
import net.snowflake.hivemetastoreconnector.core.SnowflakeClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand All @@ -24,10 +25,11 @@
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;

/**
Expand All @@ -37,7 +39,8 @@
@PowerMockIgnore("javax.management.*")
@PrepareForTest({Configuration.class, HiveMetaStore.HMSHandler.class,
DriverManager.class, SnowflakeConf.class,
SnowflakeClient.class, HiveSyncTool.class})
SnowflakeClient.class, HiveSyncTool.class,
HiveConf.class})
public class HiveSyncToolTest
{
/**
Expand Down Expand Up @@ -88,9 +91,11 @@ public void basicSyncTest() throws Exception
Partition partition1 = new Partition();
partition1.setSd(new StorageDescriptor());
partition1.getSd().setLocation("s3://path/to/part1");
partition1.setValues(ImmutableList.of("1", "partVal"));
Partition partition2 = new Partition();
partition2.setSd(new StorageDescriptor());
partition2.getSd().setLocation("s3://path/to/part2");
partition2.setValues(ImmutableList.of("2", "partVal"));
List<Partition> mockPartitions1 = ImmutableList.of(partition1, partition2);
List<Partition> mockPartitions2 = ImmutableList.of();
PowerMockito
Expand Down Expand Up @@ -121,54 +126,39 @@ public void basicSyncTest() throws Exception
PowerMockito.mockStatic(SnowflakeClient.class);
PowerMockito.doReturn(mockResultSet).when(SnowflakeClient.class);
SnowflakeClient.executeStatement(any(), any());
AtomicInteger numInvocations = new AtomicInteger();
List<List<String>> invocations = new ArrayList<>();
PowerMockito.doAnswer((Answer) invocation ->
{
Object[] args = invocation.getArguments();
DropPartition cmd = (DropPartition)args[0];
List<String> queries = cmd.generateSqlQueries();
if (numInvocations.getAndIncrement() == 0)
{
assertEquals(1, queries.size());
assertEquals("ALTER EXTERNAL TABLE tbl1 DROP PARTITION LOCATION " +
"'to/part3' /* TABLE LOCATION = 's3://path' */;",
queries.get(0));
}
else
{
assertEquals(0, queries.size());
}
Command cmd = (Command)args[0];
invocations.add(cmd.generateSqlQueries());
return null;
}).when(SnowflakeClient.class);
SnowflakeClient.generateAndExecuteSnowflakeStatements(any(), any());

// Run the tool
SnowflakeConf mockConfig = TestUtil.initializeMockConfig();
PowerMockito
.when(mockConfig.get("snowflake.hive-metastore-listener.integration",
null))
.thenReturn("anIntegration");
PowerMockito
.whenNew(SnowflakeConf.class).withAnyArguments().thenReturn(mockConfig);
new HiveSyncTool(mockHmsClient).sync();

// Verify:
// - tables touched
// - partition 3 is dropped from table 1
// - all partitions are touched
Mockito
.verify(mockHmsClient, Mockito.times(1))
.alter_table("db1", "tbl1", tbl1);
Mockito
.verify(mockHmsClient, Mockito.times(1))
.alter_table("db1", "tbl2", tbl2);

assertEquals(2, numInvocations.get());
// - tables "touched" (2 calls)
// - partition 3 is dropped from table 1 (1 call)
// - all partitions are "touched" (1 call)
assertEquals(4, invocations.size());

Mockito
.verify(mockHmsClient, Mockito.times(2))
.alter_partitions(any(), any(), any());
Mockito
.verify(mockHmsClient)
.alter_partitions("db1", "tbl1", mockPartitions1);
Mockito
.verify(mockHmsClient)
.alter_partitions("db1", "tbl2", mockPartitions2);
assertTrue(invocations.get(0).get(1).startsWith(
"CREATE EXTERNAL TABLE IF NOT EXISTS tbl1"));
assertTrue(invocations.get(1).get(0).startsWith(
"ALTER EXTERNAL TABLE tbl1 DROP PARTITION LOCATION 'to/part3'"));
assertTrue(invocations.get(2).get(2).startsWith(
"ALTER EXTERNAL TABLE tbl1 ADD PARTITION(partcol='1',name='partVal')"));
assertTrue(invocations.get(3).get(1).startsWith(
"CREATE EXTERNAL TABLE IF NOT EXISTS tbl2"));
}
}

0 comments on commit 04c5ec1

Please sign in to comment.