From 24012e165c2023d873aa39ad2380eb6e3c7802e4 Mon Sep 17 00:00:00 2001 From: Anwesha Das Date: Wed, 4 Sep 2024 15:51:02 +0000 Subject: [PATCH] feat: add Lineage metrics for CloudBigtableIO --- .../bigtable-beam-import/pom.xml | 1 + .../bigtable/beam/it/CloudBigtableBeamIT.java | 24 +++++++++--- .../cloud/bigtable/beam/CloudBigtableIO.java | 37 +++++++++++++++++++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/bigtable-dataflow-parent/bigtable-beam-import/pom.xml b/bigtable-dataflow-parent/bigtable-beam-import/pom.xml index 999f4fd870..57ec41229a 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/pom.xml +++ b/bigtable-dataflow-parent/bigtable-beam-import/pom.xml @@ -46,6 +46,7 @@ limitations under the License. org.apache.beam beam-sdks-java-core + ${beam.version} org.apache.beam diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java index 6564850933..361c590d31 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java @@ -17,6 +17,8 @@ import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_ADMIN_HOST_KEY; import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_HOST_KEY; +import static com.google.common.truth.Truth.assertThat; +import static org.hamcrest.Matchers.hasItem; import com.google.cloud.bigtable.beam.CloudBigtableIO; import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration; @@ -35,6 +37,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -65,6 +68,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.apache.beam.sdk.metrics.Lineage; /** * This class contains integration test for Beam Dataflow.It creates dataflow pipelines that perform @@ -165,16 +169,17 @@ public void testWriteToBigtable() throws IOException { keys.add(RandomStringUtils.randomAlphanumeric(10)); } - PipelineResult.State result = + PipelineResult result = Pipeline.create(options) .apply("Keys", Create.of(keys)) .apply("Create Puts", ParDo.of(WRITE_ONE_TENTH_PERCENT)) .apply("Write to BT", CloudBigtableIO.writeToTable(config)) .getPipeline() - .run() - .waitUntilFinish(); + .run(); + PipelineResult.State state = + result.waitUntilFinish(); - Assert.assertEquals(PipelineResult.State.DONE, result); + Assert.assertEquals(PipelineResult.State.DONE, state); try (ResultScanner scanner = connection.getTable(tableName).getScanner(new Scan().setFilter(new KeyOnlyFilter()))) { @@ -184,6 +189,9 @@ public void testWriteToBigtable() throws IOException { } Assert.assertEquals(TOTAL_ROW_COUNT, count); } + assertThat(Lineage.query(result.metrics(), Lineage.Type.SINK), + hasItem(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(), + config.getTableId()))); } @Test @@ -236,8 +244,12 @@ public void testReadFromBigtable() throws IOException { PAssert.thatSingleton(count).isEqualTo(TOTAL_ROW_COUNT); - PipelineResult.State result = pipeLine.run().waitUntilFinish(); - Assert.assertEquals(PipelineResult.State.DONE, result); + PipelineResult result = pipeLine.run(); + PipelineResult.State state = result.waitUntilFinish(); + Assert.assertEquals(PipelineResult.State.DONE, state); + assertThat(Lineage.query(result.metrics(), Lineage.Type.SOURCE), + hasItem(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(), + config.getTableId()))); } private static byte[] createRandomValue() { diff --git a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java index dcc0441a8f..b827555b25 100755 --- a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java +++ b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.beam.sdk.metrics.Lineage; /** * Utilities to create {@link PTransform}s for reading and writing { private final AtomicInteger attempt = new AtomicInteger(3); + private transient boolean reportedLineage; + @VisibleForTesting Reader(CloudBigtableIO.AbstractSource source) { this.source = source; @@ -680,6 +683,7 @@ static class Reader extends BoundedReader { public boolean start() throws IOException { initializeScanner(); workStart = System.currentTimeMillis(); + reportLineageOnce(); return advance(); } @@ -906,6 +910,15 @@ public String toString() { Bytes.toStringBinary(rangeTracker.getStartPosition().getBytes()), Bytes.toStringBinary(rangeTracker.getStopPosition().getBytes())); } + + void reportLineageOnce() { + if (!reportedLineage) { + Lineage.getSources().add( + String.format("bigtable:%s.%s.%s", source.getConfiguration().getProjectId(), + source.getConfiguration().getInstanceId(), source.getConfiguration().getTableId())); + reportedLineage = true; + } + } } ///////////////////// Write Class ///////////////////////////////// @@ -969,6 +982,7 @@ public static class CloudBigtableSingleTableBufferedWriteFn extends BufferedMutatorDoFn { private static final long serialVersionUID = 2L; private transient BufferedMutator mutator; + private transient boolean reportedLineage; public CloudBigtableSingleTableBufferedWriteFn(CloudBigtableTableConfiguration config) { super(config); @@ -1006,6 +1020,16 @@ public synchronized void finishBundle(@SuppressWarnings("unused") FinishBundleCo logExceptions(null, exception); rethrowException(exception); } + reportLineageOnce(); + } + + void reportLineageOnce() { + if (!reportedLineage) { + Lineage.getSinks() + .add(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(), + ((CloudBigtableTableConfiguration) getConfig()).getTableId())); + reportedLineage = true; + } } } @@ -1029,6 +1053,7 @@ public static class CloudBigtableMultiTableWriteFn // Stats private transient Map mutators; + private transient boolean reportedLineage; public CloudBigtableMultiTableWriteFn(CloudBigtableConfiguration config) { super(config); @@ -1084,6 +1109,18 @@ public void finishBundle(FinishBundleContext c) throws Exception { } } finally { mutators.clear(); + reportLineageOnce(); + } + } + + void reportLineageOnce() { + if (!reportedLineage) { + for (String tableName : mutators.keySet()) { + Lineage.getSinks() + .add(String.format("bigtable:%s.%s.%s", config.getProjectId(), + config.getInstanceId(), tableName)); + reportedLineage = true; + } } } }