diff --git a/pom.xml b/pom.xml
index f22a7df..3942dec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,12 +102,12 @@
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-metastore</artifactId>
-      <version>3.1.1</version>
+      <version>2.3.5</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>3.1.1</version>
+      <version>2.7.7</version>
     </dependency>
     <dependency>
       <groupId>net.snowflake</groupId>
diff --git a/scripts/sync_hive_to_snowflake.sh b/scripts/sync_hive_to_snowflake.sh
new file mode 100644
index 0000000..9e3a887
--- /dev/null
+++ b/scripts/sync_hive_to_snowflake.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+set -e
+
+HIVE_CLASSPATH=$(hive -e "set env:CLASSPATH;" | grep env:CLASSPATH= | sed -e "s/^env:CLASSPATH=//")
+java -cp $HIVE_CLASSPATH:. net.snowflake.hivemetastoreconnector.core.HiveSyncTool
\ No newline at end of file
diff --git a/src/main/java/net/snowflake/hivemetastoreconnector/SnowflakeConf.java b/src/main/java/net/snowflake/hivemetastoreconnector/SnowflakeConf.java
index 19cde8a..fd78391 100644
--- a/src/main/java/net/snowflake/hivemetastoreconnector/SnowflakeConf.java
+++ b/src/main/java/net/snowflake/hivemetastoreconnector/SnowflakeConf.java
@@ -39,6 +39,8 @@ public enum ConfVars
       "The role to use to connect to Snowflake."),
   SNOWFLAKE_JDBC_DB("snowflake.jdbc.db", "db",
       "The database to use to connect to Snowflake."),
+  SNOWFLAKE_JDBC_WAREHOUSE("snowflake.jdbc.warehouse", "warehouse",
+      "The warehouse to use with Snowflake, if necessary."),
   SNOWFLAKE_JDBC_SCHEMA("snowflake.jdbc.schema", "schema",
       "The schema to use to connect to Snowflake."),
   SNOWFLAKE_JDBC_SSL("snowflake.jdbc.ssl", "ssl",
diff --git a/src/main/java/net/snowflake/hivemetastoreconnector/commands/AddPartition.java b/src/main/java/net/snowflake/hivemetastoreconnector/commands/AddPartition.java
index eeb5a50..404de21 100644
--- a/src/main/java/net/snowflake/hivemetastoreconnector/commands/AddPartition.java
+++ b/src/main/java/net/snowflake/hivemetastoreconnector/commands/AddPartition.java
@@ -73,11 +73,11 @@ public AddPartition(AlterPartitionEvent alterPartitionEvent,
    * @param isCompact internal marker to check if this command was generated
    *                  by compaction
    */
-  protected AddPartition(Table hiveTable,
-                         Iterator<Partition> partitionsIterator,
-                         SnowflakeConf snowflakeConf,
-                         Configuration hiveConf,
-                         boolean isCompact)
+  public AddPartition(Table hiveTable,
+                      Iterator<Partition> partitionsIterator,
+                      SnowflakeConf snowflakeConf,
+                      Configuration hiveConf,
+                      boolean isCompact)
   {
     super(hiveTable);
     this.hiveTable = Preconditions.checkNotNull(hiveTable);
diff --git a/src/main/java/net/snowflake/hivemetastoreconnector/commands/CreateExternalTable.java b/src/main/java/net/snowflake/hivemetastoreconnector/commands/CreateExternalTable.java
index 4176a95..0c49d18 100644
--- a/src/main/java/net/snowflake/hivemetastoreconnector/commands/CreateExternalTable.java
+++ b/src/main/java/net/snowflake/hivemetastoreconnector/commands/CreateExternalTable.java
@@ -50,10 +50,10 @@ public CreateExternalTable(CreateTableEvent createTableEvent,
    * @param hiveConf The Hive configuration
    * @param canReplace Whether to replace existing resources or not
    */
-  protected CreateExternalTable(Table hiveTable,
-                                SnowflakeConf snowflakeConf,
-                                Configuration hiveConf,
-                                boolean canReplace)
+  public CreateExternalTable(Table hiveTable,
+                             SnowflakeConf snowflakeConf,
+                             Configuration hiveConf,
+                             boolean canReplace)
   {
     super(hiveTable);
     this.hiveTable = Preconditions.checkNotNull(hiveTable);
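The protected-to-public changes above on the AddPartition and CreateExternalTable constructors are what let the new HiveSyncTool (further down in this diff) build commands directly from metastore Table objects instead of from Hive listener events. A minimal sketch of that call pattern, using only names that appear in this diff and assuming an hmsClient and snowflakeConf are already in scope:

    // Create (or "touch") a table in Snowflake outside of any listener event.
    // The final 'false' means existing Snowflake resources are not replaced.
    Table hiveTable = hmsClient.getTable("db1", "tbl1");
    SnowflakeClient.generateAndExecuteSnowflakeStatements(
        new CreateExternalTable(hiveTable,
                                snowflakeConf,
                                new Configuration(), // Hive configs unused here
                                false),
        snowflakeConf);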
diff --git a/src/main/java/net/snowflake/hivemetastoreconnector/commands/DropPartition.java b/src/main/java/net/snowflake/hivemetastoreconnector/commands/DropPartition.java
index a161f99..dcdf18f 100644
--- a/src/main/java/net/snowflake/hivemetastoreconnector/commands/DropPartition.java
+++ b/src/main/java/net/snowflake/hivemetastoreconnector/commands/DropPartition.java
@@ -4,6 +4,7 @@
 package net.snowflake.hivemetastoreconnector.commands;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import net.snowflake.hivemetastoreconnector.util.StringUtil;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -12,7 +13,6 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.function.Supplier;
 
 /**
  * A class for the DropPartition command
@@ -28,18 +28,37 @@ public DropPartition(DropPartitionEvent dropPartitionEvent)
   {
     super(Preconditions.checkNotNull(dropPartitionEvent).getTable());
     this.hiveTable = Preconditions.checkNotNull(dropPartitionEvent.getTable());
-    this.getPartititonsIterator = dropPartitionEvent::getPartitionIterator;
+    // Avoid 'this' constructor due to usage of hiveTable below
+    this.partititonLocationsIterator = Iterators.transform(
+        dropPartitionEvent.getPartitionIterator(),
+        partition -> StringUtil.relativizePartitionURI(
+            hiveTable,
+            Preconditions.checkNotNull(partition)));
+  }
+
+  /**
+   * Creates a DropPartition command
+   * @param hiveTable The Hive table to generate a command from
+   * @param partititonLocationsIterator iterator of the locations of the
+   *                                    partitions to drop
+   */
+  public DropPartition(Table hiveTable, Iterator<String> partititonLocationsIterator)
+  {
+    super(hiveTable);
+    this.hiveTable = Preconditions.checkNotNull(hiveTable);
+    this.partititonLocationsIterator =
+        Preconditions.checkNotNull(partititonLocationsIterator);
   }
 
   /**
    * Generates the command for drop partitions.
    * Note: Unlike Hive, Snowflake partitions are dropped using locations.
-   * @param partition Partition object to generate a command from
+   * @param partitionLocation Partition location to generate a command from
    * @return The Snowflake command generated, for example:
    *         ALTER EXTERNAL TABLE t1 DROP PARTITION LOCATION 'location'
    *         /* TABLE LOCATION = 's3n://bucketname/path/to/table' * /;
    */
-  private String generateDropPartitionCommand(Partition partition)
+  private String generateDropPartitionCommand(String partitionLocation)
   {
     return String.format(
         "ALTER EXTERNAL TABLE %1$s " +
@@ -47,7 +66,7 @@ private String generateDropPartitionCommand(Partition partition)
         "LOCATION '%2$s' " +
         "/* TABLE LOCATION = '%3$s' */;",
         StringUtil.escapeSqlIdentifier(hiveTable.getTableName()),
-        StringUtil.escapeSqlText(StringUtil.relativizePartitionURI(hiveTable, partition)),
+        StringUtil.escapeSqlText(partitionLocation),
         StringUtil.escapeSqlComment(hiveTable.getSd().getLocation()));
   }
 
@@ -59,11 +78,10 @@ public List<String> generateSqlQueries()
   {
     List<String> queryList = new ArrayList<>();
 
-    Iterator<Partition> partitionIterator = this.getPartititonsIterator.get();
-    while (partitionIterator.hasNext())
+    while (partititonLocationsIterator.hasNext())
     {
-      Partition partition = partitionIterator.next();
-      queryList.add(this.generateDropPartitionCommand(partition));
+      queryList.add(
+          this.generateDropPartitionCommand(partititonLocationsIterator.next()));
     }
 
     return queryList;
@@ -71,5 +89,5 @@ public List<String> generateSqlQueries()
   private final Table hiveTable;
 
-  private final Supplier<Iterator<Partition>> getPartititonsIterator;
+  private final Iterator<String> partititonLocationsIterator;
 }
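With this refactor, DropPartition is driven by plain partition locations (paths relative to the table location) rather than Partition objects, which is what allows the sync tool to drop partitions that exist only on the Snowflake side. A small usage sketch, assuming a hiveTable is in scope; the location 'to/part3' is illustrative and matches the expectation in the new test further down:

    // Each location yielded by the iterator becomes one
    // ALTER EXTERNAL TABLE ... DROP PARTITION LOCATION '...' statement.
    DropPartition dropPartition = new DropPartition(
        hiveTable, ImmutableList.of("to/part3").iterator());
    List<String> queries = dropPartition.generateSqlQueries();
    // e.g. "ALTER EXTERNAL TABLE tbl1 DROP PARTITION LOCATION 'to/part3' ..."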
diff --git a/src/main/java/net/snowflake/hivemetastoreconnector/core/HiveSyncTool.java b/src/main/java/net/snowflake/hivemetastoreconnector/core/HiveSyncTool.java
new file mode 100644
index 0000000..0dde915
--- /dev/null
+++ b/src/main/java/net/snowflake/hivemetastoreconnector/core/HiveSyncTool.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright (c) 2012-2019 Snowflake Computing Inc. All right reserved.
+ */
+package net.snowflake.hivemetastoreconnector.core;
+
+import com.google.common.base.Preconditions;
+import net.snowflake.hivemetastoreconnector.SnowflakeConf;
+import net.snowflake.hivemetastoreconnector.commands.AddPartition;
+import net.snowflake.hivemetastoreconnector.commands.CreateExternalTable;
+import net.snowflake.hivemetastoreconnector.commands.DropPartition;
+import net.snowflake.hivemetastoreconnector.util.StringUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for syncing metadata from Hive to Snowflake
+ * @author wwong
+ */
+public class HiveSyncTool
+{
+  private static final Logger log = LoggerFactory.getLogger(HiveSyncTool.class);
+
+  // Note: HiveMetaStoreClient is not backwards compatible. Notably, 2.X
+  //       versions use a HiveConf argument instead of a Configuration
+  //       argument. Also, if the version of the HiveMetaStoreClient is not
+  //       compatible with the metastore, calls to the metastore may return
+  //       wrong results, e.g. client.getAllDatabases() returns no databases.
+  private final HiveMetaStoreClient hmsClient;
+
+  // The Snowflake configuration
+  private final SnowflakeConf snowflakeConf;
+
+  /**
+   * Instantiates the HiveSyncTool.
+   * @param hmsClient a client for the Hive metastore
+   */
+  public HiveSyncTool(HiveMetaStoreClient hmsClient)
+  {
+    this.hmsClient = Preconditions.checkNotNull(hmsClient);
+    this.snowflakeConf = new SnowflakeConf();
+  }
+
+  /**
+   * Does a one-time, one-way metadata sync from the Hive metastore to
+   * Snowflake.
+   * 1. Tables in Hive are created in Snowflake
+   * 2. Partitions not in Hive are dropped from Snowflake
+   * 3. Partitions in Hive are added to Snowflake
+   * Note: does not drop tables from Snowflake.
+   * @throws TException thrown when encountering a Thrift exception while
+   *         communicating with the metastore or executing a metastore operation
+   */
+  public void sync() throws TException
+  {
+    Pattern tableNameFilter = snowflakeConf.getPattern(
+        SnowflakeConf.ConfVars.SNOWFLAKE_TABLE_FILTER_REGEX.getVarname(), null);
+    Pattern databaseNameFilter = snowflakeConf.getPattern(
+        SnowflakeConf.ConfVars.SNOWFLAKE_DATABASE_FILTER_REGEX.getVarname(), null);
+
+    log.info("Starting sync");
+    List<String> databaseNames = hmsClient.getAllDatabases().stream()
+        .filter(db -> databaseNameFilter == null || !databaseNameFilter.matcher(db).matches())
+        .collect(Collectors.toList());
+    log.info(String.format("Syncing %s databases from Hive",
+                           databaseNames.size()));
+    for (String databaseName : databaseNames)
+    {
+      Preconditions.checkNotNull(databaseName);
+      List<String> tableNames = hmsClient.getAllTables(databaseName).stream()
+          .filter(table -> tableNameFilter == null || !tableNameFilter.matcher(table).matches())
+          .collect(Collectors.toList());
+      log.info(String.format("Syncing %s tables for database %s",
+                             tableNames.size(), databaseName));
+      for (String tableName : tableNames)
+      {
+        Preconditions.checkNotNull(tableName);
+
+        // Add missing tables to Snowflake
+        Table hiveTable = hmsClient.getTable(databaseName, tableName);
+        SnowflakeClient.generateAndExecuteSnowflakeStatements(
+            new CreateExternalTable(hiveTable,
+                                    snowflakeConf,
+                                    new Configuration(), // We won't need Hive configs
+                                    false // Don't replace
+                                    ),
+            snowflakeConf);
+
+        if (!hiveTable.getPartitionKeys().isEmpty())
+        {
+          // Drop extra partitions
+          dropExtraPartitionsFromSnowflake(databaseName, hiveTable);
+
+          // Add the partitions
+          List<Partition> partitions = hmsClient.listPartitions(
+              databaseName, tableName, (short) -1 /* all partitions */);
+          log.info(String.format("Syncing %s partitions for table %s.%s",
+                                 partitions.size(), tableName, databaseName));
+          if (partitions.isEmpty())
+          {
+            log.info(String.format("No need to add partitions for table %s",
+                                   tableName));
+          }
+          else
+          {
+            SnowflakeClient.generateAndExecuteSnowflakeStatements(
+                new AddPartition(hiveTable,
+                                 partitions.iterator(),
+                                 snowflakeConf,
+                                 new Configuration(), // We won't need Hive configs
+                                 false // Not compact
+                                 ),
+                snowflakeConf);
+          }
+        }
+      }
+    }
+    log.info("Sync complete");
+  }
+
+  /**
+   * Helper method that drops extra partitions from Snowflake if they are
+   * not in Hive.
+   * @throws TException thrown when encountering a Thrift exception while
+   *         communicating with the metastore or executing a metastore operation
+   */
+  private void dropExtraPartitionsFromSnowflake(String databaseName,
+                                                Table hiveTable)
+      throws TException
+  {
+    Preconditions.checkNotNull(databaseName);
+    Preconditions.checkNotNull(Preconditions.checkNotNull(hiveTable).getTableName());
+
+    // Get Snowflake partition locations
+    Set<String> sfPartLocs;
+    try
+    {
+      sfPartLocs = getSnowflakePartitionLocations(hiveTable);
+    }
+    catch (IllegalStateException | SQLException ex)
+    {
+      log.warn(String.format(
+          "Error encountered, skipping dropping partitions for table %s. Error: %s",
+          hiveTable.getTableName(), ex));
+      return;
+    }
+
+    // Listing partitions in Hive should always be done after the list on
+    // Snowflake. This prevents the edge case where a partition is added
+    // between the Hive and Snowflake list operations.
+    List<Partition> hivePartitions = hmsClient.listPartitions(
+        databaseName, hiveTable.getTableName(), (short) -1 /* all partitions */);
+    Preconditions.checkNotNull(hivePartitions);
+    log.info(String.format("Found %s partitions in Hive.", hivePartitions.size()));
+
+    // Ensure that no file in Snowflake is removed if it's prefixed by any Hive
+    // location, by filtering with a generated regex
+    Pattern hivePartitionRegex = Pattern.compile(String.format("(%s).*",
+        String.join("|",
+                    hivePartitions.stream()
+                        .map(partition -> StringUtil.relativizePartitionURI(
+                            hiveTable, Preconditions.checkNotNull(partition)))
+                        .collect(Collectors.toList()))));
+    List<String> extraPartitions = sfPartLocs.stream()
+        .filter(location -> hivePartitions.isEmpty()
+            || !hivePartitionRegex.matcher(location).matches())
+        .collect(Collectors.toList());
+
+    if (extraPartitions.isEmpty())
+    {
+      log.info(String.format("No need to drop partitions for table %s",
+                             hiveTable.getTableName()));
+      return;
+    }
+
+    // Drop partitions that aren't in Hive
+    log.info(String.format("Dropping %s partition locations",
+                           extraPartitions.size()));
+    SnowflakeClient.generateAndExecuteSnowflakeStatements(
+        new DropPartition(hiveTable, extraPartitions.iterator()),
+        snowflakeConf);
+  }
+
+  /**
+   * Retrieves a set of locations for a Snowflake table that can be used as
+   * an argument to the drop partition command.
+   * @param hiveTable the Hive table
+   * @return a set of partition locations
+   * @throws SQLException Thrown when there was an error executing a Snowflake
+   *         SQL query (if a Snowflake query must be executed).
+   * @throws IllegalStateException thrown when the file paths from Snowflake
+   *         are in an unexpected format
+   */
+  private Set<String> getSnowflakePartitionLocations(Table hiveTable)
+      throws SQLException, IllegalStateException
+  {
+    // Get the list of files from Snowflake. Note that this abuses the
+    // behavior of Snowflake's drop partition command to unregister
+    // individual subdirectories instead of actual partitions.
+    ResultSet filePathsResult = SnowflakeClient.executeStatement(
+        String.format(
+            "SELECT FILE_NAME FROM " +
+            "table(information_schema.external_table_files('%s'));",
+            StringUtil.escapeSqlText(hiveTable.getTableName())),
+        snowflakeConf);
+    Preconditions.checkNotNull(filePathsResult);
+    Preconditions.checkState(filePathsResult.getMetaData().getColumnCount() == 1);
+
+    // The relevant paths have the following form:
+    //   s3://<bucket>/<table path>/<partition path 1>/<partition path 2>/<file name>
+    //   |------ Hive table -------|
+    //   |---------------- Hive partitions ------------------------------|
+    //                              |- Snowflake partitions -------------|
+    //                              |- Snowflake files ----------------------------|
+    // To get the Snowflake partitions from the Snowflake files:
+    //  - Append the protocol and bucket
+    //  - Remove the file name
+    //  - Get the path relative to the Hive table
+    String[] hiveTableLocSplit = hiveTable.getSd().getLocation().split("/");
+    Preconditions.checkArgument(hiveTableLocSplit.length > 2);
+    String prefix = String.join("/",
+                                Arrays.asList(hiveTableLocSplit).subList(0, 3));
+
+    List<String> snowflakePartitionLocations = new ArrayList<>();
+    while (filePathsResult.next())
+    {
+      // Note: Must call ResultSet.next() once to move cursor to the first row.
+      //       Also, column indices are 1-based.
+      String filePath = filePathsResult.getString(1);
+
+      Preconditions.checkState(filePath.contains("/"),
+          String.format("No directories to partition on. Path: %s.", filePath));
+
+      String absFilePath = prefix + "/" + filePath.substring(0,
+          filePath.lastIndexOf("/"));
+      Optional<String> partitionLocation = StringUtil.relativizeURI(
+          hiveTable.getSd().getLocation(), absFilePath);
+      Preconditions.checkState(partitionLocation.isPresent(),
+          String.format("Could not relativize %s with %s",
+                        hiveTable.getSd().getLocation(), absFilePath));
+
+      snowflakePartitionLocations.add(partitionLocation.get());
+    }
+    log.info(String.format("Found %s files in Snowflake.", snowflakePartitionLocations.size()));
+
+    return new HashSet<>(snowflakePartitionLocations);
+  }
+
+  /**
+   * A convenient entry point for the sync tool. Expects Hive and Hadoop
+   * libraries to be in the classpath.
+   * See also: {@link #sync()}
+   * @param args program arguments, which are not used
+   * @throws TException thrown when encountering a Thrift exception while
+   *         communicating with the metastore or executing a metastore operation
+   */
+  public static void main(final String[] args) throws TException
+  {
+    Preconditions.checkArgument(args.length == 0,
+                                "The Hive sync tool expects no arguments.");
+    new HiveSyncTool(new HiveMetaStoreClient(new HiveConf())).sync();
+  }
+}
diff --git a/src/test/java/AlterExternalTableTest.java b/src/test/java/AlterExternalTableTest.java
index 7f42f5b..42523af 100644
--- a/src/test/java/AlterExternalTableTest.java
+++ b/src/test/java/AlterExternalTableTest.java
@@ -37,7 +37,7 @@ public void basicTouchTableGenerateCommandTest() throws Exception
     Table table = TestUtil.initializeMockTable();
     HiveMetaStore.HMSHandler mockHandler = TestUtil.initializeMockHMSHandler();
     AlterTableEvent alterTableEvent = new AlterTableEvent(table, table,
-                                                          true, true, mockHandler);
+                                                          true, mockHandler);
     AlterExternalTable alterTable = new AlterExternalTable(alterTableEvent,
                                                            TestUtil.initializeMockConfig());
 
@@ -77,7 +77,7 @@ public void unpartitionedTouchTableGenerateCommandTest() throws Exception
         new FieldSchema("col2", "string", null)));
     HiveMetaStore.HMSHandler mockHandler = TestUtil.initializeMockHMSHandler();
     AlterTableEvent alterTableEvent = new AlterTableEvent(table, table,
-                                                          true, true, mockHandler);
+                                                          true, mockHandler);
     AlterExternalTable alterTable = new AlterExternalTable(alterTableEvent,
                                                            TestUtil.initializeMockConfig());
 
@@ -116,7 +116,7 @@ public void addColumnsGenerateCommandTest() throws Exception
     newTable.getSd().getCols().add(new FieldSchema("new2", "string", null));
     HiveMetaStore.HMSHandler mockHandler = TestUtil.initializeMockHMSHandler();
     AlterTableEvent alterTableEvent = new AlterTableEvent(oldTable, newTable,
-                                                          true, true, mockHandler);
+                                                          true, mockHandler);
     AlterExternalTable alterTable = new AlterExternalTable(alterTableEvent,
                                                            TestUtil.initializeMockConfig());
 
@@ -160,7 +160,7 @@ public void dropColumnsGenerateCommandTest() throws Exception
     Table newTable = TestUtil.initializeMockTable();
     HiveMetaStore.HMSHandler mockHandler = TestUtil.initializeMockHMSHandler();
     AlterTableEvent alterTableEvent = new AlterTableEvent(oldTable, newTable,
-                                                          true, true, mockHandler);
+                                                          true, mockHandler);
     AlterExternalTable alterTable = new AlterExternalTable(alterTableEvent,
                                                            TestUtil.initializeMockConfig());
 
@@ -205,7 +205,7 @@ public void mixAddDropColumnsGenerateCommandTest() throws Exception
     newTable.getSd().getCols().add(new FieldSchema("new2", "string", null));
     HiveMetaStore.HMSHandler mockHandler = TestUtil.initializeMockHMSHandler();
     AlterTableEvent alterTableEvent = new AlterTableEvent(oldTable, newTable,
-                                                          true, true, mockHandler);
+                                                          true, mockHandler);
     AlterExternalTable alterTable = new AlterExternalTable(alterTableEvent,
                                                            TestUtil.initializeMockConfig());
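The new test below feeds the tool Snowflake file names like 'to/part2/file' through a mocked external_table_files result set. To see why such strings relativize to partition locations, here is the path arithmetic from getSnowflakePartitionLocations worked as a standalone sketch; it inlines logic equivalent to StringUtil.relativizeURI (an assumption, since that helper's body is not shown in this diff) and uses the same 's3://path' table location as the test:

    import java.net.URI;
    import java.util.Arrays;

    public class RelativizeSketch
    {
      public static void main(String[] args)
      {
        String tableLocation = "s3://path"; // hiveTable.getSd().getLocation()
        String fileName = "to/part3/file";  // FILE_NAME from external_table_files

        // Rebuild the protocol-and-bucket prefix from the first three segments:
        // "s3://path".split("/") -> ["s3:", "", "path"]
        String[] split = tableLocation.split("/");
        String prefix = String.join("/", Arrays.asList(split).subList(0, 3));

        // Drop the file name, keeping only the partition directories
        String absFilePath = prefix + "/" +
            fileName.substring(0, fileName.lastIndexOf("/"));

        // Relativize against the table location -> "to/part3", the string the
        // DROP PARTITION assertion in the test expects
        String partitionLocation = URI.create(tableLocation + "/")
            .relativize(URI.create(absFilePath)).toString();
        System.out.println(partitionLocation);
      }
    }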
diff --git a/src/test/java/HiveSyncToolTest.java b/src/test/java/HiveSyncToolTest.java
new file mode 100644
index 0000000..764aadb
--- /dev/null
+++ b/src/test/java/HiveSyncToolTest.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2012-2019 Snowflake Computing Inc. All right reserved.
+ */
+import com.google.common.collect.ImmutableList;
+import net.snowflake.hivemetastoreconnector.SnowflakeConf;
+import net.snowflake.hivemetastoreconnector.commands.Command;
+import net.snowflake.hivemetastoreconnector.core.HiveSyncTool;
+import net.snowflake.hivemetastoreconnector.core.SnowflakeClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+
+/**
+ * Unit tests for the sync tool
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({Configuration.class, HiveMetaStore.HMSHandler.class,
+                 DriverManager.class, SnowflakeConf.class,
+                 SnowflakeClient.class, HiveSyncTool.class,
+                 HiveConf.class})
+public class HiveSyncToolTest
+{
+  /**
+   * A basic test for the sync tool
+   *
+   * @throws Exception
+   */
+  @Test
+  public void basicSyncTest() throws Exception
+  {
+    // Mock the following via the HiveMetaStoreClient:
+    //   db1:
+    //    - tbl1:
+    //       - partition 1
+    //       - partition 2
+    //    - tbl2 (empty)
+    //   db2 (empty)
+    HiveMetaStoreClient mockHmsClient = PowerMockito.mock(HiveMetaStoreClient.class);
+    PowerMockito
+        .when(mockHmsClient.getAllDatabases())
+        .thenReturn(ImmutableList.of("db1", "db2"));
+    PowerMockito
+        .when(mockHmsClient.getAllTables("db1"))
+        .thenReturn(ImmutableList.of("tbl1", "tbl2"));
+    PowerMockito
+        .when(mockHmsClient.getAllTables("db2"))
+        .thenReturn(ImmutableList.of());
+
+    // Mock tbl1
+    Table tbl1 = TestUtil.initializeMockTable();
+    PowerMockito
+        .when(mockHmsClient.getTable("db1", "tbl1"))
+        .thenReturn(tbl1);
+    tbl1.setTableName("tbl1");
+    tbl1.setDbName("db1");
+    tbl1.getSd().setLocation("s3://path");
+
+    // Mock tbl2
+    Table tbl2 = TestUtil.initializeMockTable();
+    PowerMockito
+        .when(mockHmsClient.getTable("db1", "tbl2"))
+        .thenReturn(tbl2);
+    tbl2.setTableName("tbl2");
+    tbl2.setDbName("db1");
+    tbl2.getSd().setLocation("s3://path");
+
+    // Mock partitions
+    Partition partition1 = new Partition();
+    partition1.setSd(new StorageDescriptor());
+    partition1.getSd().setLocation("s3://path/to/part1");
+    partition1.setValues(ImmutableList.of("1", "partVal"));
+    Partition partition2 = new Partition();
+    partition2.setSd(new StorageDescriptor());
+    partition2.getSd().setLocation("s3://path/to/part2");
+    partition2.setValues(ImmutableList.of("2", "partVal"));
+    List<Partition> mockPartitions1 = ImmutableList.of(partition1, partition2);
+    List<Partition> mockPartitions2 = ImmutableList.of();
+    PowerMockito
+        .when(mockHmsClient.listPartitions("db1", "tbl1", (short) -1))
+        .thenReturn(mockPartitions1);
+    PowerMockito
+        .when(mockHmsClient.listPartitions("db1", "tbl2", (short) -1))
+        .thenReturn(mockPartitions2);
+
+    // Mock the following via the SnowflakeClient:
+    //   db1:
+    //    - tbl1:
+    //       - partition 2
+    //       - partition 3
+    ResultSet mockResultSet = PowerMockito.mock(ResultSet.class);
+    Mockito.when(mockResultSet.next())
+        .thenReturn(true) // tbl1 has 2 files
+        .thenReturn(true)
+        .thenReturn(false); // tbl2 has no files
+    Mockito.when(mockResultSet.getString(1))
+        .thenReturn("to/part2/file")
+        .thenReturn("to/part3/file");
+    ResultSetMetaData mockMetaData = PowerMockito.mock(ResultSetMetaData.class);
+    Mockito.when(mockMetaData.getColumnCount())
+        .thenReturn(1);
+    Mockito.when(mockResultSet.getMetaData())
+        .thenReturn(mockMetaData);
+    PowerMockito.mockStatic(SnowflakeClient.class);
+    PowerMockito.doReturn(mockResultSet).when(SnowflakeClient.class);
+    SnowflakeClient.executeStatement(any(), any());
+    List<List<String>> invocations = new ArrayList<>();
+    PowerMockito.doAnswer((Answer<Void>) invocation ->
+    {
+      Object[] args = invocation.getArguments();
+      Command cmd = (Command) args[0];
+      invocations.add(cmd.generateSqlQueries());
+      return null;
+    }).when(SnowflakeClient.class);
+    SnowflakeClient.generateAndExecuteSnowflakeStatements(any(), any());
+
+    // Run the tool
+    SnowflakeConf mockConfig = TestUtil.initializeMockConfig();
+    PowerMockito
+        .when(mockConfig.get("snowflake.hive-metastore-listener.integration",
+                             null))
+        .thenReturn("anIntegration");
+    PowerMockito
+        .whenNew(SnowflakeConf.class).withAnyArguments().thenReturn(mockConfig);
+    new HiveSyncTool(mockHmsClient).sync();
+
+    // Verify:
+    //  - tables "touched" (2 calls)
+    //  - partition 3 is dropped from table 1 (1 call)
+    //  - all partitions are "touched" (1 call)
+    assertEquals(4, invocations.size());
+
+    assertTrue(invocations.get(0).get(1).startsWith(
+        "CREATE EXTERNAL TABLE IF NOT EXISTS tbl1"));
+    assertTrue(invocations.get(1).get(0).startsWith(
+        "ALTER EXTERNAL TABLE tbl1 DROP PARTITION LOCATION 'to/part3'"));
+    assertTrue(invocations.get(2).get(2).startsWith(
+        "ALTER EXTERNAL TABLE tbl1 ADD PARTITION(partcol='1',name='partVal')"));
+    assertTrue(invocations.get(3).get(1).startsWith(
+        "CREATE EXTERNAL TABLE IF NOT EXISTS tbl2"));
+  }
+}
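For completeness, a minimal sketch of driving a sync programmatically, equivalent to what scripts/sync_hive_to_snowflake.sh does through HiveSyncTool.main(); it assumes hive-site.xml and the snowflake.jdbc.* configuration are discoverable on the classpath, as main() expects:

    import net.snowflake.hivemetastoreconnector.core.HiveSyncTool;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    public class SyncOnce
    {
      public static void main(String[] args) throws Exception
      {
        // The 2.3.x HiveMetaStoreClient takes a HiveConf; 3.x switched to a
        // Configuration, which is why pom.xml pins hive-metastore to 2.3.5.
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        try
        {
          new HiveSyncTool(client).sync(); // one-time, one-way Hive -> Snowflake
        }
        finally
        {
          client.close();
        }
      }
    }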