Skip to content

Commit

Permalink
BE: Messages: Add CEL extensions (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Jul 7, 2024
1 parent 28677a9 commit 4de0d53
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
12 changes: 9 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import dev.cel.common.types.StructType;
import dev.cel.compiler.CelCompiler;
import dev.cel.compiler.CelCompilerFactory;
import dev.cel.extensions.CelExtensions;
import dev.cel.parser.CelStandardMacro;
import dev.cel.runtime.CelEvaluationException;
import dev.cel.runtime.CelRuntime;
import dev.cel.runtime.CelRuntimeFactory;
import io.kafbat.ui.exception.CelException;
import io.kafbat.ui.model.MessageFilterTypeDTO;
import io.kafbat.ui.model.TopicMessageDTO;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -42,8 +42,7 @@ public class MessageFilters {
private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName();

private static final CelCompiler CEL_COMPILER = createCompiler();
private static final CelRuntime CEL_RUNTIME = CelRuntimeFactory.standardCelRuntimeBuilder()
.build();
private static final CelRuntime CEL_RUNTIME = createRuntime();

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

Expand Down Expand Up @@ -143,6 +142,7 @@ private static CelCompiler createCompiler() {
return CelCompilerFactory.standardCelCompilerBuilder()
.setOptions(CelOptions.DEFAULT)
.setStandardMacros(CelStandardMacro.STANDARD_MACROS)
.addLibraries(CelExtensions.strings(), CelExtensions.encoders())
.addVar(CEL_RECORD_VAR_NAME, recordType)
.setResultType(SimpleType.BOOL)
.setTypeProvider(new CelTypeProvider() {
Expand All @@ -159,6 +159,12 @@ public Optional<CelType> findType(String typeName) {
.build();
}

private static CelRuntime createRuntime() {
return CelRuntimeFactory.standardCelRuntimeBuilder()
.addLibraries(CelExtensions.strings(), CelExtensions.encoders())
.build();
}

@Nullable
private static Object parseToJsonOrReturnAsIs(@Nullable String str) {
if (str == null) {
Expand Down
19 changes: 13 additions & 6 deletions api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import io.kafbat.ui.exception.CelException;
import io.kafbat.ui.model.TopicMessageDTO;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Nested;
Expand Down Expand Up @@ -100,7 +101,7 @@ void canCheckTimestampMs() {
var ts = OffsetDateTime.now();
var f = celScriptFilter("record.timestampMs == " + ts.toInstant().toEpochMilli());
assertTrue(f.test(msg().timestamp(ts)));
assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS))));
assertFalse(f.test(msg().timestamp(ts.plusSeconds(1L))));
}

@Test
Expand Down Expand Up @@ -177,6 +178,7 @@ void filterSpeedIsAtLeast5kPerSec() {
toFilter.add(msg().content(jsonContent).key(randString));
}
// first iteration for warmup
// noinspection ResultOfMethodCallIgnored
toFilter.stream().filter(f).count();

long before = System.currentTimeMillis();
Expand All @@ -188,10 +190,15 @@ void filterSpeedIsAtLeast5kPerSec() {
}
}

@Test
void testBase64DecodingWorks() {
var uuid = UUID.randomUUID().toString();
var msg = "test." + Base64.getEncoder().encodeToString(uuid.getBytes());
var f = celScriptFilter("string(base64.decode(record.value.split('.')[1])).contains('" + uuid + "')");
assertTrue(f.test(msg().content(msg)));
}

private TopicMessageDTO msg() {
return new TopicMessageDTO()
.timestamp(OffsetDateTime.now())
.offset(-1L)
.partition(1);
return new TopicMessageDTO(1, -1L, OffsetDateTime.now());
}
}

0 comments on commit 4de0d53

Please sign in to comment.