Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance s3 object key formatting #457

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
91 changes: 63 additions & 28 deletions singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,8 +25,10 @@

import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -46,6 +49,7 @@ public class S3Writer implements LogStreamWriter {
private static final String HOSTNAME = SingerUtils.HOSTNAME;
private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSS");
private final Map<String, String> envMappings = System.getenv();
private final LogStream logStream;
private final String logName;
private final String BUFFER_DIR;
Expand Down Expand Up @@ -82,17 +86,10 @@ public class S3Writer implements LogStreamWriter {
}

/**
 * Built-in tokens that can be referenced from the S3 key format.
 *
 * <p>The constant name is the token key: values are stored under {@code name()} and the key
 * format refers to them between double curly braces, e.g. <code>{{LOGNAME}}</code>.
 */
public enum DefaultTokens {
  UUID,
  TIMESTAMP,
  HOST,
  LOGNAME;
}

/**
Expand Down Expand Up @@ -354,44 +351,82 @@ private Matcher extractTokensFromFilename(String logFileName) {
}

/**
* Generates an S3 object key based on the configured key format. The key can contain tokens in the
* format %{token} that will be replaced with the values extracted from the log filename based on
* the regex pattern provided in filenamePattern using named regex groups.
* Generates a map of default token values that can be used in the key format.
*
* @return the generated S3 object key
* @return a map of default token values
*/
public String generateS3ObjectKey() {
String s3Key = keyFormat;
Matcher matcher;
// Replace default tokens
// TODO: Implement a one pass replacement loop for all tokens if performance becomes an
// issue, for now this reads better.
private Map<String, String> getDefaultTokenValue() {
String timestamp = FORMATTER.format(new Date());
Map<String, String> defaultTokenValues = new HashMap<>();
for (DefaultTokens token : DefaultTokens.values()) {
String value;
switch (token) {
case UUID:
s3Key = s3Key.replace(token.getValue(), UUID.randomUUID().toString().substring(0, 8));
break;
case TIMESTAMP:
s3Key = s3Key.replace(token.getValue(), FORMATTER.format(new Date()));
value = UUID.randomUUID().toString().substring(0, 8);
break;
case HOST:
s3Key = s3Key.replace(token.getValue(), HOSTNAME);
value = HOSTNAME;
break;
case LOGNAME:
s3Key = s3Key.replace(token.getValue(), logName);
value = logName;
break;
case TIMESTAMP:
value = timestamp;
break;
default:
throw new IllegalStateException("Unexpected value: " + token);
}
defaultTokenValues.put(token.name(), value);
}
// Also allow for adding the timestamp in parts.
defaultTokenValues.put("y", timestamp.substring(0,4));
defaultTokenValues.put("M", timestamp.substring(4,6));
defaultTokenValues.put("d", timestamp.substring(6,8));
defaultTokenValues.put("H", timestamp.substring(8,10));
defaultTokenValues.put("m", timestamp.substring(10,12));
defaultTokenValues.put("S", timestamp.substring(12,14));
return defaultTokenValues;
}

/**
* Generates an S3 object key from the configured key format. {@link StringSubstitutor} is used to
* replace three kinds of tokens in the key format: default tokens, environment variables, and
* named regex groups extracted from the log filename via filenamePattern.
*
* Default tokens: {{TOKEN}}
* Environment variables: ${ENV_VAR}
* Named groups from filenamePattern: %{TOKEN}
*
* @return the generated S3 object key
*/
public String generateS3ObjectKey() {
String s3Key = keyFormat;
Matcher matcher;
// Replace default tokens written in the "{{TOKEN}}" format
Map<String, String> defaultTokenValues = getDefaultTokenValue();
StringSubstitutor stringSubstitutor = new StringSubstitutor(defaultTokenValues, "{{", "}}");
s3Key = stringSubstitutor.replace(s3Key);

// Replace environment variables. Both checks must hold: with "||" a null map would be
// dereferenced by isEmpty(), and any non-null map (even an empty one) would pass.
if (envMappings != null && !envMappings.isEmpty()) {
  // The default StringSubstitutor delimiters are "${" and "}"
  stringSubstitutor = new StringSubstitutor(envMappings);
  s3Key = stringSubstitutor.replace(s3Key);
}

// Replace named groups from filenamePattern
if (filenameParsingEnabled) {
if ((matcher = extractTokensFromFilename(logStream.getFileNamePrefix())) != null) {
Map<String, String> groupMap = new HashMap<>();
for (String token : fileNameTokens) {
// Attempt to replace the token in filenamePattern with the matched value
String matchedValue = matcher.group(token);
if (matchedValue != null) {
s3Key = s3Key.replace("%{" + token + "}", matchedValue);
groupMap.put(token, matchedValue);
}
}
stringSubstitutor = new StringSubstitutor(groupMap, "%{", "}");
s3Key = stringSubstitutor.replace(s3Key);
} else {
// If there is no match we simply return the key without replacing any custom tokens
LOG.warn("Filename parsing is enabled but filenamePattern provided: " + filenamePattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class S3WriterTest extends SingerTestBase {

@Before
public void setUp() {
// set hostname
// Set hostname
SingerUtils.setHostname("localhost-dev", "-");

// Initialize basics
Expand Down Expand Up @@ -220,12 +220,12 @@ public void testUploadIsScheduled() throws Exception {
}

@Test
public void testS3ObjectKeyGeneration() {
public void testObjectKeyGeneration() {
// Custom and default tokens used
String
keyFormat =
"my-path/%{namespace}/" + DefaultTokens.LOGNAME.getValue() + "/%{filename}-%{index}."
+ DefaultTokens.TIMESTAMP.getValue();
"my-path/%{namespace}/{{" + DefaultTokens.LOGNAME
+ "}}/%{filename}-%{index}.{{S}}";
logStream = new LogStream(singerLog, "my_namespace-test_log.0");
s3WriterConfig = new S3WriterConfig();
s3WriterConfig.setKeyFormat(keyFormat);
Expand All @@ -244,7 +244,8 @@ public void testS3ObjectKeyGeneration() {
String[] keySuffixParts = objectKeyParts[3].split("\\.");
assertEquals(3, keySuffixParts.length);
assertEquals("test_log-0", keySuffixParts[0]);
assertNotEquals(DefaultTokens.LOGNAME.getValue(), keySuffixParts[1]);
assertNotEquals("{{S}}", keySuffixParts[1]);
assertEquals(2, keySuffixParts[1].length());
// Custom tokens provided but filename pattern does not match
s3WriterConfig.setFilenamePattern("(?<filename>[^.]+)\\.(?<index>\\d+).0");
s3Writer =
Expand All @@ -253,6 +254,22 @@ public void testS3ObjectKeyGeneration() {
assertEquals("%{namespace}", objectKeyParts[1]);
keySuffixParts = objectKeyParts[3].split("\\.");
assertEquals("%{filename}-%{index}", keySuffixParts[0]);

// Custom tokens used but with typos in format
// Final result should be: my-path/%{{namespace}}/%testLog/%test_log/0%/<seconds>}.<uuid>
keyFormat =
"my-path/%{{namespace}}/%{{" + DefaultTokens.LOGNAME
+ "}}/%%{filename}/%{index}%/{{S}}}";
s3WriterConfig.setKeyFormat(keyFormat);
s3WriterConfig.setFilenamePattern("(?<namespace>[^-]+)-(?<filename>[^.]+)\\.(?<index>\\d+)");
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath);
objectKeyParts = s3Writer.generateS3ObjectKey().split("/");
assertEquals(6, objectKeyParts.length);
assertEquals("%{{namespace}}", objectKeyParts[1]);
assertEquals("%" + logStream.getSingerLog().getSingerLogConfig().getName(), objectKeyParts[2]);
assertEquals("%test_log", objectKeyParts[3]);
assertEquals("0%", objectKeyParts[4]);
assertEquals(3, objectKeyParts[5].split("\\.")[0].length());
}

@Test
Expand Down
Loading