Skip to content

Commit

Permalink
Add Lineage metrics for CloudBigtableIO
Browse files Browse the repository at this point in the history
  • Loading branch information
ad548 committed Sep 4, 2024
1 parent ae4452a commit 665d2fe
Showing 1 changed file with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.beam.sdk.metrics.Lineage;

/**
* Utilities to create {@link PTransform}s for reading and writing <a
Expand Down Expand Up @@ -666,6 +667,8 @@ static class Reader extends BoundedReader<Result> {

private final AtomicInteger attempt = new AtomicInteger(3);

private transient boolean reportedLineage;

@VisibleForTesting
Reader(CloudBigtableIO.AbstractSource source) {
this.source = source;
Expand All @@ -680,6 +683,7 @@ static class Reader extends BoundedReader<Result> {
public boolean start() throws IOException {
initializeScanner();
workStart = System.currentTimeMillis();
reportLineageOnce();
return advance();
}

Expand Down Expand Up @@ -906,6 +910,15 @@ public String toString() {
Bytes.toStringBinary(rangeTracker.getStartPosition().getBytes()),
Bytes.toStringBinary(rangeTracker.getStopPosition().getBytes()));
}

void reportLineageOnce() {
if (!reportedLineage) {
Lineage.getSources().add(
String.format("bigtable:%s.%s.%s", source.getConfiguration().getProjectId(),
source.getConfiguration().getInstanceId(), source.getConfiguration().getTableId()));
reportedLineage = true;
}
}
}

///////////////////// Write Class /////////////////////////////////
Expand Down Expand Up @@ -969,6 +982,7 @@ public static class CloudBigtableSingleTableBufferedWriteFn
extends BufferedMutatorDoFn<Mutation> {
private static final long serialVersionUID = 2L;
private transient BufferedMutator mutator;
private transient boolean reportedLineage;

public CloudBigtableSingleTableBufferedWriteFn(CloudBigtableTableConfiguration config) {
super(config);
Expand Down Expand Up @@ -1006,6 +1020,16 @@ public synchronized void finishBundle(@SuppressWarnings("unused") FinishBundleCo
logExceptions(null, exception);
rethrowException(exception);
}
reportLineageOnce();
}

void reportLineageOnce() {
if (!reportedLineage) {
Lineage.getSinks()
.add(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
((CloudBigtableTableConfiguration) getConfig()).getTableId()));
reportedLineage = true;
}
}
}

Expand All @@ -1029,6 +1053,7 @@ public static class CloudBigtableMultiTableWriteFn

// Stats
private transient Map<String, BufferedMutator> mutators;
private transient boolean reportedLineage;

public CloudBigtableMultiTableWriteFn(CloudBigtableConfiguration config) {
super(config);
Expand Down Expand Up @@ -1084,6 +1109,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
}
} finally {
mutators.clear();
reportLineageOnce();
}
}

void reportLineageOnce() {
if (!reportedLineage) {
for (String tableName : mutators.keySet()) {
Lineage.getSinks()
.add(String.format("bigtable:%s.%s.%s", config.getProjectId(),
config.getInstanceId(), tableName));
reportedLineage = true;
}
}
}
}
Expand Down

0 comments on commit 665d2fe

Please sign in to comment.