
Commit 441c370

Merge pull request #27 from lalithkota/develop
Reporting Changes - Revision History
lalithkota authored Sep 3, 2024
2 parents fdc351b + da50707 commit 441c370
Showing 15 changed files with 102 additions and 38 deletions.
20 changes: 16 additions & 4 deletions charts/reporting-init/values.yaml
@@ -211,19 +211,25 @@ startUpCommand: |-
debezium_connector_name=$(echo $debezium_connector_config | jq -cr '.name')
debezium_new_connectors_list+=("$debezium_connector_name")
if_exists=$(contains $debezium_existing_connectors_list $debezium_connector_name)
debez_wait_secs=$(echo $debezium_connector_config | jq -cr '.wait_after_init_secs // empty')
debezium_connector_config=$(echo $debezium_connector_config | jq -cr 'del(.wait_after_init_secs)')
if_exists=$(contains "$debezium_existing_connectors_list" "$debezium_connector_name")
if [ -z "$if_exists" ]; then
echo "==> Creating new Connector - $debezium_connector_name."
curl -s -XPOST -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors -d "$debezium_connector_config" | jq
else
echo "==> Connector - $debezium_connector_name - already exists. Updating config."
curl -s -XPUT -H 'Content-Type: application/json' $DEBEZIUM_URL/connectors/${debezium_connector_name}/config -d "$(echo $debezium_connector_config | jq -cr '.config')" | jq
fi
if [ -n "$debez_wait_secs" ]; then
sleep $debez_wait_secs
fi
done
echo "==> Starting deletion process for old debezium connectors."
debezium_new_connectors_list=${debezium_new_connectors_list[@]}
for connector_to_delete in $debezium_existing_connectors_list; do
if_exists=$(contains $debezium_new_connectors_list $connector_to_delete)
if_exists=$(contains "$debezium_new_connectors_list" "$connector_to_delete")
if [ -z "$if_exists" ]; then
echo "==> Deleting old connector - $connector_to_delete."
curl -s -XDELETE $DEBEZIUM_URL/connectors/${connector_to_delete} | jq
@@ -241,19 +247,25 @@ startUpCommand: |-
os_connector_name=$(echo $os_connector_config | jq -cr '.name')
os_new_connectors_list+=("$os_connector_name")
if_exists=$(contains $os_existing_connectors_list $os_connector_name)
os_wait_secs=$(echo $os_connector_config | jq -cr '.wait_after_init_secs // empty')
os_connector_config=$(echo $os_connector_config | jq -cr 'del(.wait_after_init_secs)')
if_exists=$(contains "$os_existing_connectors_list" "$os_connector_name")
if [ -z "$if_exists" ]; then
echo "==> Creating new Connector - $os_connector_name."
curl -s -XPOST -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors -d "$os_connector_config" | jq
else
echo "==> Connector - $os_connector_name - already exists. Updating config."
curl -s -XPUT -H 'Content-Type: application/json' $OS_KAFKA_CONNECTOR_URL/connectors/${os_connector_name}/config -d "$(echo $os_connector_config | jq -cr '.config')" | jq
fi
if [ -n "$os_wait_secs" ]; then
sleep $os_wait_secs
fi
done
echo "==> Starting deletion process for old opensearch-connectors."
os_new_connectors_list=${os_new_connectors_list[@]}
for connector_to_delete in $os_existing_connectors_list; do
if_exists=$(contains $os_new_connectors_list $connector_to_delete)
if_exists=$(contains "$os_new_connectors_list" "$connector_to_delete")
if [ -z "$if_exists" ]; then
echo "==> Deleting old connector - $connector_to_delete."
curl -s -XDELETE $OS_KAFKA_CONNECTOR_URL/connectors/${connector_to_delete} | jq
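For illustration, a minimal sketch of how a connector entry could carry the new optional wait_after_init_secs field. The connector name and connector.class below are hypothetical placeholders, not values from this repository; only the jq handling mirrors the init script above, which reads the field, strips it from the payload before calling the Connect REST API, and sleeps that many seconds after the connector is created or updated.

# Hypothetical connector entry carrying the new optional field.
debezium_connector_config='{"name":"example-connector","wait_after_init_secs":30,"config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector"}}'

# Read the optional field (empty when absent) ...
debez_wait_secs=$(echo $debezium_connector_config | jq -cr '.wait_after_init_secs // empty')   # -> 30

# ... drop it so the Connect REST API never sees the extra key ...
debezium_connector_config=$(echo $debezium_connector_config | jq -cr 'del(.wait_after_init_secs)')

# ... and pause after the connector has been created or updated.
if [ -n "$debez_wait_secs" ]; then sleep "$debez_wait_secs"; fi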
28 changes: 28 additions & 0 deletions opensearch-kafka-connector/kafka-connect-transforms/pom.xml
@@ -48,11 +48,13 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.8.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
@@ -87,6 +89,32 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/**</exclude>
<exclude>META-INF</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
</project>
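As a rough usage sketch (the jar name and plugin path below are assumptions, not taken from this diff): the shade plugin added above runs during the package phase and bundles the transform's runtime dependencies into a single jar, while connect-api stays out of the shaded artifact thanks to its provided scope, since the Kafka Connect runtime supplies that API.

# Build the module; the shaded jar is produced during 'package'.
mvn -f opensearch-kafka-connector/kafka-connect-transforms/pom.xml clean package

# Copy the resulting jar into the Connect plugin path (both the jar name and the
# destination directory are illustrative; adjust to the actual deployment).
cp opensearch-kafka-connector/kafka-connect-transforms/target/*.jar /usr/share/java/kafka-connect-plugins/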
@@ -13,25 +13,30 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.CredentialsStore;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder;
import org.apache.hc.client5.http.ssl.TrustAllStrategy;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.ssl.SSLContextBuilder;

import org.json.JSONObject;
import org.json.JSONException;

import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.security.NoSuchAlgorithmException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -69,6 +74,7 @@ public class ESQueryConfig extends Config{
String[] esInputFields;
String[] esOutputFields;
String esInputQueryAddKeyword;
String esQuerySort;

// RestHighLevelClient esClient;
CloseableHttpClient hClient;
@@ -84,6 +90,7 @@ public class ESQueryConfig extends Config{
String[] esInputFields,
String[] esOutputFields,
String esInputQueryAddKeyword,
String esQuerySort,
String esSecurity,
String esUsername,
String esPassword
@@ -94,18 +101,34 @@
this.esIndex=esIndex;
this.esInputFields=esInputFields;
this.esOutputFields=esOutputFields;
this.esQuerySort=esQuerySort;
this.esInputQueryAddKeyword=esInputQueryAddKeyword;

// esClient = new RestHighLevelClient(RestClient.builder(HttpHost.create(this.esUrl)));
HttpClientBuilder hClientBuilder = HttpClients.custom();
if(esSecurity!=null && !esSecurity.isEmpty() && "true".equals(esSecurity)) {
CredentialsStore esCredStore = new BasicCredentialsProvider();
esCredStore.setCredentials(new AuthScope(null, -1), new UsernamePasswordCredentials(esUsername, esPassword.toCharArray()));
hClientBuilder.setDefaultCredentialsProvider(esCredStore);
try{
hClientBuilder.setConnectionManager(
PoolingHttpClientConnectionManagerBuilder.create().setSSLSocketFactory(
SSLConnectionSocketFactoryBuilder.create().setSslContext(
SSLContextBuilder.create().loadTrustMaterial(
TrustAllStrategy.INSTANCE
).build()
).setHostnameVerifier(
NoopHostnameVerifier.INSTANCE
).build()
).build()
);
} catch(NoSuchAlgorithmException | KeyManagementException | KeyStoreException e ){
throw new ConfigException("Cannot Initialize ES Httpclient for security", e);
}
}
hClient = hClientBuilder.build();
hGet = new HttpGet(this.esUrl+"/"+this.esIndex+"/_search");
hGet.setHeader("Content-type", "application/json");
if(esSecurity!=null && !esSecurity.isEmpty() && "true".equals(esSecurity)) {
hGet.setHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((esUsername + ":" + esPassword).getBytes()));
}
}

List<Object> makeQuery(List<Object> inputValues){
@@ -132,7 +155,7 @@ else if(inputValues.size()==0){
requestJson += (value instanceof Number || value instanceof Boolean) ? value : "\"" + value + "\"";
requestJson += "}}";
}
requestJson += "]}}}";
requestJson += "]}}, \"sort\":" + esQuerySort + "}";

hGet.setEntity(new StringEntity(requestJson));

@@ -241,6 +264,7 @@ void close(){
public static final String ES_INPUT_FIELDS_CONFIG = "es.input.fields";
public static final String ES_OUTPUT_FIELDS_CONFIG = "es.output.fields";
public static final String ES_INPUT_QUERY_ADD_KEYWORD = "es.input.query.add.keyword";
public static final String ES_QUERY_SORT = "es.query.sort";
public static final String ES_SECURITY_ENABLED_CONFIG = "es.security.enabled";
public static final String ES_USERNAME_CONFIG = "es.username";
public static final String ES_PASSWORD_CONFIG = "es.password";
@@ -259,6 +283,7 @@ void close(){
.define(ES_INPUT_FIELDS_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "ES documents with given input field will be searched for. This field tells the key name")
.define(ES_OUTPUT_FIELDS_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "If a successful match is made with the above input field+value, the values of this output fields from the same document will be returned")
.define(ES_INPUT_QUERY_ADD_KEYWORD, ConfigDef.Type.STRING, "false", ConfigDef.Importance.HIGH, "Should add the .keyword suffix while querying ES?")
.define(ES_QUERY_SORT, ConfigDef.Type.STRING, "[{\"@timestamp_gen\": {\"order\": \"desc\"}}]", ConfigDef.Importance.HIGH, "This will be added under \"sort\" section in the ES Query.")

.define(ES_SECURITY_ENABLED_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Is Elasticsearch security enabled?")
.define(ES_USERNAME_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Username")
@@ -296,6 +321,7 @@ public void configure(Map<String, ?> configs) {
String esInputFieldBulk = absconf.getString(ES_INPUT_FIELDS_CONFIG);
String esOutputFieldBulk = absconf.getString(ES_OUTPUT_FIELDS_CONFIG);
String esInputQueryAddKeyword = absconf.getString(ES_INPUT_QUERY_ADD_KEYWORD);
String esQuerySort = absconf.getString(ES_QUERY_SORT);

if(esUrl.isEmpty() || esIndex.isEmpty() || esInputFieldBulk.isEmpty() || esOutputFieldBulk.isEmpty()){
throw new ConfigException("One of required transform Elasticsearch config fields not set. Required Elasticsearch fields in tranform: " + ES_URL_CONFIG + " ," + ES_INDEX_CONFIG + " ," + ES_INPUT_FIELDS_CONFIG + " ," + ES_OUTPUT_FIELDS_CONFIG);
@@ -319,6 +345,7 @@ public void configure(Map<String, ?> configs) {
esInputFields,
esOutputFields,
esInputQueryAddKeyword,
esQuerySort,
esSecurity,
esUsername,
esPassword
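A hedged sketch of how the new es.query.sort property might be wired into a connector's transform block. The transform alias (esquery) and type class are placeholders, the es.url and es.index key names are inferred from the ES_URL_CONFIG and ES_INDEX_CONFIG constants referenced above, and the endpoint, index, and field values are illustrative; the remaining keys and the default sort value are taken from the ConfigDef in this file.

# Illustrative fragment only; alias, type class, URL, index, and field values are placeholders.
transform_config='{
  "transforms": "esquery",
  "transforms.esquery.type": "<fully.qualified.ESQueryTransformClass>",
  "transforms.esquery.es.url": "http://opensearch:9200",
  "transforms.esquery.es.index": "res_partner",
  "transforms.esquery.es.input.fields": "id",
  "transforms.esquery.es.output.fields": "name",
  "transforms.esquery.es.security.enabled": "true",
  "transforms.esquery.es.username": "admin",
  "transforms.esquery.es.password": "${OPENSEARCH_PASSWORD}",
  "transforms.esquery.es.query.sort": "[{\"@timestamp_gen\": {\"order\": \"desc\"}}]"
}'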
@@ -183,13 +183,11 @@ private String convertTimestamp(Object timestamp, Config.InputType inType, Simpl
}

String output;
long tsLong;
long tsLong = ((Number)timestamp).longValue();

if(inType == Config.InputType.milli_sec){
tsLong = (long)timestamp;
output = format.format(new Date(tsLong));
} else if(inType == Config.InputType.micro_sec){
tsLong = (long)timestamp;
output = format.format(new Date(tsLong/1000));
// // the following is if there are more micro digits... right now ignoring
// if((tsLong%1000) != 0) {
Expand All @@ -198,7 +196,6 @@ private String convertTimestamp(Object timestamp, Config.InputType inType, Simpl
// output += "Z";
// }
} else if(inType == Config.InputType.days_epoch){
tsLong = (int)timestamp;
output = format.format(new Date(tsLong*24*60*60*1000));
} else{
// it should never come here
4 changes: 2 additions & 2 deletions scripts/pbms/opensearch-connectors/10.g2p_program.json
@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

4 changes: 2 additions & 2 deletions scripts/pbms/opensearch-connectors/20.res_partner.json
@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.res_partner",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_membership",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_registrant_info",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_assessment",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_membership_duplicate",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

4 changes: 2 additions & 2 deletions scripts/pbms/opensearch-connectors/50.g2p_program_fund.json
@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_program_fund",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

4 changes: 2 additions & 2 deletions scripts/pbms/opensearch-connectors/60.g2p_entitlement.json
@@ -7,14 +7,14 @@
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.g2p_entitlement",
"key.ignore": "false",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",

"behavior.on.null.values": "delete",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "warn",
"behavior.on.version.conflict": "warn",

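For context, and hedged because it reflects the usual Elasticsearch/OpenSearch sink connector semantics rather than anything shown in this diff: key.ignore=true makes the sink derive each document ID from topic+partition+offset instead of the record key, and behavior.on.null.values=ignore skips tombstone records instead of turning them into deletes. One way to confirm what a running connector actually holds is to read its config back through the same REST API the init script uses; the connector name below is a placeholder.

# OS_KAFKA_CONNECTOR_URL is the same variable the init script above uses; the
# connector name is hypothetical.
curl -s "$OS_KAFKA_CONNECTOR_URL/connectors/res_partner-connector/config" \
  | jq '{"key.ignore": .["key.ignore"], "behavior.on.null.values": .["behavior.on.null.values"]}'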

0 comments on commit 441c370
