Skip to content

Commit

Permalink
Merge pull request #28 from dgzpg/feat-addAwsLogsImpl
Browse files Browse the repository at this point in the history
Feat add aws logs impl
  • Loading branch information
dgzpg authored Dec 1, 2021
2 parents 6a01fcc + 05d707d commit b22575f
Show file tree
Hide file tree
Showing 6 changed files with 509 additions and 2 deletions.
62 changes: 60 additions & 2 deletions capa-spi-aws-telemetry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,80 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>capa-aws-parent</artifactId>
<groupId>group.rxcloud</groupId>
<version>1.0.4-alpha-1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>capa-spi-aws-telemetry</artifactId>
<name>capa-skd-spi-aws-cloudwatch</name>
<properties>
<cloud.watch.version>2.16.29</cloud.watch.version>
<cloud.watch.events.version>2.17.61</cloud.watch.events.version>
<cloud.watch.logs.version>2.17.59</cloud.watch.logs.version>
<log4j.version>2.8.2</log4j.version>
<logback.version>1.1.7</logback.version>
<slf4j.version>1.7.21</slf4j.version>
<commons-collections4.version>4.2</commons-collections4.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
<gson.version>2.8.0</gson.version>
<capa-foundation.version>1.0.2.RELEASE</capa-foundation.version>
</properties>

<dependencies>
<dependency>
<groupId>group.rxcloud</groupId>
<artifactId>capa-spi-aws-infrastructure</artifactId>
</dependency>
<dependency>
<groupId>group.rxcloud</groupId>
<artifactId>capa-foundation</artifactId>
<version>${capa-foundation.version}</version>
</dependency>
<!-- cloud watch logs-->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatchlogs</artifactId>
<version>${cloud.watch.logs.version}</version>
</dependency>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<!-- log4j-slf4j-impl and logback-classic cannot exist at the same time. -->
<optional>true</optional>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>

<!--log4j-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>

<dependency>
<groupId>group.rxcloud</groupId>
<artifactId>capa-id-generator</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package group.rxcloud.capa.spi.aws.telemetry.log.appender;

import com.google.gson.Gson;
import group.rxcloud.capa.component.telemetry.context.CapaContext;
import group.rxcloud.capa.infrastructure.hook.Mixer;
import group.rxcloud.capa.infrastructure.hook.TelemetryHooks;
import group.rxcloud.capa.spi.aws.telemetry.log.service.CloudWatchLogsService;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* The abstract capa aws log appender.
*/
public abstract class AbstractCapaAwsLogAppender {

/**
* Tag identifier prefix.
*/
protected static final String TAG_PREFIX = "[[";
/**
* Tag identifier suffix.
*/
protected static final String TAG_SUFFIX = "]]";
/**
* The name of log source data.
*/
protected static final String LOG_DATA_NAME = "logData";
/**
* The name of log level.
*/
protected static final String LOG_LEVEL_NAME = "logLevel";
/**
* The name of log's _trace_id.
*/
protected static final String TRACE_ID_NAME = "_trace_id";
/**
* Number of counts each time.
*/
protected static final Integer COUNTER_NUM = 1;
/**
* Init a {@link Gson} instance.
*/
private static final Gson GSON = new Gson();
/**
* The instance of the {@link TelemetryHooks}.
*/
private static final Optional<TelemetryHooks> TELEMETRY_HOOKS;
/**
* The namespace for logging error.
*/
private static final String LOG_ERROR_NAMESPACE = "CloudWatchLogs";
/**
* The metric name for logging error.
*/
private static final String LOG_ERROR_METRIC_NAME = "LogError";
/**
* Init an instance of {@link LongCounter}.
*/
protected static Optional<LongCounter> LONG_COUNTER = Optional.empty();

/**
* Init telemetry hooks and longCounter.
*/
static {
TELEMETRY_HOOKS = Mixer.telemetryHooksNullable();
TELEMETRY_HOOKS.ifPresent(telemetryHooks -> {
Meter meter = telemetryHooks.buildMeter(LOG_ERROR_NAMESPACE).block();
LongCounter longCounter = meter.counterBuilder(LOG_ERROR_METRIC_NAME).build();
LONG_COUNTER = Optional.ofNullable(longCounter);
});
}

/**
*
* @param message
* @param tagsEndIndex
* @return
*/
protected Map<String, String> parseTags(String message, int tagsEndIndex) {
Map<String, String> tags = null;
int tagStart = 2;
while (tagStart < tagsEndIndex) {
int tagEnd = message.indexOf(',', tagStart);
if (tagEnd < 0 || tagEnd > tagsEndIndex) {
tagEnd = tagsEndIndex;
}
int equalIndex = message.indexOf('=', tagStart);
if (equalIndex > tagStart && equalIndex < tagEnd - 1) {
String key = message.substring(tagStart, equalIndex);
String value = message.substring(equalIndex + 1, tagEnd);
if (tags == null) {
tags = new HashMap<>();
}
tags.put(key, value);
}
tagStart = tagEnd + 1;
}
return tags;
}

protected Map<String, String> appendMDCTags(Map<String, String> tags, Map<String, String> MDCTags) {
if (MDCTags != null && !MDCTags.isEmpty()) {
if (tags == null) {
return new HashMap<String, String>(MDCTags);
} else {
tags.putAll(MDCTags);
return tags;
}
}
return tags;
}

protected void appendLogs(String message, Map<String, String> MDCTags, String logLevel) {
if (StringUtils.isBlank(message)) {
message = StringUtils.EMPTY;
}
Map<String, String> tags = new HashMap<>();
tags.put(LOG_LEVEL_NAME, logLevel);
if (message.startsWith(TAG_PREFIX)) {
int tagsEndIndex = message.indexOf(TAG_SUFFIX);
if (tagsEndIndex > 0) {
tags = this.parseTags(message, tagsEndIndex);
if (tags != null) {
message = message.substring(tagsEndIndex + 2);
}
}
}
tags = appendMDCTags(tags, MDCTags);

Map<String, String> logMessageMap = new HashMap<>();
if (StringUtils.isNotEmpty(message)) {
logMessageMap.put(LOG_DATA_NAME, message);
}
if (tags != null && !tags.isEmpty()) {
logMessageMap.putAll(tags);
}
Map<String, String> defaultTags = this.getDefaultTags();
if (defaultTags != null && !defaultTags.isEmpty()) {
logMessageMap.putAll(defaultTags);
}
// put logs to CloudWatchLogs
if (!logMessageMap.isEmpty()) {
String logMessage = GSON.toJson(logMessageMap);
CloudWatchLogsService.putLogEvent(logMessage);
}
}

protected Map<String, String> getDefaultTags() {
Map<String, String> defaultTags = new HashMap<>();
// traceId
if (StringUtils.isNotBlank(CapaContext.getTraceId())) {
defaultTags.put(TRACE_ID_NAME, CapaContext.getTraceId());
}
return defaultTags;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package group.rxcloud.capa.spi.aws.telemetry.log.appender;

import group.rxcloud.capa.component.log.agent.CapaLog4jAppenderAgent;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.logging.log4j.util.ReadOnlyStringMap;

import java.util.HashMap;
import java.util.Map;

public class CapaAwsLog4jAppender extends AbstractCapaAwsLogAppender
implements CapaLog4jAppenderAgent.CapaLog4jAppender {
/**
* The error type name of the log4j appender.
*/
protected static final String LOG_LOG4J_APPENDER_ERROR_TYPE = "Log4jAppendLogsError";

static {
PluginManager.addPackage("group.rxcloud.capa.spi.aws.telemetry.log.appender");
}

@Override
public void appendLog(LogEvent event) {
try {
if (event == null
|| event.getLevel() == null
|| event.getMessage() == null) {
return;
}
String message = event.getMessage().getFormattedMessage();
ReadOnlyStringMap contextData = event.getContextData();
Map<String, String> MDCTags = contextData == null ? new HashMap<>() : contextData.toMap();
super.appendLogs(message, MDCTags, event.getLevel().name());
} catch (Exception e) {
LONG_COUNTER.ifPresent(longCounter -> {
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOG4J_APPENDER_ERROR_TYPE), e.getMessage()))
.add(COUNTER_NUM);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package group.rxcloud.capa.spi.aws.telemetry.log.appender;

import ch.qos.logback.classic.spi.ILoggingEvent;
import group.rxcloud.capa.component.log.agent.CapaLogbackAppenderAgent;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

import java.util.Map;

public class CapaAwsLogbackAppender extends AbstractCapaAwsLogAppender
implements CapaLogbackAppenderAgent.CapaLogbackAppender {

/**
* The error type name of the logback appender.
*/
protected static final String LOG_LOGBACK_APPENDER_ERROR_TYPE = "LogbackAppendLogsError";

@Override
public void appendLog(ILoggingEvent event) {
try {
if (event == null || event.getLevel() == null) {
return;
}
String message = event.getFormattedMessage();
Map<String, String> MDCTags = event.getMDCPropertyMap();
super.appendLogs(message, MDCTags, event.getLevel().levelStr);
} catch (Exception e) {
LONG_COUNTER.ifPresent(longCounter -> {
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOGBACK_APPENDER_ERROR_TYPE), e.getMessage()))
.add(COUNTER_NUM);
});
}
}
}
Loading

0 comments on commit b22575f

Please sign in to comment.