feat: add Lineage metrics for CloudBigtableIO
ad548 committed Nov 5, 2024
1 parent 4033a54 commit 8d18829
Showing 3 changed files with 70 additions and 8 deletions.
@@ -17,6 +17,8 @@

import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_ADMIN_HOST_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_HOST_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
@@ -30,11 +32,13 @@
import java.util.Random;
import java.util.UUID;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -144,6 +148,8 @@ public void testWriteToBigtable() throws IOException {
options.setAppName("testWriteToBigtable-" + System.currentTimeMillis());
LOG.info(
String.format("Started writeToBigtable test with jobName as: %s", options.getAppName()));
System.out.println(
String.format("Started writeToBigtable test with jobName as: %s", options.getAppName()));

CloudBigtableTableConfiguration.Builder configBuilder =
new CloudBigtableTableConfiguration.Builder()
@@ -165,16 +171,21 @@ public void testWriteToBigtable() throws IOException {
keys.add(RandomStringUtils.randomAlphanumeric(10));
}

PipelineResult.State result =
Pipeline.create(options)
DataflowPipelineJob result =
(DataflowPipelineJob) Pipeline.create(options)
.apply("Keys", Create.of(keys))
.apply("Create Puts", ParDo.of(WRITE_ONE_TENTH_PERCENT))
.apply("Write to BT", CloudBigtableIO.writeToTable(config))
.getPipeline()
.run()
.waitUntilFinish();
.run();
LOG.info(
String.format("Ran writeToBigtable test with job ID as: %s", result.getJobId()));
System.out.println(
String.format("Ran writeToBigtable test with job ID as: %s", result.getJobId()));
PipelineResult.State state =
result.waitUntilFinish();

Assert.assertEquals(PipelineResult.State.DONE, result);
Assert.assertEquals(PipelineResult.State.DONE, state);

try (ResultScanner scanner =
connection.getTable(tableName).getScanner(new Scan().setFilter(new KeyOnlyFilter()))) {
@@ -184,6 +195,10 @@ public void testWriteToBigtable() throws IOException {
}
Assert.assertEquals(TOTAL_ROW_COUNT, count);
}
LOG.info(String.format("lineage: %s", Lineage.query(result.metrics(), Lineage.Type.SINK)));
assertThat(Lineage.query(result.metrics(), Lineage.Type.SINK),
contains(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
config.getTableId())));
}

@Test
@@ -236,8 +251,18 @@ public void testReadFromBigtable() throws IOException {

PAssert.thatSingleton(count).isEqualTo(TOTAL_ROW_COUNT);

PipelineResult.State result = pipeLine.run().waitUntilFinish();
Assert.assertEquals(PipelineResult.State.DONE, result);
DataflowPipelineJob result = (DataflowPipelineJob) pipeLine.run();
LOG.info(
String.format("Ran readFromBigtable test with job ID as: %s", result.getJobId()));
System.out.println(
String.format("Ran readFromBigtable test with job ID as: %s", result.getJobId()));

PipelineResult.State state = result.waitUntilFinish();
Assert.assertEquals(PipelineResult.State.DONE, state);
LOG.info(String.format("lineage: %s", Lineage.query(result.metrics(), Lineage.Type.SOURCE)));
assertThat(Lineage.query(result.metrics(), Lineage.Type.SOURCE),
contains(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
config.getTableId())));
}

private static byte[] createRandomValue() {
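The test-side changes above follow one pattern: run the pipeline as a DataflowPipelineJob, wait for it to finish, then query the job's metrics for lineage entries and assert that the table appears under the connector's bigtable:<project>.<instance>.<table> naming. A minimal standalone sketch of that verification step, assuming Beam 2.60's Lineage API and the Dataflow runner (the helper name and plain assertions are illustrative, not part of the commit):

import java.util.Set;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Lineage;

public class LineageCheckSketch {
  // Runs a pipeline on Dataflow and verifies that the expected Bigtable sink
  // lineage entry was reported. Project, instance, and table IDs are placeholders.
  static void runAndCheckSinkLineage(
      Pipeline pipeline, String projectId, String instanceId, String tableId) {
    DataflowPipelineJob job = (DataflowPipelineJob) pipeline.run();
    PipelineResult.State state = job.waitUntilFinish();
    if (state != PipelineResult.State.DONE) {
      throw new AssertionError("Pipeline did not finish cleanly: " + state);
    }
    // Lineage entries are plain strings of the form "bigtable:<project>.<instance>.<table>".
    Set<String> sinks = Lineage.query(job.metrics(), Lineage.Type.SINK);
    String expected = String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId);
    if (!sinks.contains(expected)) {
      throw new AssertionError("Expected sink lineage " + expected + " but got " + sinks);
    }
  }
}

For a read pipeline the same check applies with Lineage.Type.SOURCE, which is what the testReadFromBigtable change asserts.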
@@ -82,6 +82,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.beam.sdk.metrics.Lineage;

/**
* Utilities to create {@link PTransform}s for reading and writing <a
@@ -666,6 +667,8 @@ static class Reader extends BoundedReader<Result> {

private final AtomicInteger attempt = new AtomicInteger(3);

private transient boolean reportedLineage;

@VisibleForTesting
Reader(CloudBigtableIO.AbstractSource source) {
this.source = source;
@@ -680,6 +683,7 @@ static class Reader extends BoundedReader<Result> {
public boolean start() throws IOException {
initializeScanner();
workStart = System.currentTimeMillis();
reportLineageOnce();
return advance();
}

@@ -906,6 +910,15 @@ public String toString() {
Bytes.toStringBinary(rangeTracker.getStartPosition().getBytes()),
Bytes.toStringBinary(rangeTracker.getStopPosition().getBytes()));
}

void reportLineageOnce() {
if (!reportedLineage) {
Lineage.getSources().add(
String.format("bigtable:%s.%s.%s", source.getConfiguration().getProjectId(),
source.getConfiguration().getInstanceId(), source.getConfiguration().getTableId()));
reportedLineage = true;
}
}
}

///////////////////// Write Class /////////////////////////////////
@@ -969,6 +982,7 @@ public static class CloudBigtableSingleTableBufferedWriteFn
extends BufferedMutatorDoFn<Mutation> {
private static final long serialVersionUID = 2L;
private transient BufferedMutator mutator;
private transient boolean reportedLineage;

public CloudBigtableSingleTableBufferedWriteFn(CloudBigtableTableConfiguration config) {
super(config);
@@ -1006,6 +1020,16 @@ public synchronized void finishBundle(@SuppressWarnings("unused") FinishBundleCo
logExceptions(null, exception);
rethrowException(exception);
}
reportLineageOnce();
}

void reportLineageOnce() {
if (!reportedLineage) {
Lineage.getSinks()
.add(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
((CloudBigtableTableConfiguration) getConfig()).getTableId()));
reportedLineage = true;
}
}
}

@@ -1029,6 +1053,7 @@ public static class CloudBigtableMultiTableWriteFn

// Stats
private transient Map<String, BufferedMutator> mutators;
private transient boolean reportedLineage;

public CloudBigtableMultiTableWriteFn(CloudBigtableConfiguration config) {
super(config);
@@ -1084,6 +1109,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
}
} finally {
mutators.clear();
reportLineageOnce();
}
}

void reportLineageOnce() {
if (!reportedLineage) {
for (String tableName : mutators.keySet()) {
Lineage.getSinks()
.add(String.format("bigtable:%s.%s.%s", config.getProjectId(),
config.getInstanceId(), tableName));
reportedLineage = true;
}
}
}
}
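On the connector side, each reader and writer instance keeps a transient boolean and adds its fully qualified table name to the lineage metric the first time it does work, so the entry is reported once per instance rather than once per element. A minimal sketch of that report-once pattern for a hypothetical write DoFn, assuming Beam 2.60's Lineage API (MyBigtableWriteFn and its fields are illustrative, not part of the connector):

import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.hadoop.hbase.client.Mutation;

// Illustrative sink DoFn that reports its Bigtable table as lineage exactly once
// per instance. The flag is transient, so each deserialized worker copy reports
// at most one entry; the underlying metric is a string set, so duplicates collapse.
class MyBigtableWriteFn extends DoFn<Mutation, Void> {
  private final String projectId;
  private final String instanceId;
  private final String tableId;
  private transient boolean reportedLineage;

  MyBigtableWriteFn(String projectId, String instanceId, String tableId) {
    this.projectId = projectId;
    this.instanceId = instanceId;
    this.tableId = tableId;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // ... write c.element() to Bigtable here ...
    reportLineageOnce();
  }

  private void reportLineageOnce() {
    if (!reportedLineage) {
      Lineage.getSinks()
          .add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
      reportedLineage = true;
    }
  }
}

Sources use Lineage.getSources() the same way, which is what the Reader.start() change above does.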
2 changes: 1 addition & 1 deletion pom.xml
@@ -80,7 +80,7 @@ limitations under the License.
<truth.version>1.1.5</truth.version>
<hamcrest.version>1.3</hamcrest.version>
<mockito.version>4.11.0</mockito.version>
<beam.version>2.58.0</beam.version>
<beam.version>2.60.0</beam.version>
<!-- referred from bigtable-beam-import and bigtable-emulator -->
<guava.version>31.1-jre</guava.version>
<opencensus.version>0.31.1</opencensus.version>
