Skip to content

Commit

Permalink
Merge pull request #457 from jfzunigac/s3writer/improve_key_formatting
Browse files Browse the repository at this point in the history
Enhance s3 object key formatting
  • Loading branch information
jfzunigac authored Dec 3, 2024
2 parents 376ed60 + e7c6686 commit 5363358
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 33 deletions.
5 changes: 5 additions & 0 deletions singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
91 changes: 63 additions & 28 deletions singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,8 +25,10 @@

import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -46,6 +49,7 @@ public class S3Writer implements LogStreamWriter {
private static final String HOSTNAME = SingerUtils.HOSTNAME;
private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSS");
private final Map<String, String> envMappings = System.getenv();
private final LogStream logStream;
private final String logName;
private final String BUFFER_DIR;
Expand Down Expand Up @@ -82,17 +86,10 @@ public class S3Writer implements LogStreamWriter {
}

public enum DefaultTokens {
UUID("%UUID"),
TIMESTAMP("%TIMESTAMP"),
HOST("%HOST"),
LOGNAME("%LOGNAME");
private final String token;
DefaultTokens(String token) {
this.token = token;
}
public String getValue() {
return token;
}
UUID,
TIMESTAMP,
HOST,
LOGNAME;
}

/**
Expand Down Expand Up @@ -354,44 +351,82 @@ private Matcher extractTokensFromFilename(String logFileName) {
}

/**
* Generates an S3 object key based on the configured key format. The key can contain tokens in the
* format %{token} that will be replaced with the values extracted from the log filename based on
* the regex pattern provided in filenamePattern using named regex groups.
* Generates a map of default token values that can be used in the key format.
*
* @return the generated S3 object key
* @return a map of default token values
*/
public String generateS3ObjectKey() {
String s3Key = keyFormat;
Matcher matcher;
// Replace default tokens
// TODO: Implement a one pass replacement loop for all tokens if performance becomes an
// issue, for now this reads better.
private Map<String, String> getDefaultTokenValue() {
String timestamp = FORMATTER.format(new Date());
Map<String, String> defaultTokenValues = new HashMap<>();
for (DefaultTokens token : DefaultTokens.values()) {
String value;
switch (token) {
case UUID:
s3Key = s3Key.replace(token.getValue(), UUID.randomUUID().toString().substring(0, 8));
break;
case TIMESTAMP:
s3Key = s3Key.replace(token.getValue(), FORMATTER.format(new Date()));
value = UUID.randomUUID().toString().substring(0, 8);
break;
case HOST:
s3Key = s3Key.replace(token.getValue(), HOSTNAME);
value = HOSTNAME;
break;
case LOGNAME:
s3Key = s3Key.replace(token.getValue(), logName);
value = logName;
break;
case TIMESTAMP:
value = timestamp;
break;
default:
throw new IllegalStateException("Unexpected value: " + token);
}
defaultTokenValues.put(token.name(), value);
}
// Also allow for adding the timestamp in parts.
defaultTokenValues.put("y", timestamp.substring(0,4));
defaultTokenValues.put("M", timestamp.substring(4,6));
defaultTokenValues.put("d", timestamp.substring(6,8));
defaultTokenValues.put("H", timestamp.substring(8,10));
defaultTokenValues.put("m", timestamp.substring(10,12));
defaultTokenValues.put("S", timestamp.substring(12,14));
return defaultTokenValues;
}

/**
* Generates an S3 object key based on the configured key format. It uses {@link StringSubstitutor} to replace key tokens in the
* s3KeyFormat that will be replaced with the values extracted from the log filename based on the regex pattern provided in
* filenamePattern using named regex groups. We also support injection of environment variables and default tokens.
*
* Default tokens: {{TOKEN}}
* Environment variables: ${ENV_VAR}
* Named groups from filenamePattern: %{TOKEN}
*
* @return the generated S3 object key
*/
public String generateS3ObjectKey() {
String s3Key = keyFormat;
Matcher matcher;
// Replace default tokens in the "%TOKEN" format
Map<String, String> defaultTokenValues = getDefaultTokenValue();
StringSubstitutor stringSubstitutor = new StringSubstitutor(defaultTokenValues, "{{", "}}");
s3Key = stringSubstitutor.replace(s3Key);

// Replace environment variables
if (envMappings != null || !envMappings.isEmpty()) {
// Default replacement is with ${} format
stringSubstitutor = new StringSubstitutor(envMappings);
s3Key = stringSubstitutor.replace(s3Key);
}

// Replace named groups from filenamePattern
if (filenameParsingEnabled) {
if ((matcher = extractTokensFromFilename(logStream.getFileNamePrefix())) != null) {
Map<String, String> groupMap = new HashMap<>();
for (String token : fileNameTokens) {
// Attempt to replace the token in filenamePattern with the matched value
String matchedValue = matcher.group(token);
if (matchedValue != null) {
s3Key = s3Key.replace("%{" + token + "}", matchedValue);
groupMap.put(token, matchedValue);
}
}
stringSubstitutor = new StringSubstitutor(groupMap, "%{", "}");
s3Key = stringSubstitutor.replace(s3Key);
} else {
// If there is no match we simply return the key without replacing any custom tokens
LOG.warn("Filename parsing is enabled but filenamePattern provided: " + filenamePattern
Expand Down
27 changes: 22 additions & 5 deletions singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class S3WriterTest extends SingerTestBase {

@Before
public void setUp() {
// set hostname
// Set hostname
SingerUtils.setHostname("localhost-dev", "-");

// Initialize basics
Expand Down Expand Up @@ -220,12 +220,12 @@ public void testUploadIsScheduled() throws Exception {
}

@Test
public void testS3ObjectKeyGeneration() {
public void testObjectKeyGeneration() {
// Custom and default tokens used
String
keyFormat =
"my-path/%{namespace}/" + DefaultTokens.LOGNAME.getValue() + "/%{filename}-%{index}."
+ DefaultTokens.TIMESTAMP.getValue();
"my-path/%{namespace}/{{" + DefaultTokens.LOGNAME
+ "}}/%{filename}-%{index}.{{S}}";
logStream = new LogStream(singerLog, "my_namespace-test_log.0");
s3WriterConfig = new S3WriterConfig();
s3WriterConfig.setKeyFormat(keyFormat);
Expand All @@ -244,7 +244,8 @@ public void testS3ObjectKeyGeneration() {
String[] keySuffixParts = objectKeyParts[3].split("\\.");
assertEquals(3, keySuffixParts.length);
assertEquals("test_log-0", keySuffixParts[0]);
assertNotEquals(DefaultTokens.LOGNAME.getValue(), keySuffixParts[1]);
assertNotEquals("{{S}}", keySuffixParts[1]);
assertEquals(2, keySuffixParts[1].length());
// Custom tokens provided but filename pattern does not match
s3WriterConfig.setFilenamePattern("(?<filename>[^.]+)\\.(?<index>\\d+).0");
s3Writer =
Expand All @@ -253,6 +254,22 @@ public void testS3ObjectKeyGeneration() {
assertEquals("%{namespace}", objectKeyParts[1]);
keySuffixParts = objectKeyParts[3].split("\\.");
assertEquals("%{filename}-%{index}", keySuffixParts[0]);

// Custom tokens used but with typos in format
// Final result should be: my-path/%{{namespace}}/%testLog/%test_log/0%/<seconds>}.<uuid>
keyFormat =
"my-path/%{{namespace}}/%{{" + DefaultTokens.LOGNAME
+ "}}/%%{filename}/%{index}%/{{S}}}";
s3WriterConfig.setKeyFormat(keyFormat);
s3WriterConfig.setFilenamePattern("(?<namespace>[^-]+)-(?<filename>[^.]+)\\.(?<index>\\d+)");
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath);
objectKeyParts = s3Writer.generateS3ObjectKey().split("/");
assertEquals(6, objectKeyParts.length);
assertEquals("%{{namespace}}", objectKeyParts[1]);
assertEquals("%" + logStream.getSingerLog().getSingerLogConfig().getName(), objectKeyParts[2]);
assertEquals("%test_log", objectKeyParts[3]);
assertEquals("0%", objectKeyParts[4]);
assertEquals(3, objectKeyParts[5].split("\\.")[0].length());
}

@Test
Expand Down

0 comments on commit 5363358

Please sign in to comment.