Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloudwatch Metrics Integration #8

Open
wants to merge 4 commits into
base: cloudwatch
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.s3.S3Client;

Expand All @@ -56,6 +57,7 @@ public class GlueTestBase {
static final AwsClientFactory clientFactory = AwsClientFactories.defaultFactory();
static final GlueClient glue = clientFactory.glue();
static final S3Client s3 = clientFactory.s3();
static final CloudWatchClient cloudWatch = clientFactory.cloudWatch();

// iceberg
static GlueCatalog glueCatalog;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.glue;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.cloudwatch.CloudWatchMetricsContext;
import org.apache.iceberg.metrics.MetricsContext;
import org.junit.Test;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;

public class TestCloudWatchMetricsContext extends GlueTestBase {
private static final CloudWatchClient cw = clientFactory.cloudWatch();

@Test
public void testDataIncrement() {
CloudWatchMetricsContext context = new CloudWatchMetricsContext(cw, "Check",
CatalogProperties.DEFAULT_METRICS_MODE);
MetricsContext.Counter count = context.counter("read.bytes", Long.class, MetricsContext.Unit.COUNT);
count.increment(30);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
Expand Down Expand Up @@ -69,6 +70,11 @@ public DynamoDbClient dynamo() {
return DynamoDbClient.builder().applyMutation(this::configure).build();
}

@Override
public CloudWatchClient cloudWatch() {
return CloudWatchClient.builder().applyMutation(this::configure).build();
}

@Override
public void initialize(Map<String, String> properties) {
roleArn = properties.get(AwsProperties.CLIENT_ASSUME_ROLE_ARN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
Expand Down Expand Up @@ -110,6 +111,11 @@ public DynamoDbClient dynamo() {
return DynamoDbClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build();
}

@Override
public CloudWatchClient cloudWatch() {
return CloudWatchClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build();
}

@Override
public void initialize(Map<String, String> properties) {
this.s3Endpoint = properties.get(AwsProperties.S3FILEIO_ENDPOINT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;
import java.util.Map;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
Expand Down Expand Up @@ -56,6 +57,12 @@ public interface AwsClientFactory extends Serializable {
*/
DynamoDbClient dynamo();

/**
* Create a Amazon CloudWatch client
* @return dynamoDB client
*/
CloudWatchClient cloudWatch();

/**
* Initialize AWS client factory from catalog properties.
* @param properties catalog properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.cloudwatch;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.SerializableSupplier;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.cloudwatchlogs.emf.config.Configuration;
import software.amazon.cloudwatchlogs.emf.config.EnvironmentConfigurationProvider;
import software.amazon.cloudwatchlogs.emf.environment.DefaultEnvironment;
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;

public class CloudWatchMetricsContext implements FileIOMetricsContext {
private SerializableSupplier<CloudWatchClient> cloudWatch;
private transient CloudWatchClient client;
private SerializableSupplier<MetricsLogger> metricsLogger;
private transient MetricsLogger logger;
private String namespace;
private String mode;
private String scheme;

public CloudWatchMetricsContext() {
}

public CloudWatchMetricsContext(String scheme) {
this.scheme = scheme;
}

public CloudWatchMetricsContext(SerializableSupplier<CloudWatchClient> cloudWatch) {
this((CloudWatchClient) cloudWatch,
CatalogProperties.DEFAULT_CLOUDWATCH_NAMESPACE,
CatalogProperties.DEFAULT_METRICS_MODE);
}

public CloudWatchMetricsContext(SerializableSupplier<CloudWatchClient> cloudWatch,
MetricsLogger logger, String namespace, String mode) {
this.cloudWatch = cloudWatch;
this.namespace = namespace;
this.mode = mode;
this.logger = logger;
}

private CloudWatchClient client() {
if (client == null) {
client = cloudWatch.get();
}
return client;
}

private MetricsLogger metricsLogger() {
if (logger == null) {
logger = metricsLogger.get();
}
return logger;
}

public CloudWatchMetricsContext(CloudWatchClient cloudWatch, String namespace, String mode) {
this.client = cloudWatch;
this.namespace = namespace;
this.mode = mode;
}

@Override
public void initialize(Map<String, String> properties) {
AwsClientFactory factory = AwsClientFactories.from(properties);

if (cloudWatch == null) {
this.cloudWatch = AwsClientFactories.from(properties)::cloudWatch;
}

this.namespace = properties.getOrDefault(CatalogProperties.CLOUDWATCH_NAMESPACE,
CatalogProperties.DEFAULT_CLOUDWATCH_NAMESPACE);
this.mode = properties.getOrDefault(CatalogProperties.CLOUDWATCH_MODE, "normal");

Configuration config = EnvironmentConfigurationProvider.getConfig();
DefaultEnvironment environment = new DefaultEnvironment(EnvironmentConfigurationProvider.getConfig());

this.logger = new MetricsLogger(environment);
this.logger.setNamespace(this.namespace);
}

@Override
@SuppressWarnings("unchecked")
public <T extends Number> Counter<T> counter(String name, Class<T> type, Unit unit) {
switch (name) {
case READ_BYTES:
ValidationException.check(type == Long.class, "'%s' requires Long type", READ_BYTES);
if (this.mode.equals(CatalogProperties.CLOUDWATCH_EMBEDDED)) {
return new CloudWatchEmbeddedMetricsCounter("ReadBytes", metricsLogger(), this.namespace, "Bytes");
} else {
return new CloudWatchCounter("ReadBytes", client(), this.namespace, "Bytes");
}
case READ_OPERATIONS:
ValidationException.check(type == Integer.class, "'%s' requires Integer type", READ_OPERATIONS);
if (this.mode.equals(CatalogProperties.CLOUDWATCH_EMBEDDED)) {
return new CloudWatchEmbeddedMetricsCounter("ReadOperations", metricsLogger(), this.namespace, "Count");
} else {
return new CloudWatchCounter("ReadOperations", client(), this.namespace, "Count");
}
case WRITE_BYTES:
ValidationException.check(type == Long.class, "'%s' requires Long type", WRITE_BYTES);
if (this.mode.equals(CatalogProperties.CLOUDWATCH_EMBEDDED)) {
return new CloudWatchEmbeddedMetricsCounter("WriteBytes", metricsLogger(), this.namespace, "Bytes");
} else {
return new CloudWatchCounter("WriteBytes", client(), this.namespace, "Bytes");
}

case WRITE_OPERATIONS:
ValidationException.check(type == Integer.class, "'%s' requires Integer type", WRITE_OPERATIONS);
if (this.mode.equals(CatalogProperties.CLOUDWATCH_EMBEDDED)) {
return new CloudWatchEmbeddedMetricsCounter("WriteOperations", metricsLogger(), this.namespace, "Count");
} else {
return new CloudWatchCounter("WriteOperations", client(), this.namespace, "Count");
}
default:
throw new IllegalArgumentException(String.format("Unsupported counter: '%s'", name));
}
}

private static class CloudWatchCounter implements Counter {
private static ExecutorService executorService;
private static CloudWatchClient cloudWatch;
private String namespace;
private String metric;
private String unit;
private static final int NUMBER_THREADS = 10;

CloudWatchCounter(String metric, CloudWatchClient cloudWatch, String namespace, String unit) {
if (executorService == null) {
synchronized (CloudWatchCounter.class) {
if (executorService == null) {
executorService = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(
NUMBER_THREADS,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("iceberg-cloudwatch-metric-%d")
.build()));
}
}
}

this.cloudWatch = cloudWatch;
this.namespace = namespace;
this.metric = metric;
this.unit = unit;
}

@Override
public void increment() {
increment(1);
}

@Override
public void increment(Number amount) {
kunal0829 marked this conversation as resolved.
Show resolved Hide resolved
MetricDatum metricData = MetricDatum
.builder()
.value(amount.doubleValue())
.unit(unit)
.metricName(metric)
.build();

executorService.execute(new Runnable() {
public void run() {
PutMetricDataRequest dataRequest = PutMetricDataRequest
.builder()
.namespace(namespace)
.metricData(metricData)
.build();
cloudWatch.putMetricData(dataRequest);
}
});
}
}

private static class CloudWatchEmbeddedMetricsCounter implements Counter {
private static MetricsLogger metricsLogger;
private String namespace;
private String metric;
private String unit;

CloudWatchEmbeddedMetricsCounter(String metric, MetricsLogger metricsLogger, String namespace, String unit) {
this.metricsLogger = metricsLogger;
this.namespace = namespace;
this.metric = metric;
this.unit = unit;
}

@Override
public void increment() {
increment(1);
}

@Override
public void increment(Number amount) {

}
}
}
10 changes: 7 additions & 3 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.common.DynConstructors;
Expand All @@ -43,7 +44,6 @@
*/
public class S3FileIO implements FileIO {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext";
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default is fine here, but I think the default should be moved to CatalogProperties when you add io-metrics-impl config key


private SerializableSupplier<S3Client> s3;
private AwsProperties awsProperties;
Expand Down Expand Up @@ -119,12 +119,16 @@ public void initialize(Map<String, String> properties) {
// Report Hadoop metrics if Hadoop is available
try {
DynConstructors.Ctor<MetricsContext> ctor =
DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point on this, I think this is the right way, scheme is specific to Hadoop FileSystem, can you also make the change in HadoopMetricsContext to make the implementations consistent?

DynConstructors.builder(MetricsContext.class)
.impl(properties.getOrDefault(CatalogProperties.IO_METRICS_IMPL,
CatalogProperties.DEFAULT_METRICS_IMPL), String.class)
.buildChecked();
this.metrics = ctor.newInstance("s3");

metrics.initialize(properties);
} catch (NoSuchMethodException | ClassCastException e) {
LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
LOG.warn("Unable to load metrics class: '{}', falling back to null metrics",
CatalogProperties.DEFAULT_METRICS_IMPL, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
Expand Down Expand Up @@ -108,6 +109,11 @@ public DynamoDbClient dynamo() {
return null;
}

@Override
public CloudWatchClient cloudWatch() {
return null;
}

@Override
public void initialize(Map<String, String> properties) {

Expand Down
Loading