Skip to content

Commit

Permalink
reverting back to redis backed cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ag060 committed Jan 22, 2025
1 parent f193fd3 commit 2a56730
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 69 deletions.
6 changes: 6 additions & 0 deletions apps/threat-detection/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@
<version>2.24.2</version>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>


</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ public interface CounterCache {

boolean exists(String key);

void clear(String key);
void reset(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.akto.threat.detection.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;

public class RedisBackedCounterCache implements CounterCache {
private final StatefulRedisConnection<String, Long> redis;

private final Cache<String, Long> localCache;

private final String prefix;
private final ConcurrentLinkedQueue<Object> pendingOps;

static class Op {
private final String key;
private final long value;

public Op(String key, long value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public long getValue() {
return value;
}
}

public RedisBackedCounterCache(RedisClient redisClient, String prefix) {
this.prefix = prefix;
this.redis = redisClient.connect(new LongValueCodec());
this.localCache = Caffeine.newBuilder().maximumSize(10000).expireAfterWrite(3, TimeUnit.HOURS).build();
this.pendingOps = new ConcurrentLinkedQueue<>();
}

@Override
public void increment(String key) {
this.incrementBy(key, 1);
}

@Override
public void incrementBy(String key, long val) {
long cv = this.get(key);
this.localCache.put(key, cv + val);

this.pendingOps.add(new Op(key, val));
if (this.pendingOps.size() >= 100) {
this.flush();
}
}

@Override
public long get(String key) {
if (this.localCache.asMap().containsKey(key)) {
return this.localCache.asMap().get(key);
}

Long rv = this.redis.sync().get(key);

this.localCache.put(key, rv != null ? rv : 0L);
return rv != null ? rv : 0L;
}

@Override
public boolean exists(String key) {
if (this.localCache.asMap().containsKey(key)) {
return true;
}

return this.redis.sync().exists(key) > 0L;
}

@Override
public void reset(String key) {
this.localCache.put(key, 0L);

this.redis.async().hset(prefix, key, 0L).whenComplete(
(v, e) -> this.redis.async().expire(key, 3 * 60 * 60)
);
}

private void flush() {
Set<String> keys = new HashSet<>();
while (!this.pendingOps.isEmpty()) {
Op op = (Op) this.pendingOps.poll();
keys.add(op.getKey());
}

Map<String, Long> val = new HashMap<>();
for (String key : keys) {
long cv = this.get(key);
val.put(key, cv);
}

this.redis.async().mset(val);
val.forEach((k, v) -> this.redis.async().expire(k, 3 * 60 * 60));

this.pendingOps.clear();
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Result shouldNotify(String aggKey, SampleMaliciousRequest maliciousEvent,
boolean thresholdBreached = windowCount >= rule.getCondition().getMatchCount();

if (thresholdBreached) {
this.cache.clear(cacheKey);
this.cache.reset(cacheKey);
}

return new Result(thresholdBreached);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.akto.dto.HttpResponseParams;
import com.akto.dto.RawApi;
import com.akto.dto.api_protection_parse_layer.AggregationRules;
import com.akto.dto.api_protection_parse_layer.Condition;
import com.akto.dto.api_protection_parse_layer.Rule;
import com.akto.dto.monitoring.FilterConfig;
import com.akto.dto.test_editor.YamlTemplate;
Expand All @@ -26,7 +25,7 @@
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
import com.akto.threat.detection.cache.RedisCounterCache;
import com.akto.threat.detection.cache.RedisBackedCounterCache;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.kafka.KafkaProtoProducer;
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
Expand Down Expand Up @@ -82,7 +81,7 @@ public MaliciousTrafficDetectorTask(

this.windowBasedThresholdNotifier =
new WindowBasedThresholdNotifier(
new RedisCounterCache(redisClient, "wbt"),
new RedisBackedCounterCache(redisClient, "wbt"),
new WindowBasedThresholdNotifier.Config(100, 10 * 60));

this.internalKafka = new KafkaProtoProducer(internalConfig);
Expand Down

0 comments on commit 2a56730

Please sign in to comment.