Skip to content

Commit

Permalink
Send to Kafka as json.
Browse files Browse the repository at this point in the history
  • Loading branch information
matejonnet committed Dec 1, 2018
1 parent 44179c9 commit 61be081
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 41 deletions.
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<version.org.slf4j>1.7.25</version.org.slf4j>
<version.commons-cli>1.4</version.commons-cli>
<version.jackson>2.9.7</version.jackson>
<version.logback.contrib>0.1.5</version.logback.contrib>

<jdk.min.version>1.8</jdk.min.version>
<maven.min.version>3.2</maven.min.version>
Expand Down Expand Up @@ -114,6 +115,18 @@
<version>1.2.3</version>
</dependency>

<dependency>
<groupId>ch.qos.logback.contrib</groupId>
<artifactId>logback-jackson</artifactId>
<version>${version.logback.contrib}</version>
</dependency>

<dependency>
<groupId>ch.qos.logback.contrib</groupId>
<artifactId>logback-json-classic</artifactId>
<version>${version.logback.contrib}</version>
</dependency>

<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-core</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback.contrib</groupId>
<artifactId>logback-jackson</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback.contrib</groupId>
<artifactId>logback-json-classic</artifactId>
</dependency>

<dependency>
<groupId>io.undertow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -83,7 +85,9 @@ public BuildAgentServer(
false
);
this.options = options;
init(logPath, kafkaConfig, primaryLoggersArr, RandomUtils.randString(8));
Map<String, String> mdcMap = new HashMap<>();
mdcMap.put("ctx", RandomUtils.randString(8));
init(logPath, kafkaConfig, primaryLoggersArr, mdcMap);
}

/**
Expand All @@ -94,7 +98,7 @@ public BuildAgentServer(
Optional<Path> kafkaConfig,
IoLoggerName[] primaryLoggersArr,
Options options,
String logMDC) throws BuildAgentException {
Map<String, String> logMDC) throws BuildAgentException {
this.options = options;
init(logPath, kafkaConfig, primaryLoggersArr, logMDC);
}
Expand All @@ -107,13 +111,13 @@ private void init(
Optional<Path> logPath,
Optional<Path> kafkaConfig,
IoLoggerName[] primaryLoggersArr,
String logMDC) throws BuildAgentException {
Map<String, String> logMDC) throws BuildAgentException {

List<IoLoggerName> primaryLoggers = Arrays.asList(primaryLoggersArr);

if (IoLogLogger.processLog.isInfoEnabled()) {
log.info("Initializing Logger sink.");
sinkChannels.add(new IoLogLogger());
sinkChannels.add(new IoLogLogger(logMDC));
}

if (logPath.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jboss.pnc.buildagent.server.formatter.LogbackFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.IOException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -64,21 +65,22 @@ public class IoKafkaLogger implements ReadOnlyChannel {

private long flushTimeoutMillis;

public IoKafkaLogger(Properties properties, String queueTopic, boolean primary, long flushTimeoutMillis, String logMDC) {
public IoKafkaLogger(Properties properties, String queueTopic, boolean primary, long flushTimeoutMillis, Map<String, String> logMDC) {
this.queueTopic = queueTopic;
this.primary = primary;
this.flushTimeoutMillis = flushTimeoutMillis;
kafkaProducer = new KafkaProducer<>(properties);

SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ");
LogbackFormatter logbackFormatter = new LogbackFormatter();

Consumer<Exception> exceptionHandler = (e) -> {
log.error("Error writing log.", e);
deliveryException.compareAndSet(null, e);
};
outputLogger = (bytes) -> {
String now = dateFormat.format(new Date());
send(now + " [" + logMDC + "] [org.jboss.pnc._userlog_.build-log] " + new String(bytes, charset), exceptionHandler);
MDC.setContextMap(logMDC);

String messageJson = logbackFormatter.format(new String(bytes, charset));
send(messageJson, exceptionHandler);
};
}

Expand Down Expand Up @@ -140,4 +142,5 @@ public void close() throws IOException {
kafkaProducer.close();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.function.Consumer;

/**
Expand All @@ -35,8 +37,9 @@ public class IoLogLogger implements ReadOnlyChannel {
private Charset charset = Charset.defaultCharset();
private Consumer<byte[]> outputLogger;

public IoLogLogger() {
public IoLogLogger(Map<String, String> logMDC) {
outputLogger = (bytes) -> {
MDC.setContextMap(logMDC);
processLog.info(new String(bytes, charset));
};
}
Expand Down
40 changes: 37 additions & 3 deletions server/src/main/java/org/jboss/pnc/buildagent/server/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,30 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.jboss.pnc.buildagent.common.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* @author <a href="mailto:[email protected]">Matej Lazar</a>
*/
public class Main {

private static final Logger logger = LoggerFactory.getLogger(Main.class);

private static final String DEFAULT_HOST = "localhost";
private static final String DEFAULT_PORT = "8080";

public static void main(String[] args) throws ParseException, BuildAgentException, InterruptedException {
logger.info("Starting Build Agent.");
Options options = new Options();
options.addOption("b", true, "Address to bind. When not specified " + DEFAULT_HOST + " is used as default.");
options.addOption("p", true, "Port to bind. When not specified " + DEFAULT_PORT + " is used as default.");
Expand All @@ -64,8 +72,20 @@ public static void main(String[] args) throws ParseException, BuildAgentExceptio
if (logMDC == null) {
logMDC = System.getenv("logMDC");
}
if (logMDC == null) {
logMDC = RandomUtils.randString(12);

Optional<Map<String, String>> mdcParamMap;
if (logMDC != null && !logMDC.isEmpty()) {
mdcParamMap = parseMdc(logMDC);
} else {
mdcParamMap = Optional.empty();
}

Map<String, String> mdcMap;
if (mdcParamMap.isPresent()) {
mdcMap = mdcParamMap.get();
} else {
mdcMap = new HashMap<>();
mdcMap.put("ctx", RandomUtils.randString(12));
}

if (cmd.hasOption("h")) {
Expand Down Expand Up @@ -117,7 +137,21 @@ public static void main(String[] args) throws ParseException, BuildAgentExceptio
kafkaPropertiesPath,
primaryLoggers,
buildAgentOptions,
logMDC);
mdcMap);
}

private static Optional<Map<String, String>> parseMdc(String logMDC) {
Map<String, String> mdcMap = new HashMap<>();
String[] keyVals = logMDC.split(",");
for (String keyVal : keyVals) {
String[] split = keyVal.split(":");
if (split.length != 2) {
logger.warn("Invalid logMdc, expected comma-separated list of key value pairs delimited with colon. eg. k1:v1,k2,v2. Found:{}", logMDC);
return Optional.empty();
}
mdcMap.put(split[0], split[1]);
}
return Optional.of(mdcMap);
}

private static String getOption(CommandLine cmd, String opt, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.jboss.pnc.buildagent.server.formatter;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import org.slf4j.LoggerFactory;

/**
* @author <a href="mailto:[email protected]">Matej Lazar</a>
*/
public class LogbackFormatter {

private final Logger logger;
private final ConsoleAppender<ILoggingEvent> appender;

public LogbackFormatter() {
LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
logger = context.getLogger("org.jboss.pnc._userlog_.build-log");
appender = (ConsoleAppender<ILoggingEvent>) logger.getAppender("STDOUT-BUILD-LOG");
}

public String format(String message) {
ILoggingEvent logEvent = new LoggingEvent(LogbackFormatter.class.getName(), logger, Level.INFO, message, null, new Object[0]);
byte[] encode = appender.getEncoder().encode(logEvent);
return new String(encode);
}
}
31 changes: 19 additions & 12 deletions server/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,35 @@
~ limitations under the License.
-->
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{yyyy-MM-dd HH:mm:ss.SSSZ} [%-5level] [${logMDC}] [%logger] [%thread] %msg%n
</Pattern>
</layout>
<encoder>
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS,UTC} [%thread] %-5level %logger - %msg%n
</pattern>
</encoder>
</appender>

<appender name="STDOUT-BUILD-LOG" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern> <!-- logMDC can be set as environment variable or java system property -->
%d{yyyy-MM-dd HH:mm:ss.SSSZ} [${logMDC}] [%logger] %msg%n
</Pattern>
</layout>
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="ch.qos.logback.contrib.json.classic.JsonLayout">
<timestampFormat>yyyy-MM-dd'T'HH:mm:ss.SSS</timestampFormat>
<timestampFormatTimezoneId>Etc/UTC</timestampFormatTimezoneId>
<jsonFormatter class="ch.qos.logback.contrib.jackson.JacksonJsonFormatter">
<prettyPrint>false</prettyPrint>
</jsonFormatter>
</layout>
</encoder>
</appender>

<!-- Set log level to info to enable process output log -->
<logger name="org.jboss.pnc._userlog_.build-log" level="error" additivity="false">
<appender-ref ref="STDOUT-BUILD-LOG" />
</logger>

<logger name="io.termd.core" level="info" additivity="false">
<logger name="io" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.jboss.pnc.buildagent" level="debug" additivity="false">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -63,7 +65,7 @@ public static void startServer(String host, int port, String bindPath, boolean u
primaryLoggers = new IoLoggerName[] { IoLoggerName.FILE};
} else {
logFolder = Optional.empty();
primaryLoggers = new IoLoggerName[] { IoLoggerName.LOG};
primaryLoggers = new IoLoggerName[0];
}
try {
Options options = new Options(
Expand All @@ -73,12 +75,15 @@ public static void startServer(String host, int port, String bindPath, boolean u
useSocket,
!useSocket
);
Map<String, String> mdcMap = new HashMap<>();
mdcMap.put("ctx", RandomUtils.randString(6));

buildAgentServer = new BuildAgentServer(
logFolder,
Optional.empty(),
primaryLoggers,
options,
RandomUtils.randString(6));
mdcMap);
log.info("Server started.");
} catch (BuildAgentException e) {
throw new RuntimeException("Cannot start terminal server.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.jboss.pnc.buildagent.server.formatter;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/**
* @author <a href="mailto:[email protected]">Matej Lazar</a>
*/
public class TestLogbackFormatter {

Logger logger = LoggerFactory.getLogger(TestLogbackFormatter.class);

@Test
public void shouldReturnJsonFormatted() throws IOException {
String message = "Major Tom, where is your ship ?";
LogbackFormatter logbackJsonFormatter = new LogbackFormatter();

String ctx = "12345";
MDC.setContextMap(Collections.singletonMap("ctx", ctx));

String messageJson = logbackJsonFormatter.format(message);

logger.info(messageJson);

ObjectMapper mapper = new ObjectMapper();
Map<String, Object> map = mapper.readValue(messageJson, Map.class);

Assert.assertEquals(message, map.get("message"));
Assert.assertEquals("org.jboss.pnc._userlog_.build-log", map.get("logger"));
Assert.assertEquals(ctx, ((Map)map.get("mdc")).get("ctx"));
}


}
Loading

0 comments on commit 61be081

Please sign in to comment.