feat: support multiple tables when importing snapshots #4358

Status: Open · wants to merge 4 commits into base: main
6 changes: 6 additions & 0 deletions bigtable-dataflow-parent/bigtable-beam-import/pom.xml
@@ -189,6 +189,12 @@ limitations under the License.
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
ImportJobFromHbaseSnapshot.java
@@ -15,28 +15,46 @@
*/
package com.google.cloud.bigtable.beam.hbasesnapshots;

import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.beam.TemplateUtils;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.HBaseSnapshotInputConfigBuilder;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupHBaseSnapshotRestoreFiles;
import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupRestoredSnapshots;
import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.RestoreSnapshot;
import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.ListRegions;
import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.ReadRegions;
import com.google.cloud.bigtable.beam.sequencefiles.HBaseResultToMutationFn;
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob;
import com.google.cloud.bigtable.beam.sequencefiles.Utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

@@ -68,6 +86,16 @@
public class ImportJobFromHbaseSnapshot {
private static final Log LOG = LogFactory.getLog(ImportJobFromHbaseSnapshot.class);

@VisibleForTesting
static final String MISSING_SNAPSHOT_SOURCEPATH =
"Source Path containing hbase snapshots must be specified.";

@VisibleForTesting
static final String MISSING_SNAPSHOT_NAMES =
"Snapshots must be specified. Allowed values are '*' (indicating all snapshots under source path) or "
+ "'prefix*' (snapshots matching certain prefix) or 'snapshotname1:tablename1,snapshotname2:tablename2' "
+ "(comma seperated list of snapshots)";

public interface ImportOptions extends ImportJob.ImportOptions {
@Description("The HBase root dir where HBase snapshot files resides.")
String getHbaseSnapshotSourceDir();
@@ -87,24 +115,163 @@ public interface ImportOptions extends ImportJob.ImportOptions {

@SuppressWarnings("unused")
void setEnableSnappy(Boolean enableSnappy);

@Description("Path to config file containing snapshot source path/snapshot names.")
String getImportConfigFilePath();

void setImportConfigFilePath(String value);

@Description(
"Snapshots to be imported. Can be '*', 'prefix*' or 'snap1,snap2' or 'snap1:table1,snap2:table2'.")
String getSnapshots();

void setSnapshots(String value);

@Description("Specifies whether to use dynamic splitting while reading hbase region.")
@Default.Boolean(true)
boolean getUseDynamicSplitting();

void setUseDynamicSplitting(boolean value);
}
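
For illustration only (not part of this change, and not the actual SnapshotUtils.getSnapshotsFromString implementation): a minimal sketch of how the comma/colon form of --snapshots described above could map to snapshot-to-table pairs, assuming a bare snapshot name targets a table of the same name.

  // Editorial sketch, not code from this PR: parses "snap1:table1,snap2" into
  // {snap1=table1, snap2=snap2}. The bare-name-defaults-to-same-table behavior is an
  // assumption for illustration; the real mapping lives in SnapshotUtils.
  static java.util.Map<String, String> parseSnapshotsArgSketch(String snapshots) {
    java.util.Map<String, String> result = new java.util.LinkedHashMap<>();
    for (String entry : snapshots.split(",")) {
      String[] parts = entry.trim().split(":", 2);
      result.put(parts[0], parts.length == 2 ? parts[1] : parts[0]);
    }
    return result;
  }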

public static void main(String[] args) throws Exception {
PipelineOptionsFactory.register(ImportOptions.class);

ImportOptions opts =
ImportOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportOptions.class);

// To determine the Google Cloud Storage file scheme (gs://)
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create().as(GcsOptions.class));

LOG.info("Building Pipeline");
Pipeline pipeline = buildPipeline(opts);
Pipeline pipeline = null;
// Maintain backward compatibility until deprecation
if (options.getSnapshotName() != null && !options.getSnapshotName().isEmpty()) {
pipeline = buildPipeline(options);
} else {
ImportConfig importConfig =
options.getImportConfigFilePath() != null
? buildImportConfigFromConfigFile(options.getImportConfigFilePath())
: buildImportConfigFromPipelineOptions(options, options.as(GcsOptions.class));

LOG.info(
String.format(
"SourcePath:%s, RestorePath:%s",
importConfig.getSourcepath(), importConfig.getRestorepath()));
pipeline = buildPipelineWithMultipleSnapshots(options, importConfig);
}

LOG.info("Running Pipeline");
PipelineResult result = pipeline.run();

if (opts.getWait()) {
if (options.getWait()) {
Utils.waitForPipelineToFinish(result);
}
}
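
As a hedged usage sketch: the flag names below follow Beam's convention of deriving them from the ImportOptions getter names added above; the bucket and snapshot values are placeholders, and the Bigtable project/instance flags inherited from ImportJob.ImportOptions are omitted.

  // Editorial sketch, not code from this PR: building ImportOptions the way main() does,
  // with example values for the options added in this change. Real runs also need the
  // Bigtable flags inherited from ImportJob.ImportOptions (omitted here).
  String[] exampleArgs = {
    "--hbaseSnapshotSourceDir=gs://example-bucket/hbase-export",
    "--snapshots=snapshot-a:table-a,snapshot-b:table-b",
    "--useDynamicSplitting=true"
  };
  ImportOptions exampleOptions =
      PipelineOptionsFactory.fromArgs(exampleArgs).as(ImportOptions.class);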

@VisibleForTesting
static ImportConfig buildImportConfigFromConfigFile(String configFilePath) throws Exception {
Gson gson = new GsonBuilder().create();
ImportConfig importConfig =
gson.fromJson(SnapshotUtils.readFileContents(configFilePath), ImportConfig.class);
Preconditions.checkNotNull(importConfig.getSourcepath(), MISSING_SNAPSHOT_SOURCEPATH);
Preconditions.checkNotNull(importConfig.getSnapshots(), MISSING_SNAPSHOT_NAMES);
SnapshotUtils.setRestorePath(importConfig);
return importConfig;
}
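
A rough sketch of a config file this method might consume: the JSON key names below are guesses inferred from the ImportConfig getters used in this diff (getSourcepath, getRestorepath, getSnapshots); ImportConfig itself is not shown here, so the exact keys and the shape of the snapshots field are assumptions.

  // Editorial sketch, not code from this PR: parsing a hand-written config payload the same
  // way buildImportConfigFromConfigFile does. Key names and the snapshots shape are guesses.
  String sampleConfigJson =
      "{\n"
          + "  \"sourcepath\": \"gs://example-bucket/hbase-export\",\n"
          + "  \"restorepath\": \"gs://example-bucket/restore-tmp\",\n"
          + "  \"snapshots\": {\"snapshot-a\": \"table-a\", \"snapshot-b\": \"table-b\"}\n"
          + "}";
  ImportConfig sampleConfig =
      new GsonBuilder().create().fromJson(sampleConfigJson, ImportConfig.class);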

@VisibleForTesting
static ImportConfig buildImportConfigFromPipelineOptions(
ImportOptions options, GcsOptions gcsOptions) throws IOException {
Preconditions.checkArgument(
options.getHbaseSnapshotSourceDir() != null, MISSING_SNAPSHOT_SOURCEPATH);
Preconditions.checkArgument(options.getSnapshots() != null, MISSING_SNAPSHOT_NAMES);

Map<String, String> snapshots =
SnapshotUtils.isRegex(options.getSnapshots())
? SnapshotUtils.getSnapshotsFromSnapshotPath(
options.getHbaseSnapshotSourceDir(),
gcsOptions.getGcsUtil(),
options.getSnapshots())
: SnapshotUtils.getSnapshotsFromString(options.getSnapshots());

ImportConfig importConfig = new ImportConfig();
importConfig.setSourcepath(options.getHbaseSnapshotSourceDir());
importConfig.setSnapshotsFromMap(snapshots);
SnapshotUtils.setRestorePath(importConfig);
return importConfig;
}

/**
* Builds the pipeline that supports loading multiple snapshots into Bigtable.
*
* @param options - Pipeline options
* @param importConfig - Configuration representing the snapshot source path, the list of snapshots, etc.
* @return the constructed pipeline
* @throws Exception
*/
static Pipeline buildPipelineWithMultipleSnapshots(
ImportOptions options, ImportConfig importConfig) throws Exception {
Map<String, String> configurations =
SnapshotUtils.getConfiguration(
options.getRunner().getSimpleName(),
options.getProject(),
importConfig.getSourcepath(),
importConfig.getHbaseConfiguration());

List<SnapshotConfig> snapshotConfigs =
SnapshotUtils.buildSnapshotConfigs(
importConfig.getSnapshots(),
configurations,
options.getProject(),
importConfig.getSourcepath(),
importConfig.getRestorepath());

Pipeline pipeline = Pipeline.create(options);

PCollection<SnapshotConfig> restoredSnapshots =
pipeline
.apply("Read Snapshot Configs", Create.of(snapshotConfigs))
.apply("Restore Snapshots", ParDo.of(new RestoreSnapshot()));

// Read records from HBase region files and write them to Bigtable
PCollection<KV<String, Iterable<Mutation>>> hbaseRecords =
restoredSnapshots
.apply("List Regions", new ListRegions())
.apply("Read Regions", new ReadRegions(options.getUseDynamicSplitting()));

options.setBigtableTableId(ValueProvider.StaticValueProvider.of("NA"));
CloudBigtableTableConfiguration bigtableConfiguration =
TemplateUtils.buildImportConfig(options, "HBaseSnapshotImportJob");
if (importConfig.getBigtableConfiguration() != null) {
CloudBigtableTableConfiguration.Builder builder = bigtableConfiguration.toBuilder();
for (Map.Entry<String, String> entry : importConfig.getBigtableConfiguration().entrySet())
builder = builder.withConfiguration(entry.getKey(), entry.getValue());
bigtableConfiguration = builder.build();
}

hbaseRecords.apply(
"Write to BigTable", CloudBigtableIO.writeToMultipleTables(bigtableConfiguration));

// Clean up all the temporary restored snapshot HLinks after reading all the data
restoredSnapshots
.apply(Wait.on(hbaseRecords))
.apply("Clean restored files", ParDo.of(new CleanupRestoredSnapshots()));

return pipeline;
}
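
For orientation, a sketch of one element of the hbaseRecords collection above: ReadRegions emits KV<String, Iterable<Mutation>>, and the String key is presumably the destination table id that CloudBigtableIO.writeToMultipleTables routes on (an assumption; the transform internals are not shown in this diff).

  // Editorial sketch, not code from this PR: one hand-built element of the
  // KV<String, Iterable<Mutation>> collection written above. A Put is one kind of Mutation.
  org.apache.hadoop.hbase.client.Put put =
      new org.apache.hadoop.hbase.client.Put(org.apache.hadoop.hbase.util.Bytes.toBytes("row-1"));
  put.addColumn(
      org.apache.hadoop.hbase.util.Bytes.toBytes("cf"),
      org.apache.hadoop.hbase.util.Bytes.toBytes("q"),
      org.apache.hadoop.hbase.util.Bytes.toBytes("value"));
  KV<String, Iterable<Mutation>> element =
      KV.of("table-a", java.util.Collections.<Mutation>singletonList(put));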

/**
* Builds the pipeline that supports loading a single snapshot into Bigtable. Maintained for
* backward compatibility and will be deprecated once this functionality is merged into the
* buildPipelineWithMultipleSnapshots method.
*
* @param opts - Pipeline options
* @return the constructed pipeline
* @throws Exception
*/
@VisibleForTesting
static Pipeline buildPipeline(ImportOptions opts) throws Exception {
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
@@ -133,7 +300,7 @@ static Pipeline buildPipeline(ImportOptions opts) throws Exception {
pipeline
.apply(Create.of(sourceAndRestoreFolders))
.apply(Wait.on(readResult))
.apply(ParDo.of(new CleanupHBaseSnapshotRestoreFilesFn()));
.apply(ParDo.of(new CleanupHBaseSnapshotRestoreFiles()));

return pipeline;
}