Skip to content

Commit

Permalink
fixup! feat: Update in-process resolver to support flag metadata open…
Browse files Browse the repository at this point in the history
…-feature#1102

Signed-off-by: christian.lutnik <[email protected]>
  • Loading branch information
chrfwow committed Jan 10, 2025
1 parent f8e69aa commit 65a508c
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageQueryResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class InProcessResolver implements Resolver {
* Resolves flag values using https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1. Flags
* are evaluated locally.
*
* @param options flagd options
* @param options flagd options
* @param connectedSupplier lambda providing current connection status from caller
* @param onConnectionEvent lambda which handles changes in the connection/stream
*/
Expand Down Expand Up @@ -161,14 +162,15 @@ static Connector getConnector(final FlagdOptions options) {
}

private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationContext ctx) {
final FeatureFlag flag = flagStore.getFlag(key);
final StorageQueryResult storageQueryResult = flagStore.getFlag(key);
final FeatureFlag flag = storageQueryResult.getFeatureFlag();

// missing flag
if (flag == null) {
return ProviderEvaluation.<T>builder()
.errorMessage("flag: " + key + " not found")
.errorCode(ErrorCode.FLAG_NOT_FOUND)
.flagMetadata(fallBackMetadata)
.flagMetadata(getFlagMetadata(storageQueryResult))
.build();
}

Expand All @@ -177,7 +179,7 @@ private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationC
return ProviderEvaluation.<T>builder()
.errorMessage("flag: " + key + " is disabled")
.errorCode(ErrorCode.FLAG_NOT_FOUND)
.flagMetadata(getFlagMetadata(flag))
.flagMetadata(getFlagMetadata(storageQueryResult))
.build();
}

Expand Down Expand Up @@ -228,47 +230,55 @@ private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationC
.value((T) value)
.variant(resolvedVariant)
.reason(reason)
.flagMetadata(getFlagMetadata(flag))
.flagMetadata(getFlagMetadata(storageQueryResult))
.build();
}

private ImmutableMetadata getFlagMetadata(FeatureFlag flag) {
if (flag == null) {
return fallBackMetadata;
private ImmutableMetadata getFlagMetadata(StorageQueryResult storageQueryResult) {
ImmutableMetadata.ImmutableMetadataBuilder metadataBuilder = ImmutableMetadata.builder();
for (Map.Entry<String, Object> entry :
storageQueryResult.getGlobalFlagMetadata().entrySet()) {
addEntryToMetadataBuilder(metadataBuilder, entry.getKey(), entry.getValue());
}

ImmutableMetadata.ImmutableMetadataBuilder metadataBuilder = ImmutableMetadata.builder();
if (scope != null) {
metadataBuilder.addString("scope", scope);
}

for (Map.Entry<String, Object> entry : flag.getMetadata().entrySet()) {
Object value = entry.getValue();
if (value instanceof Number) {
if (value instanceof Long) {
metadataBuilder.addLong(entry.getKey(), (Long) value);
continue;
} else if (value instanceof Double) {
metadataBuilder.addDouble(entry.getKey(), (Double) value);
continue;
} else if (value instanceof Integer) {
metadataBuilder.addInteger(entry.getKey(), (Integer) value);
continue;
} else if (value instanceof Float) {
metadataBuilder.addFloat(entry.getKey(), (Float) value);
continue;
}
} else if (value instanceof Boolean) {
metadataBuilder.addBoolean(entry.getKey(), (Boolean) value);
continue;
} else if (value instanceof String) {
metadataBuilder.addString(entry.getKey(), (String) value);
continue;
FeatureFlag flag = storageQueryResult.getFeatureFlag();
if (flag != null) {
for (Map.Entry<String, Object> entry : flag.getMetadata().entrySet()) {
addEntryToMetadataBuilder(metadataBuilder, entry.getKey(), entry.getValue());
}
throw new IllegalArgumentException("The type of the Metadata entry with key " + entry.getKey()
+ " and value " + entry.getValue() + " is not supported");
}

return metadataBuilder.build();
}

private void addEntryToMetadataBuilder(
ImmutableMetadata.ImmutableMetadataBuilder metadataBuilder, String key, Object value) {
if (value instanceof Number) {
if (value instanceof Long) {
metadataBuilder.addLong(key, (Long) value);
return;
} else if (value instanceof Double) {
metadataBuilder.addDouble(key, (Double) value);
return;
} else if (value instanceof Integer) {
metadataBuilder.addInteger(key, (Integer) value);
return;
} else if (value instanceof Float) {
metadataBuilder.addFloat(key, (Float) value);
return;
}
} else if (value instanceof Boolean) {
metadataBuilder.addBoolean(key, (Boolean) value);
return;
} else if (value instanceof String) {
metadataBuilder.addString(key, (String) value);
return;
}
throw new IllegalArgumentException(
"The type of the Metadata entry with key " + key + " and value " + value + " is not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,6 @@ public FeatureFlag(String state, String defaultVariant, Map<String, Object> vari
this.metadata = new HashMap<>();
}

/**
* Add global metadata to this FeatureFlag. Keys that already exist in the metadata of this flag are not
* overwritten.
*
* @param metadata The metadata to add to this flag
*/
public void addMetadata(Map<String, Object> metadata) {
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
this.metadata.putIfAbsent(entry.getKey(), entry.getValue());
}
}

/** Get targeting rule of the flag. */
public String getTargeting() {
return this.targeting == null ? EMPTY_TARGETING_STRING : this.targeting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ private FlagParser() {}
}

/** Parse {@link String} for feature flags. */
public static Map<String, FeatureFlag> parseString(final String configuration, boolean throwIfInvalid)
throws IOException {
public static ParsingResult parseString(final String configuration, boolean throwIfInvalid) throws IOException {
if (SCHEMA_VALIDATOR != null) {
try (JsonParser parser = MAPPER.createParser(configuration)) {
Set<ValidationMessage> validationMessages = SCHEMA_VALIDATOR.validate(parser.readValueAsTree());
Expand All @@ -72,12 +71,12 @@ public static Map<String, FeatureFlag> parseString(final String configuration, b
final String transposedConfiguration = transposeEvaluators(configuration);

final Map<String, FeatureFlag> flagMap = new HashMap<>();

final Map<String, Object> metadata;
try (JsonParser parser = MAPPER.createParser(transposedConfiguration)) {
final TreeNode treeNode = parser.readValueAsTree();
final TreeNode flagNode = treeNode.get(FLAG_KEY);
final TreeNode metadataNode = treeNode.get(METADATA_KEY);
final Map<String, Object> metadata = parseMetadata(metadataNode);
metadata = parseMetadata(metadataNode);

if (flagNode == null) {
throw new IllegalArgumentException("No flag configurations found in the payload");
Expand All @@ -86,13 +85,11 @@ public static Map<String, FeatureFlag> parseString(final String configuration, b
final Iterator<String> it = flagNode.fieldNames();
while (it.hasNext()) {
final String key = it.next();
FeatureFlag flag = MAPPER.treeToValue(flagNode.get(key), FeatureFlag.class);
flag.addMetadata(metadata);
flagMap.put(key, flag);
flagMap.put(key, MAPPER.treeToValue(flagNode.get(key), FeatureFlag.class));
}
}

return flagMap;
return new ParsingResult(flagMap, metadata);
}

private static Map<String, Object> parseMetadata(TreeNode metadataNode) throws JsonProcessingException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.model;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Map;
import lombok.Getter;

/**
* The result of the parsing of a json string containing feature flag definitions.
*/
@Getter
@SuppressFBWarnings(
value = {"EI_EXPOSE_REP"},
justification = "Feature flag comes as a Json configuration, hence they must be exposed")
public class ParsingResult {
private final Map<String, FeatureFlag> flags;
private final Map<String, Object> globalFlagMetadata;

public ParsingResult(Map<String, FeatureFlag> flags, Map<String, Object> globalFlagMetadata) {
this.flags = flags;
this.globalFlagMetadata = globalFlagMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
Expand Down Expand Up @@ -35,6 +36,7 @@ public class FlagStore implements Storage {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final BlockingQueue<StorageStateChange> stateBlockingQueue = new LinkedBlockingQueue<>(1);
private final Map<String, FeatureFlag> flags = new HashMap<>();
private final Map<String, Object> globalFlagMetadata = new HashMap<>();

private final Connector connector;
private final boolean throwIfInvalid;
Expand All @@ -49,6 +51,7 @@ public FlagStore(final Connector connector, final boolean throwIfInvalid) {
}

/** Initialize storage layer. */
@Override
public void init() throws Exception {
connector.init();
Thread streamer = new Thread(() -> {
Expand All @@ -68,6 +71,7 @@ public void init() throws Exception {
*
* @throws InterruptedException if stream can't be closed within deadline.
*/
@Override
public void shutdown() throws InterruptedException {
if (shutdown.getAndSet(true)) {
return;
Expand All @@ -76,17 +80,23 @@ public void shutdown() throws InterruptedException {
connector.shutdown();
}

/** Retrieve flag for the given key. */
public FeatureFlag getFlag(final String key) {
/** Retrieve flag for the given key and the global flag metadata. */
@Override
public StorageQueryResult getFlag(final String key) {
readLock.lock();
FeatureFlag flag;
Map<String, Object> metadata;
try {
return flags.get(key);
flag = flags.get(key);
metadata = new HashMap<>(globalFlagMetadata);
} finally {
readLock.unlock();
}
return new StorageQueryResult(flag, metadata);
}

/** Retrieve blocking queue to check storage status. */
@Override
public BlockingQueue<StorageStateChange> getStateQueue() {
return stateBlockingQueue;
}
Expand All @@ -100,14 +110,18 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
case DATA:
try {
List<String> changedFlagsKeys;
Map<String, FeatureFlag> flagMap =
FlagParser.parseString(payload.getFlagData(), throwIfInvalid);
ParsingResult parsingResult = FlagParser.parseString(payload.getFlagData(), throwIfInvalid);
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
Map<String, Object> globalFlagMetadataMap = parsingResult.getGlobalFlagMetadata();

Structure metadata = parseSyncMetadata(payload.getMetadataResponse());
writeLock.lock();
try {
changedFlagsKeys = getChangedFlagsKeys(flagMap);
flags.clear();
flags.putAll(flagMap);
globalFlagMetadata.clear();
globalFlagMetadata.putAll(globalFlagMetadataMap);
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import java.util.concurrent.BlockingQueue;

/** Storage abstraction for resolver. */
Expand All @@ -9,7 +8,7 @@ public interface Storage {

void shutdown() throws InterruptedException;

FeatureFlag getFlag(final String key);
StorageQueryResult getFlag(final String key);

BlockingQueue<StorageStateChange> getStateQueue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Map;
import lombok.Getter;

/**
* To be returned by the storage when a flag is queried. Contains the flag (iff a flag associated with the given key
* exists, null otherwise) and global flag metadata
*/
@Getter
@SuppressFBWarnings(
value = {"EI_EXPOSE_REP"},
justification = "The storage provides access to both feature flags and global metadata")
public class StorageQueryResult {
private final FeatureFlag featureFlag;
private final Map<String, Object> globalFlagMetadata;

public StorageQueryResult(FeatureFlag featureFlag, Map<String, Object> globalFlagMetadata) {
this.featureFlag = featureFlag;
this.globalFlagMetadata = globalFlagMetadata;
}
}
Loading

0 comments on commit 65a508c

Please sign in to comment.