Commit
Merge pull request #29 from lalithkota/develop
Added ExtractFieldAdv SMT. Updated Timestamp as required. Connector init wait times updated.
lalithkota authored Sep 16, 2024
2 parents 4aa6c9d + f2fd020 commit 9217750
Showing 25 changed files with 833 additions and 232 deletions.
28 changes: 18 additions & 10 deletions charts/reporting-init/values.yaml
@@ -211,19 +211,23 @@ startUpCommand: |-
   debezium_connector_name=$(echo $debezium_connector_config | jq -cr '.name')
   debezium_new_connectors_list+=("$debezium_connector_name")
-  debez_wait_secs=$(echo $debezium_connector_config | jq -cr '.wait_after_init_secs // empty')
-  debezium_connector_config=$(echo $debezium_connector_config | jq -cr 'del(.wait_after_init_secs)')
+  debez_wait_init_secs=$(echo $debezium_connector_config | jq -cr '.wait_after_init_secs // empty')
+  debez_wait_update_secs=$(echo $debezium_connector_config | jq -cr '.wait_after_update_secs // empty')
+  debezium_connector_config=$(echo $debezium_connector_config | jq -cr 'del(.wait_after_init_secs, .wait_after_update_secs)')
   if_exists=$(contains "$debezium_existing_connectors_list" "$debezium_connector_name")
   if [ -z "$if_exists" ]; then
     echo "==> Creating new Connector - $debezium_connector_name."
     curl -s -XPOST -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors -d "$debezium_connector_config" | jq
+    if [ -n "$debez_wait_init_secs" ]; then
+      sleep $debez_wait_init_secs
+    fi
   else
     echo "==> Connector - $debezium_connector_name - already exists. Updating config."
     curl -s -XPUT -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors/${debezium_connector_name}/config -d "$(echo $debezium_connector_config | jq -cr '.config')" | jq
-  fi
-  if [ -n "$debez_wait_secs" ]; then
-    sleep $debez_wait_secs
+    if [ -n "$debez_wait_update_secs" ]; then
+      sleep $debez_wait_update_secs
+    fi
   fi
 done
 echo "==> Starting deletion process for old debezium connectors."
@@ -247,19 +251,23 @@ startUpCommand: |-
   os_connector_name=$(echo $os_connector_config | jq -cr '.name')
   os_new_connectors_list+=("$os_connector_name")
-  os_wait_secs=$(echo $os_connector_config | jq -cr '.wait_after_init_secs // empty')
-  os_connector_config=$(echo $os_connector_config | jq -cr 'del(.wait_after_init_secs)')
+  os_wait_init_secs=$(echo $os_connector_config | jq -cr '.wait_after_init_secs // empty')
+  os_wait_update_secs=$(echo $os_connector_config | jq -cr '.wait_after_update_secs // empty')
+  os_connector_config=$(echo $os_connector_config | jq -cr 'del(.wait_after_init_secs, .wait_after_update_secs)')
   if_exists=$(contains "$os_existing_connectors_list" "$os_connector_name")
   if [ -z "$if_exists" ]; then
     echo "==> Creating new Connector - $os_connector_name."
     curl -s -XPOST -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors -d "$os_connector_config" | jq
+    if [ -n "$os_wait_init_secs" ]; then
+      sleep $os_wait_init_secs
+    fi
   else
     echo "==> Connector - $os_connector_name - already exists. Updating config."
     curl -s -XPUT -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors/${os_connector_name}/config -d "$(echo $os_connector_config | jq -cr '.config')" | jq
-  fi
-  if [ -n "$os_wait_secs" ]; then
-    sleep $os_wait_secs
+    if [ -n "$os_wait_update_secs" ]; then
+      sleep $os_wait_update_secs
+    fi
   fi
 done
 echo "==> Starting deletion process for old opensearch-connectors."
4 changes: 2 additions & 2 deletions opensearch-kafka-connector/Dockerfile
@@ -10,8 +10,8 @@ RUN cd /kafka-connect-transforms && \
     wget https://repo.maven.apache.org/maven2/io/debezium/debezium-api/${DEBEZIUM_VERSION}/debezium-api-${DEBEZIUM_VERSION}.jar


-FROM apache/kafka:3.7.0 AS runner
-ARG KAFKA_VERSION=3.7.0
+FROM apache/kafka:3.8.0 AS runner
+ARG KAFKA_VERSION=3.8.0

 WORKDIR /opt/kafka
 RUN mkdir connectors
21 changes: 20 additions & 1 deletion opensearch-kafka-connector/kafka-connect-transforms/pom.xml
@@ -50,10 +50,29 @@
       <version>5.8.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-json</artifactId>
+      <version>3.8.0</version>
+      <!-- This is only required for JsonConverter that is used during testing. -->
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>connect-api</artifactId>
-      <version>2.8.0</version>
+      <version>3.8.0</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.8.0</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-transforms</artifactId>
+      <version>3.8.0</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
DynamicNewField.java
@@ -3,15 +3,19 @@
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion;
import org.apache.kafka.connect.transforms.field.SingleFieldPath;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;

import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -48,6 +52,7 @@ public static class Value<R extends ConnectRecord<R>> extends DynamicNewField<R>
 public abstract class Config{
     String type;
     String[] inputFields;
+    SingleFieldPath[] inputFieldPaths;
     String[] inputDefaultValues;
     String[] outputFields;
     Schema outputSchema;
@@ -57,6 +62,10 @@ public abstract class Config{
         this.inputDefaultValues = inputDefaultValues;
         this.outputFields = outputFields;
         this.outputSchema = outputSchema;
+        this.inputFieldPaths = new SingleFieldPath[inputFields.length];
+        for(int i=0;i < inputFields.length; i++) {
+            this.inputFieldPaths[i] = new SingleFieldPath(inputFields[i], FieldSyntaxVersion.V2);
+        }
     }
     List<Object> make(Object input){
         return null;
@@ -375,7 +384,7 @@ public R applySchemaless(R record) {
         List<Object> valueList = new ArrayList<Object>();
         boolean dealingWithList = false;
         for(int i = 0; i < config.inputFields.length; i++){
-            Object v = Requirements.getNestedField(value, config.inputFields[i]);
+            Object v = config.inputFieldPaths[i].valueFrom(value);
             if(v != null && !(v instanceof String && ((String)v).isEmpty())){
                 valueList.add(v);
                 if(v instanceof List<?>) dealingWithList = true;
@@ -412,7 +421,7 @@ public R applyWithSchema(R record) {
         List<Object> valueList = new ArrayList<Object>();
         boolean dealingWithList = false;
         for(int i = 0; i < config.inputFields.length; i++){
-            Object v = Requirements.getNestedField(value, config.inputFields[i])[0];
+            Object v = config.inputFieldPaths[i].valueFrom(value);
             if(v != null && !(v instanceof String && ((String)v).isEmpty())){
                 valueList.add(v);
                 if(v instanceof List) dealingWithList = true;
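
The two hunks above replace Requirements.getNestedField with SingleFieldPath lookups that are parsed once in the Config constructor using FieldSyntaxVersion.V2. A minimal sketch of how such a path resolves against a schemaless (map) value, assuming the Kafka 3.8 field-path API behaves as it is used in this diff (the field name and data below are made up):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion;
import org.apache.kafka.connect.transforms.field.SingleFieldPath;

public class SingleFieldPathDemo {
    public static void main(String[] args) {
        // Parse the dotted path once; V2 syntax supports nested field access.
        SingleFieldPath path = new SingleFieldPath("profile.name", FieldSyntaxVersion.V2);

        // A nested, schemaless value like the one applySchemaless() receives.
        Map<String, Object> profile = new HashMap<>();
        profile.put("name", "Jane");
        Map<String, Object> value = new HashMap<>();
        value.put("profile", profile);

        // Resolves "profile" -> "name" against the nested maps and prints "Jane".
        System.out.println(path.valueFrom(value));
    }
}
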
ExtractFieldAdv.java (new file)
@@ -0,0 +1,194 @@
package org.openg2p.reporting.kafka.connect.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion;
import org.apache.kafka.connect.transforms.field.SingleFieldPath;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.openg2p.reporting.kafka.connect.transforms.util.StructMerger;

public abstract class ExtractFieldAdv<R extends ConnectRecord<R>> extends ExtractField<R>{

public static final String FIELD_CONFIG = "field";
public static final String ARRAY_MERGE_STRATEGY_CONFIG = "array.merge.strategy";
public static final String MAP_MERGE_STRATEGY_CONFIG = "map.merge.strategy";

public static final String PURPOSE = "field extraction";

public static final char FIELD_SPLITTER = ',';
public static final String RENAME_FIELD_OPERATOR = "->";
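// The "field" config is a comma-separated list of field paths; each path may optionally be
// followed by "->" and a new field name under which the extracted value is placed,
// e.g. (hypothetical) "payload.after,payload.source->source_meta". See getOutputPaths below.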

public static final ConfigDef CONFIG_DEF = new ConfigDef().define(
FIELD_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.MEDIUM,
"Field name to extract."
).define(
ARRAY_MERGE_STRATEGY_CONFIG,
ConfigDef.Type.STRING,
"concat",
ConfigDef.Importance.HIGH,
"Array merge strategy: Available configs: \"concat\", \"replace\"."
).define(
MAP_MERGE_STRATEGY_CONFIG,
ConfigDef.Type.STRING,
"deep",
ConfigDef.Importance.HIGH,
"Array merge strategy: Available configs: \"deep\", \"replace\"."
);

private List<Map.Entry<String, String>> outputPaths;
private List<Map.Entry<SingleFieldPath, String>> fieldPaths;
private StructMerger.MapMergeStrategy mapMergeStrategy;
private StructMerger.ArrayMergeStrategy arrMergeStrategy;

@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
outputPaths = this.getOutputPaths(config.getString(FIELD_CONFIG));
mapMergeStrategy = StructMerger.MapMergeStrategy.valueOf(config.getString(MAP_MERGE_STRATEGY_CONFIG));
arrMergeStrategy = StructMerger.ArrayMergeStrategy.valueOf(config.getString(ARRAY_MERGE_STRATEGY_CONFIG));

fieldPaths = new ArrayList<>();
for (Map.Entry<String, String> origPath: outputPaths){
fieldPaths.add(
new AbstractMap.SimpleEntry<>(
// FieldSyntaxVersion.V2 is mandatory here.
new SingleFieldPath(origPath.getKey(), FieldSyntaxVersion.V2),
origPath.getValue()
)
);
}
}

@Override
public R apply(R record) {
final Schema schema = operatingSchema(record);
if (schema == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}

public R applySchemaless(R record) {
Map<String, Object> value = Requirements.requireMapOrNull(operatingValue(record), PURPOSE);
Object res = null;
for (Map.Entry<SingleFieldPath, String> fieldPath: fieldPaths){
Object toBeMerged = getMapOrObject(value, fieldPath.getKey(), fieldPath.getValue());
if (res == null){
res = toBeMerged;
} else if (res instanceof Map && toBeMerged instanceof Map){
res = StructMerger.mergeMaps((Map<String, Object>)res, (Map<String, Object>)toBeMerged, mapMergeStrategy, arrMergeStrategy);
} else {
throw new DataException("ExtractNewFieldAdv: One of the maps trying to be merged is a primitive.");
}
}
return newRecord(record, null, res);
}

public R applyWithSchema(R record) {
Struct value = Requirements.requireStructOrNull(operatingValue(record), PURPOSE);
Object res = null;
Schema resSchema = null;

for (Map.Entry<SingleFieldPath, String> fieldPath: fieldPaths){
SchemaAndValue toBeMerged = getStructOrObject(value, fieldPath.getKey(), fieldPath.getValue());
if (res == null){
res = toBeMerged.value();
resSchema = toBeMerged.schema();
} else if (res instanceof Struct && toBeMerged.value() instanceof Struct){
res = StructMerger.mergeStructs((Struct)res, (Struct)toBeMerged.value(), mapMergeStrategy, arrMergeStrategy);
resSchema = ((Struct)res).schema();
} else {
throw new DataException("ExtractNewFieldAdv: One of the structs trying to be merged is a primitive.");
}
}
return newRecord(record, resSchema, res);
}

public Object getMapOrObject(Map<String, Object> value, SingleFieldPath fieldPath, String renameField){
if (renameField != null && !renameField.isEmpty()){
Map<String, Object> res = new HashMap<>();
res.put(renameField, fieldPath.valueFrom(value));
return res;
} else {
return fieldPath.valueFrom(value);
}
}

public SchemaAndValue getStructOrObject(Struct value, SingleFieldPath fieldPath, String renameField){
Object res = fieldPath.valueFrom(value);
Schema resSchema = fieldPath.fieldFrom(value.schema()).schema();
if (renameField != null && !renameField.isEmpty()){
SchemaBuilder builder = SchemaUtil.copySchemaBasics(((Struct)res).schema());
builder.field(renameField, ((Struct)res).schema());
resSchema = builder.build();
Struct resStruct = new Struct(resSchema);
resStruct.put(renameField, res);
return new SchemaAndValue(resSchema, resStruct);
} else {
return new SchemaAndValue(resSchema, res);
}
}

public List<Map.Entry<String, String>> getOutputPaths(String originalPath){
List<Map.Entry<String, String>> res = new ArrayList<>();
String[] pathSplit;
for(String path: originalPath.split(String.valueOf(FIELD_SPLITTER))){
pathSplit = path.split(RENAME_FIELD_OPERATOR);
res.add(new AbstractMap.SimpleEntry<>(pathSplit[0], pathSplit.length > 1 ? pathSplit[1] : null));
}
return res;
}

public static class Key<R extends ConnectRecord<R>> extends ExtractFieldAdv<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}

@Override
protected Object operatingValue(R record) {
return record.key();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}

public static class Value<R extends ConnectRecord<R>> extends ExtractFieldAdv<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}

@Override
protected Object operatingValue(R record) {
return record.value();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}
}
}
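
A minimal, hypothetical sketch of wiring up the new SMT programmatically (the topic, field names and record contents are invented, and it assumes StructMerger merges plain maps as the schemaless path above suggests):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.openg2p.reporting.kafka.connect.transforms.ExtractFieldAdv;

public class ExtractFieldAdvExample {
    public static void main(String[] args) {
        // Extract "payload.after" as-is and "payload.source" under the new name "source_meta".
        Map<String, String> props = new HashMap<>();
        props.put("field", "payload.after,payload.source->source_meta");
        props.put("array.merge.strategy", "concat"); // default
        props.put("map.merge.strategy", "deep");     // default

        ExtractFieldAdv.Value<SourceRecord> smt = new ExtractFieldAdv.Value<>();
        smt.configure(props);

        // A schemaless (map-based) record value resembling a Debezium envelope.
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);
        Map<String, Object> source = new HashMap<>();
        source.put("table", "res_partner");
        Map<String, Object> payload = new HashMap<>();
        payload.put("after", after);
        payload.put("source", source);
        Map<String, Object> value = new HashMap<>();
        value.put("payload", payload);

        SourceRecord record = new SourceRecord(null, null, "example-topic", null, value);
        // Expected to yield roughly {id=1, source_meta={table=res_partner}}.
        System.out.println(smt.apply(record).value());
    }
}

In a connector's properties the same settings would go under a transforms alias, e.g. transforms.extract.type=org.openg2p.reporting.kafka.connect.transforms.ExtractFieldAdv$Value together with transforms.extract.field, transforms.extract.array.merge.strategy and transforms.extract.map.merge.strategy.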