diff --git a/singer/pom.xml b/singer/pom.xml index 96dfd16e..44458bc0 100644 --- a/singer/pom.xml +++ b/singer/pom.xml @@ -90,6 +90,11 @@ + + org.apache.commons + commons-text + 1.10.0 + io.netty netty-all diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java index f12acad5..1673f7ec 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java @@ -14,6 +14,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,8 +25,10 @@ import java.nio.file.Files; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Date; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -46,6 +49,7 @@ public class S3Writer implements LogStreamWriter { private static final String HOSTNAME = SingerUtils.HOSTNAME; private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class); private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSS"); + private final Map envMappings = System.getenv(); private final LogStream logStream; private final String logName; private final String BUFFER_DIR; @@ -82,17 +86,10 @@ public class S3Writer implements LogStreamWriter { } public enum DefaultTokens { - UUID("%UUID"), - TIMESTAMP("%TIMESTAMP"), - HOST("%HOST"), - LOGNAME("%LOGNAME"); - private final String token; - DefaultTokens(String token) { - this.token = token; - } - public String getValue() { - return token; - } + UUID, + TIMESTAMP, + HOST, + LOGNAME; } /** @@ -354,44 +351,82 @@ private Matcher extractTokensFromFilename(String logFileName) { } /** - * Generates an S3 object key based on the configured key format. The key can contain tokens in the - * format %{token} that will be replaced with the values extracted from the log filename based on - * the regex pattern provided in filenamePattern using named regex groups. + * Generates a map of default token values that can be used in the key format. * - * @return the generated S3 object key + * @return a map of default token values */ - public String generateS3ObjectKey() { - String s3Key = keyFormat; - Matcher matcher; - // Replace default tokens - // TODO: Implement a one pass replacement loop for all tokens if performance becomes an - // issue, for now this reads better. + private Map getDefaultTokenValue() { + String timestamp = FORMATTER.format(new Date()); + Map defaultTokenValues = new HashMap<>(); for (DefaultTokens token : DefaultTokens.values()) { + String value; switch (token) { case UUID: - s3Key = s3Key.replace(token.getValue(), UUID.randomUUID().toString().substring(0, 8)); - break; - case TIMESTAMP: - s3Key = s3Key.replace(token.getValue(), FORMATTER.format(new Date())); + value = UUID.randomUUID().toString().substring(0, 8); break; case HOST: - s3Key = s3Key.replace(token.getValue(), HOSTNAME); + value = HOSTNAME; break; case LOGNAME: - s3Key = s3Key.replace(token.getValue(), logName); + value = logName; break; + case TIMESTAMP: + value = timestamp; + break; + default: + throw new IllegalStateException("Unexpected value: " + token); } + defaultTokenValues.put(token.name(), value); + } + // Also allow for adding the timestamp in parts. + defaultTokenValues.put("y", timestamp.substring(0,4)); + defaultTokenValues.put("M", timestamp.substring(4,6)); + defaultTokenValues.put("d", timestamp.substring(6,8)); + defaultTokenValues.put("H", timestamp.substring(8,10)); + defaultTokenValues.put("m", timestamp.substring(10,12)); + defaultTokenValues.put("S", timestamp.substring(12,14)); + return defaultTokenValues; + } + + /** + * Generates an S3 object key based on the configured key format. It uses {@link StringSubstitutor} to replace key tokens in the + * s3KeyFormat that will be replaced with the values extracted from the log filename based on the regex pattern provided in + * filenamePattern using named regex groups. We also support injection of environment variables and default tokens. + * + * Default tokens: {{TOKEN}} + * Environment variables: ${ENV_VAR} + * Named groups from filenamePattern: %{TOKEN} + * + * @return the generated S3 object key + */ + public String generateS3ObjectKey() { + String s3Key = keyFormat; + Matcher matcher; + // Replace default tokens in the "%TOKEN" format + Map defaultTokenValues = getDefaultTokenValue(); + StringSubstitutor stringSubstitutor = new StringSubstitutor(defaultTokenValues, "{{", "}}"); + s3Key = stringSubstitutor.replace(s3Key); + + // Replace environment variables + if (envMappings != null || !envMappings.isEmpty()) { + // Default replacement is with ${} format + stringSubstitutor = new StringSubstitutor(envMappings); + s3Key = stringSubstitutor.replace(s3Key); } + // Replace named groups from filenamePattern if (filenameParsingEnabled) { if ((matcher = extractTokensFromFilename(logStream.getFileNamePrefix())) != null) { + Map groupMap = new HashMap<>(); for (String token : fileNameTokens) { // Attempt to replace the token in filenamePattern with the matched value String matchedValue = matcher.group(token); if (matchedValue != null) { - s3Key = s3Key.replace("%{" + token + "}", matchedValue); + groupMap.put(token, matchedValue); } } + stringSubstitutor = new StringSubstitutor(groupMap, "%{", "}"); + s3Key = stringSubstitutor.replace(s3Key); } else { // If there is no match we simply return the key without replacing any custom tokens LOG.warn("Filename parsing is enabled but filenamePattern provided: " + filenamePattern diff --git a/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java index ae3f5852..05aec40a 100644 --- a/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java +++ b/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java @@ -53,7 +53,7 @@ public class S3WriterTest extends SingerTestBase { @Before public void setUp() { - // set hostname + // Set hostname SingerUtils.setHostname("localhost-dev", "-"); // Initialize basics @@ -220,12 +220,12 @@ public void testUploadIsScheduled() throws Exception { } @Test - public void testS3ObjectKeyGeneration() { + public void testObjectKeyGeneration() { // Custom and default tokens used String keyFormat = - "my-path/%{namespace}/" + DefaultTokens.LOGNAME.getValue() + "/%{filename}-%{index}." - + DefaultTokens.TIMESTAMP.getValue(); + "my-path/%{namespace}/{{" + DefaultTokens.LOGNAME + + "}}/%{filename}-%{index}.{{S}}"; logStream = new LogStream(singerLog, "my_namespace-test_log.0"); s3WriterConfig = new S3WriterConfig(); s3WriterConfig.setKeyFormat(keyFormat); @@ -244,7 +244,8 @@ public void testS3ObjectKeyGeneration() { String[] keySuffixParts = objectKeyParts[3].split("\\."); assertEquals(3, keySuffixParts.length); assertEquals("test_log-0", keySuffixParts[0]); - assertNotEquals(DefaultTokens.LOGNAME.getValue(), keySuffixParts[1]); + assertNotEquals("{{S}}", keySuffixParts[1]); + assertEquals(2, keySuffixParts[1].length()); // Custom tokens provided but filename pattern does not match s3WriterConfig.setFilenamePattern("(?[^.]+)\\.(?\\d+).0"); s3Writer = @@ -253,6 +254,22 @@ public void testS3ObjectKeyGeneration() { assertEquals("%{namespace}", objectKeyParts[1]); keySuffixParts = objectKeyParts[3].split("\\."); assertEquals("%{filename}-%{index}", keySuffixParts[0]); + + // Custom tokens used but with typos in format + // Final result should be: my-path/%{{namespace}}/%testLog/%test_log/0%/}. + keyFormat = + "my-path/%{{namespace}}/%{{" + DefaultTokens.LOGNAME + + "}}/%%{filename}/%{index}%/{{S}}}"; + s3WriterConfig.setKeyFormat(keyFormat); + s3WriterConfig.setFilenamePattern("(?[^-]+)-(?[^.]+)\\.(?\\d+)"); + s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath); + objectKeyParts = s3Writer.generateS3ObjectKey().split("/"); + assertEquals(6, objectKeyParts.length); + assertEquals("%{{namespace}}", objectKeyParts[1]); + assertEquals("%" + logStream.getSingerLog().getSingerLogConfig().getName(), objectKeyParts[2]); + assertEquals("%test_log", objectKeyParts[3]); + assertEquals("0%", objectKeyParts[4]); + assertEquals(3, objectKeyParts[5].split("\\.")[0].length()); } @Test