Skip to content

Commit

Permalink
SMT InsertBack: Upsert config added
Browse files Browse the repository at this point in the history
Signed-off-by: Lalith Kota <[email protected]>
  • Loading branch information
lalithkota committed Oct 24, 2024
1 parent f036896 commit db4fcc4
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ESQueryConfig extends Config{
boolean esSecurity;
String esUsername;
String esPassword;
boolean esUpsert;
int esRetryOnConflict;

ESQueryConfig(
Expand All @@ -80,6 +81,7 @@ public class ESQueryConfig extends Config{
String esSecurity,
String esUsername,
String esPassword,
boolean esUpsert,
int esRetryOnConflict
) {
super(type,idExpr,conditionExpr,valueExpr);
Expand All @@ -89,6 +91,7 @@ public class ESQueryConfig extends Config{
this.esSecurity = false;
this.esUsername = esUsername;
this.esPassword = esPassword;
this.esUpsert = esUpsert;
this.esRetryOnConflict = esRetryOnConflict;

HttpClientBuilder hClientBuilder = HttpClients.custom();
Expand Down Expand Up @@ -146,7 +149,9 @@ void insertBack(Object input){

ObjectNode requestJson = JsonNodeFactory.instance.objectNode();
requestJson.set("doc", value);
requestJson.set("doc_as_upsert", BooleanNode.TRUE);
if(esUpsert){
requestJson.set("doc_as_upsert", BooleanNode.TRUE);
}
hPost.setEntity(new StringEntity(requestJson.toString()));
try(CloseableHttpResponse hResponse = hClient.execute(hPost)){
HttpEntity hResponseEntity = hResponse.getEntity();
Expand Down Expand Up @@ -181,6 +186,7 @@ void close(){
public static final String ES_USERNAME_CONFIG = "es.username";
public static final String ES_PASSWORD_CONFIG = "es.password";
public static final String ES_RETRY_ON_CONFLICT_CONFIG = "es.retry.on.conflict";
public static final String ES_UPSERT = "es.upsert";

private Config config;
private boolean stopRecordAfterInsert;
Expand All @@ -197,6 +203,7 @@ void close(){
.define(ES_SECURITY_ENABLED_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Is Elasticsearch security enabled?")
.define(ES_USERNAME_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Username")
.define(ES_PASSWORD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Password")
.define(ES_UPSERT, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, "Elasticsearch Update API doc_as_upsert parameter.")
.define(ES_RETRY_ON_CONFLICT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API retry_on_conflict parameter.");


Expand Down Expand Up @@ -235,6 +242,7 @@ public void configure(Map<String, ?> configs) {
String esUsername = absconf.getString(ES_USERNAME_CONFIG);
String esPassword = absconf.getString(ES_PASSWORD_CONFIG);
int esRetryOnConflict = absconf.getInt(ES_RETRY_ON_CONFLICT_CONFIG);
boolean esUpsert = absconf.getBoolean(ES_UPSERT);

if(esUrl.isEmpty() || esIndex.isEmpty()){
throw new ConfigException("One of required transform Elasticsearch config fields not set. Required Elasticsearch fields in tranform: " + ES_URL_CONFIG + " ," + ES_INDEX_CONFIG);
Expand All @@ -251,6 +259,7 @@ public void configure(Map<String, ?> configs) {
esSecurity,
esUsername,
esPassword,
esUpsert,
esRetryOnConflict
);
}
Expand Down

0 comments on commit db4fcc4

Please sign in to comment.