Cloudwatch Metrics Integration #8
Works on Spark 3.1. The default metrics implementation is currently hardcoded. More tests still need to be added.
aws/src/main/java/org/apache/iceberg/aws/cloudwatch/CloudWatchMetricsContext.java
@@ -119,7 +119,7 @@ public void initialize(Map<String, String> properties) {
// Report Hadoop metrics if Hadoop is available
try {
  DynConstructors.Ctor<MetricsContext> ctor =
      DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
Good point on this. I think this is the right way, since the scheme is specific to the Hadoop FileSystem. Can you also make the same change in HadoopMetricsContext to keep the implementations consistent?
@@ -119,7 +119,7 @@ public void initialize(Map<String, String> properties) {
 // Report Hadoop metrics if Hadoop is available
 try {
 DynConstructors.Ctor<MetricsContext> ctor =
-DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
+DynConstructors.builder(MetricsContext.class).impl(DEFAULT_METRICS_IMPL).buildChecked();
hiddenImpl is fine; there might be a class with a protected constructor.
Looks like we are using impl in all CatalogUtil calls, so let's keep it that way and use impl then.
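The impl versus hiddenImpl question above comes down to whether the loader should reach non-public constructors. As a rough sketch of that difference, the following uses plain java.lang.reflect rather than Iceberg's DynConstructors. The class names are hypothetical stand-ins, and the mapping to impl/hiddenImpl is an assumption about how those builder methods behave.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for metrics context implementations, not Iceberg classes.
class PublicCtorContext {
  public PublicCtorContext() {}
}

class ProtectedCtorContext {
  protected ProtectedCtorContext() {}
}

class CtorLookupSketch {
  // Public-only lookup: getConstructor() ignores protected and private constructors.
  static boolean hasPublicNoArgCtor(Class<?> cls) {
    try {
      cls.getConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // Declared lookup plus setAccessible: also reaches protected and private constructors.
  static Object newInstanceHidden(Class<?> cls) {
    try {
      Constructor<?> ctor = cls.getDeclaredConstructor();
      ctor.setAccessible(true);
      return ctor.newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
    }
  }
}
```

With a public-only lookup, a class like ProtectedCtorContext is rejected at load time, which is the trade-off the two reviewers are weighing.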
@@ -43,7 +43,7 @@
 */
public class S3FileIO implements FileIO {
  private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
  private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext";
The default is fine here, but I think it should be moved to CatalogProperties when you add the io-metrics-impl config key.
@Override
public void increment(Number amount) {
  MetricDatum metricData = MetricDatum.builder().value(amount.doubleValue()).unit(unit).metricName(metric).build();
  PutMetricDataRequest dataRequest =
The queue should not hold requests, just datums. The worker then performs a batch request of metric datums.
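One way to read this suggestion is sketched below: the producer enqueues only a datum, and the consumer drains the queue in bounded batches, each of which would become a single request. This is a minimal sketch using only java.util.concurrent. Datum is a hypothetical stand-in for the SDK's MetricDatum, and the batch size is a caller-supplied assumption, not a documented CloudWatch limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DatumBatcherSketch {
  // Hypothetical stand-in for the SDK's MetricDatum.
  static final class Datum {
    final String name;
    final double value;

    Datum(String name, double value) {
      this.name = name;
      this.value = value;
    }
  }

  private final BlockingQueue<Datum> queue = new LinkedBlockingQueue<>();
  private final int maxBatchSize;

  DatumBatcherSketch(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  // Producer side: enqueue only the datum; no request object is built here.
  void increment(String name, Number amount) {
    queue.add(new Datum(name, amount.doubleValue()));
  }

  // Worker side: drain up to maxBatchSize datums without blocking.
  // The caller would wrap each non-empty batch in one put-metric-data request.
  List<Datum> nextBatch() {
    List<Datum> batch = new ArrayList<>(maxBatchSize);
    queue.drainTo(batch, maxBatchSize);
    return batch;
  }
}
```

Keeping request construction in the worker means the hot path in increment does only a queue insert, and the number of network calls scales with batches rather than with increments.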
  metricsQueue.add(dataRequest);
}

public static class Worker extends Thread {
A Worker class definition is not needed; you can use a functional definition like () -> { .. } for a Runnable. Also see how S3OutputStream creates an executorService with a thread pool; that is a safer pattern than initializing a fixed number of workers.
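A minimal sketch of the pattern described here, using only java.util.concurrent: tasks are lambdas submitted to an ExecutorService, with no Thread subclass. It does not reproduce how S3OutputStream builds its pool. The thread name, pool size, and daemon setting are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorSketch {
  // Runs `tasks` lambda-defined jobs on a small pool and returns how many completed.
  static int runTasks(int tasks, int poolSize) {
    AtomicInteger completed = new AtomicInteger();
    // Daemon threads so an idle pool does not block JVM shutdown
    // (an assumption about the desired behavior).
    ExecutorService pool = Executors.newFixedThreadPool(poolSize, r -> {
      Thread t = new Thread(r, "metrics-publisher-sketch");
      t.setDaemon(true);
      return t;
    });
    for (int i = 0; i < tasks; i++) {
      // The lambda is the Runnable; a real task would publish one batch of datums.
      pool.execute(() -> completed.incrementAndGet());
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return completed.get();
  }
}
```

The pool owns thread lifecycle and queuing, so the metrics context only decides what to submit and when to shut down.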
}

public CloudWatchMetricsContext(SerializableSupplier<CloudWatchClient> cloudWatch) {
  this((CloudWatchClient) cloudWatch, new String());
Never call new String(). The default value should be passed in.
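A sketch of what passing the default in could look like: the single-argument constructor delegates to the full constructor with a named constant instead of an empty string. The class is a hypothetical stand-in, Supplier<String> stands in for the CloudWatch client supplier, and the constant's value is an assumption.

```java
import java.util.function.Supplier;

class MetricsContextSketch {
  // Hypothetical default value, named so callers and tests can refer to it.
  static final String DEFAULT_NAMESPACE = "iceberg/s3";

  private final Supplier<String> client; // stand-in for a CloudWatch client supplier
  private final String namespace;

  MetricsContextSketch(Supplier<String> client) {
    // Delegate with the explicit default rather than new String().
    this(client, DEFAULT_NAMESPACE);
  }

  MetricsContextSketch(Supplier<String> client, String namespace) {
    this.client = client;
    this.namespace = namespace;
  }

  String namespace() {
    return namespace;
  }

  String clientName() {
    return client.get();
  }
}
```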
if (cloudWatch == null) {
  this.cloudWatch = AwsClientFactories.from(properties)::cloudWatch;
}
this.namespace = properties.getOrDefault(NAMESPACE, "Iceberg/S3");
nit: lower case iceberg/s3
This default value should be a static variable.
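Taken together, the two comments on this line could be addressed roughly as follows. This is a sketch only: the property key string is hypothetical because the PR's actual NAMESPACE value is not shown, and the lowercase default follows the nit above.

```java
import java.util.Map;

class NamespaceConfigSketch {
  // Hypothetical property key; the real key used by the PR is not shown in this thread.
  static final String NAMESPACE = "cloudwatch.namespace";
  // Default pulled out of the call site into a constant, lowercased per the nit.
  static final String NAMESPACE_DEFAULT = "iceberg/s3";

  // Returns the configured namespace, or the default when the key is absent.
  static String resolveNamespace(Map<String, String> properties) {
    return properties.getOrDefault(NAMESPACE, NAMESPACE_DEFAULT);
  }
}
```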
Commit 1: