Skip to content

Commit

Permalink
feat: add Lineage metrics for CloudBigtableIO
Browse files Browse the repository at this point in the history
  • Loading branch information
ad548 committed Oct 30, 2024
1 parent 4033a54 commit a052fbf
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_ADMIN_HOST_KEY;
import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_HOST_KEY;
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.Matchers.hasItem;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
Expand All @@ -35,6 +37,7 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
Expand Down Expand Up @@ -65,6 +68,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.apache.beam.sdk.metrics.Lineage;

/**
* This class contains integration test for Beam Dataflow.It creates dataflow pipelines that perform
Expand Down Expand Up @@ -165,16 +169,17 @@ public void testWriteToBigtable() throws IOException {
keys.add(RandomStringUtils.randomAlphanumeric(10));
}

PipelineResult.State result =
PipelineResult result =
Pipeline.create(options)
.apply("Keys", Create.of(keys))
.apply("Create Puts", ParDo.of(WRITE_ONE_TENTH_PERCENT))
.apply("Write to BT", CloudBigtableIO.writeToTable(config))
.getPipeline()
.run()
.waitUntilFinish();
.run();
PipelineResult.State state =
result.waitUntilFinish();

Assert.assertEquals(PipelineResult.State.DONE, result);
Assert.assertEquals(PipelineResult.State.DONE, state);

try (ResultScanner scanner =
connection.getTable(tableName).getScanner(new Scan().setFilter(new KeyOnlyFilter()))) {
Expand All @@ -184,6 +189,9 @@ public void testWriteToBigtable() throws IOException {
}
Assert.assertEquals(TOTAL_ROW_COUNT, count);
}
assertThat(Lineage.query(result.metrics(), Lineage.Type.SINK),
hasItem(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
config.getTableId())));
}

@Test
Expand Down Expand Up @@ -236,8 +244,12 @@ public void testReadFromBigtable() throws IOException {

PAssert.thatSingleton(count).isEqualTo(TOTAL_ROW_COUNT);

PipelineResult.State result = pipeLine.run().waitUntilFinish();
Assert.assertEquals(PipelineResult.State.DONE, result);
PipelineResult result = pipeLine.run();
PipelineResult.State state = result.waitUntilFinish();
Assert.assertEquals(PipelineResult.State.DONE, state);
assertThat(Lineage.query(result.metrics(), Lineage.Type.SOURCE),
hasItem(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
config.getTableId())));
}

private static byte[] createRandomValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.beam.sdk.metrics.Lineage;

/**
* Utilities to create {@link PTransform}s for reading and writing <a
Expand Down Expand Up @@ -666,6 +667,8 @@ static class Reader extends BoundedReader<Result> {

private final AtomicInteger attempt = new AtomicInteger(3);

private transient boolean reportedLineage;

@VisibleForTesting
Reader(CloudBigtableIO.AbstractSource source) {
this.source = source;
Expand All @@ -680,6 +683,7 @@ static class Reader extends BoundedReader<Result> {
public boolean start() throws IOException {
initializeScanner();
workStart = System.currentTimeMillis();
reportLineageOnce();
return advance();
}

Expand Down Expand Up @@ -906,6 +910,15 @@ public String toString() {
Bytes.toStringBinary(rangeTracker.getStartPosition().getBytes()),
Bytes.toStringBinary(rangeTracker.getStopPosition().getBytes()));
}

void reportLineageOnce() {
if (!reportedLineage) {
Lineage.getSources().add(
String.format("bigtable:%s.%s.%s", source.getConfiguration().getProjectId(),
source.getConfiguration().getInstanceId(), source.getConfiguration().getTableId()));
reportedLineage = true;
}
}
}

///////////////////// Write Class /////////////////////////////////
Expand Down Expand Up @@ -969,6 +982,7 @@ public static class CloudBigtableSingleTableBufferedWriteFn
extends BufferedMutatorDoFn<Mutation> {
private static final long serialVersionUID = 2L;
private transient BufferedMutator mutator;
private transient boolean reportedLineage;

public CloudBigtableSingleTableBufferedWriteFn(CloudBigtableTableConfiguration config) {
super(config);
Expand Down Expand Up @@ -1006,6 +1020,16 @@ public synchronized void finishBundle(@SuppressWarnings("unused") FinishBundleCo
logExceptions(null, exception);
rethrowException(exception);
}
reportLineageOnce();
}

void reportLineageOnce() {
if (!reportedLineage) {
Lineage.getSinks()
.add(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
((CloudBigtableTableConfiguration) getConfig()).getTableId()));
reportedLineage = true;
}
}
}

Expand All @@ -1029,6 +1053,7 @@ public static class CloudBigtableMultiTableWriteFn

// Stats
private transient Map<String, BufferedMutator> mutators;
private transient boolean reportedLineage;

public CloudBigtableMultiTableWriteFn(CloudBigtableConfiguration config) {
super(config);
Expand Down Expand Up @@ -1084,6 +1109,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
}
} finally {
mutators.clear();
reportLineageOnce();
}
}

void reportLineageOnce() {
if (!reportedLineage) {
for (String tableName : mutators.keySet()) {
Lineage.getSinks()
.add(String.format("bigtable:%s.%s.%s", config.getProjectId(),
config.getInstanceId(), tableName));
reportedLineage = true;
}
}
}
}
Expand Down

0 comments on commit a052fbf

Please sign in to comment.