Skip to content

Commit

Permalink
Merge pull request #40 from lalithkota/develop
Browse files Browse the repository at this point in the history
SMT InsertBack: Retry on conflict added
  • Loading branch information
lalithkota authored Oct 24, 2024
2 parents 66eb8f8 + f036896 commit 7cbecdb
Showing 1 changed file with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder;
import org.apache.hc.client5.http.ssl.TrustAllStrategy;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.ssl.SSLContextBuilder;
Expand All @@ -40,7 +41,7 @@ public abstract class DynamicNewFieldInsertBack<R extends ConnectRecord<R>> exte
public static class Key<R extends ConnectRecord<R>> extends DynamicNewFieldInsertBack<R>{}
public static class Value<R extends ConnectRecord<R>> extends DynamicNewFieldInsertBack<R>{}

private static final Logger logger = LoggerFactory.getLogger(ApplyJq.class);
private static final Logger logger = LoggerFactory.getLogger(DynamicNewFieldInsertBack.class);

public abstract class Config{
String type;
Expand All @@ -67,6 +68,7 @@ public class ESQueryConfig extends Config{
boolean esSecurity;
String esUsername;
String esPassword;
int esRetryOnConflict;

ESQueryConfig(
String type,
Expand All @@ -77,7 +79,8 @@ public class ESQueryConfig extends Config{
String esIndex,
String esSecurity,
String esUsername,
String esPassword
String esPassword,
int esRetryOnConflict
) {
super(type,idExpr,conditionExpr,valueExpr);

Expand All @@ -86,6 +89,7 @@ public class ESQueryConfig extends Config{
this.esSecurity = false;
this.esUsername = esUsername;
this.esPassword = esPassword;
this.esRetryOnConflict = esRetryOnConflict;

HttpClientBuilder hClientBuilder = HttpClients.custom();
if(esSecurity!=null && !esSecurity.isEmpty() && "true".equals(esSecurity)) {
Expand Down Expand Up @@ -134,7 +138,7 @@ void insertBack(Object input){
logger.error("DynamicNewFieldInsertBack: could not render value expr.", e);
return;
}
HttpPost hPost = new HttpPost(esUrl + "/" + esIndex + "/_update/" + idValue);
HttpPost hPost = new HttpPost(esUrl + "/" + esIndex + "/_update/" + idValue + "?retry_on_conflict=" + String.valueOf(esRetryOnConflict));
hPost.setHeader("Content-type", "application/json");
if(esSecurity) {
hPost.setHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((esUsername + ":" + esPassword).getBytes()));
Expand All @@ -145,12 +149,12 @@ void insertBack(Object input){
requestJson.set("doc_as_upsert", BooleanNode.TRUE);
hPost.setEntity(new StringEntity(requestJson.toString()));
try(CloseableHttpResponse hResponse = hClient.execute(hPost)){
if(hResponse.getCode() >= 200 && hResponse.getCode() < 300){
logger.error("DynamicNewFieldInsertBack: Error occured while ES query. " + EntityUtils.toString(hResponse.getEntity()));
HttpEntity hResponseEntity = hResponse.getEntity();
if(hResponse.getCode() < 200 || hResponse.getCode() >= 400){
logger.error("Error occured while ES query. " + EntityUtils.toString(hResponseEntity));
}
} catch(Exception e) {
logger.error("DynamicNewFieldInsertBack: ES connection issues.", e);
return;
logger.error("ES connection issues.", e);
}
}
}
Expand All @@ -176,6 +180,7 @@ void close(){
public static final String ES_SECURITY_ENABLED_CONFIG = "es.security.enabled";
public static final String ES_USERNAME_CONFIG = "es.username";
public static final String ES_PASSWORD_CONFIG = "es.password";
public static final String ES_RETRY_ON_CONFLICT_CONFIG = "es.retry.on.conflict";

private Config config;
private boolean stopRecordAfterInsert;
Expand All @@ -191,7 +196,8 @@ void close(){
.define(ES_INDEX_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Name of the index in ES to search")
.define(ES_SECURITY_ENABLED_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Is Elasticsearch security enabled?")
.define(ES_USERNAME_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Username")
.define(ES_PASSWORD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Password");
.define(ES_PASSWORD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Password")
.define(ES_RETRY_ON_CONFLICT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API retry_on_conflict parameter.");


@Override
Expand Down Expand Up @@ -228,6 +234,7 @@ public void configure(Map<String, ?> configs) {
String esSecurity = absconf.getString(ES_SECURITY_ENABLED_CONFIG);
String esUsername = absconf.getString(ES_USERNAME_CONFIG);
String esPassword = absconf.getString(ES_PASSWORD_CONFIG);
int esRetryOnConflict = absconf.getInt(ES_RETRY_ON_CONFLICT_CONFIG);

if(esUrl.isEmpty() || esIndex.isEmpty()){
throw new ConfigException("One of required transform Elasticsearch config fields not set. Required Elasticsearch fields in tranform: " + ES_URL_CONFIG + " ," + ES_INDEX_CONFIG);
Expand All @@ -243,7 +250,8 @@ public void configure(Map<String, ?> configs) {
esIndex,
esSecurity,
esUsername,
esPassword
esPassword,
esRetryOnConflict
);
}
catch(Exception e){
Expand Down

0 comments on commit 7cbecdb

Please sign in to comment.