Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ecosystem](kettle) add delete mode for kettle plugin #43820

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
.setFormatMeta(data.formatMeta)
.setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT))
.setLogChannelInterface(log)
.setDeletable(options.isDeletable())
.build();
}

Expand Down Expand Up @@ -120,6 +121,8 @@ private void closeOutput() throws Exception {
public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (DorisStreamLoaderMeta) smi;
data = (DorisStreamLoaderData) sdi;
logDebug("Initializing step with meta : " + meta.toString());

if (super.init(smi, sdi)){
Properties streamHeaders = new Properties();
String streamLoadProp = meta.getStreamLoadProp();
Expand All @@ -141,7 +144,10 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
.withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes())
.withBufferFlushMaxRows(meta.getBufferFlushMaxRows())
.withMaxRetries(meta.getMaxRetries())
.withStreamLoadProp(streamHeaders).build();
.withStreamLoadProp(streamHeaders)
.withDeletable(meta.isDeletable()).build();

logDetailed("Initializing step with options: " + options.toString());
streamLoad = new DorisBatchStreamLoad(options, log);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
description = "BaseStep.TypeTooltipDesc.DorisStreamLoader",
categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.Bulk",
image = "doris.svg",
documentationUrl = "https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual/",
documentationUrl = "https://doris.apache.org/docs/dev/ecosystem/kettle/",
i18nPackageName = "org.pentaho.di.trans.steps.dorisstreamloader" )
@InjectionSupported( localizationPrefix = "DorisStreamLoader.Injection.", groups = { "FIELDS" } )
public class DorisStreamLoaderMeta extends BaseStepMeta implements StepMetaInterface {
Expand Down Expand Up @@ -84,6 +84,8 @@ public class DorisStreamLoaderMeta extends BaseStepMeta implements StepMetaInter

private int maxRetries;

private boolean deletable;

/** Field name of the target table */
@Injection( name = "FIELD_TABLE", group = "FIELDS" )
private String[] fieldTable;
Expand Down Expand Up @@ -111,8 +113,8 @@ private void readData( Node stepnode, List<? extends SharedObjectInterface> data
bufferFlushMaxRows = Long.valueOf(XMLHandler.getTagValue(stepnode, "bufferFlushMaxRows"));
bufferFlushMaxBytes = Long.valueOf(XMLHandler.getTagValue(stepnode, "bufferFlushMaxBytes"));
maxRetries = Integer.valueOf(XMLHandler.getTagValue(stepnode, "maxRetries"));

streamLoadProp = XMLHandler.getTagValue(stepnode, "streamLoadProp");
deletable = "Y".equalsIgnoreCase(XMLHandler.getTagValue(stepnode, "deletable"));

// Field data mapping
int nrvalues = XMLHandler.countNodes(stepnode, "mapping");
Expand Down Expand Up @@ -145,7 +147,7 @@ public void setDefault() {
bufferFlushMaxBytes = 10 * 1024 * 1024;
maxRetries = 3;
streamLoadProp = "format:json;read_json_by_line:true";

deletable = false;
allocate(0);
}

Expand All @@ -161,6 +163,7 @@ public String getXML() {
retval.append(" ").append(XMLHandler.addTagValue("bufferFlushMaxBytes", bufferFlushMaxBytes));
retval.append(" ").append(XMLHandler.addTagValue("maxRetries", maxRetries));
retval.append(" ").append(XMLHandler.addTagValue("streamLoadProp", streamLoadProp));
retval.append(" ").append(XMLHandler.addTagValue("deletable", deletable));

for (int i = 0; i < fieldTable.length; i++) {
retval.append(" <mapping>").append(Const.CR);
Expand Down Expand Up @@ -189,6 +192,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis
maxRetries = Integer.valueOf(rep.getStepAttributeString(id_step, "maxRetries"));

streamLoadProp = rep.getStepAttributeString(id_step, "streamLoadProp");
deletable = rep.getStepAttributeBoolean(id_step, "deletable");
int nrvalues = rep.countNrStepAttributes(id_step, "stream_name");
allocate(nrvalues);

Expand Down Expand Up @@ -217,6 +221,7 @@ public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transform
rep.saveStepAttribute(id_transformation, id_step, "bufferFlushMaxRows", bufferFlushMaxRows);
rep.saveStepAttribute(id_transformation, id_step, "bufferFlushMaxBytes", bufferFlushMaxBytes);
rep.saveStepAttribute(id_transformation, id_step, "maxRetries", maxRetries);
rep.saveStepAttribute(id_transformation, id_step, "deletable", deletable);

for (int i = 0; i < fieldTable.length; i++) {
rep.saveStepAttribute(id_transformation, id_step, i, "stream_name", fieldTable[i]);
Expand Down Expand Up @@ -328,7 +333,15 @@ public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public String[] getFieldTable() {
/** Returns whether loaded rows should be marked for deletion (Doris delete-sign load mode). */
public boolean isDeletable() {
return deletable;
}

/** Sets whether loaded rows should be marked for deletion instead of upserted. */
public void setDeletable(boolean deletable) {
this.deletable = deletable;
}

/** Returns the field names of the target table that the stream fields are mapped to. */
public String[] getFieldTable() {
return fieldTable;
}

Expand Down Expand Up @@ -361,6 +374,7 @@ public String toString() {
", bufferFlushMaxRows=" + bufferFlushMaxRows +
", bufferFlushMaxBytes=" + bufferFlushMaxBytes +
", maxRetries=" + maxRetries +
", deletable=" + deletable +
", fieldTable=" + Arrays.toString(fieldTable) +
", fieldStream=" + Arrays.toString(fieldStream) +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.pentaho.di.trans.steps.dorisstreamloader.load;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -184,7 +185,7 @@ public boolean tryHttpConnection(String host) {
* @param record
* @throws IOException
*/
public synchronized void writeRecord(String database, String table, byte[] record) {
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
BatchRecordBuffer buffer =
Expand All @@ -203,6 +204,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor
lock.lock();
try {
while (currentCacheBytes.get() >= maxBlockedBytes) {
checkFlushException();
log.logDetailed(
"Cache full, waiting for flush, currentBytes: " + currentCacheBytes.get()
+ ", maxBlockedBytes: " + maxBlockedBytes);
Expand Down Expand Up @@ -231,19 +233,15 @@ public synchronized void writeRecord(String database, String table, byte[] recor
}
}

public synchronized boolean bufferFullFlush(String bufferKey) {
/**
 * Flushes the single buffer identified by {@code bufferKey} because it reached its size limit.
 * Does not wait for the flush to complete (waitUtilDone = false).
 */
public boolean bufferFullFlush(String bufferKey) {
return doFlush(bufferKey, false, true);
}

/**
 * Periodic flush of all table buffers (bufferKey = null selects every buffer).
 * Does not wait for completion; contrast with {@code forceFlush}.
 */
public synchronized boolean intervalFlush() {
return doFlush(null, false, false);
}

/**
* Force flush and wait for success.
* @return
*/
public synchronized boolean forceFlush() {
/** Force-flushes all buffers and waits until the flush succeeds (waitUtilDone = true). */
public boolean forceFlush() {
return doFlush(null, true, false);
}

Expand Down Expand Up @@ -416,11 +414,6 @@ public void run() {
load(bf.getLabelName(), bf);
}
}

if (flushQueue.size() < flushQueueSize) {
// Avoid waiting for 2 rounds of intervalMs
doFlush(null, false, false);
}
} catch (Exception e) {
log.logError("worker running error", e);
exception.set(e);
Expand Down Expand Up @@ -448,6 +441,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
.setLabel(label)
.addCommonHeader()
.setEntity(entity)
.addHiddenColumns(options.isDeletable())
.addProperties(options.getStreamLoadProp());

if (enableGzCompress) {
Expand Down Expand Up @@ -488,11 +482,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
putBuilder.setLabel(label + "_" + retry);
reason = respContent.getMessage();
} else {
String errMsg =
String errMsg = null;
if (StringUtils.isBlank(respContent.getMessage())
&& StringUtils.isBlank(respContent.getErrorURL())) {
// sometimes stream load will not return message
errMsg =
String.format(
"stream load error, response is %s",
loadResult);
throw new DorisRuntimeException(errMsg);
} else {
errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
}
throw new DorisRuntimeException(errMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public class DorisOptions {
private long bufferFlushMaxBytes;
private Properties streamLoadProp;
private int maxRetries;
private boolean deletable;

public DorisOptions(String fenodes, String username, String password, String database, String table, long bufferFlushMaxRows, long bufferFlushMaxBytes, Properties streamLoadProp, int maxRetries) {
public DorisOptions(String fenodes, String username, String password, String database, String table, long bufferFlushMaxRows, long bufferFlushMaxBytes, Properties streamLoadProp, int maxRetries, boolean deletable) {
this.fenodes = fenodes;
this.username = username;
this.password = password;
Expand All @@ -46,6 +47,7 @@ public DorisOptions(String fenodes, String username, String password, String dat
this.bufferFlushMaxBytes = bufferFlushMaxBytes;
this.streamLoadProp = streamLoadProp;
this.maxRetries = maxRetries;
this.deletable = deletable;
}

public String getFenodes() {
Expand Down Expand Up @@ -84,6 +86,26 @@ public int getMaxRetries() {
return maxRetries;
}

/** Returns whether this load is configured as a delete load (rows carry the Doris delete sign). */
public boolean isDeletable() {
return deletable;
}

/**
 * Human-readable summary of the configured options.
 *
 * <p>The password is deliberately masked: this string is emitted to the Kettle
 * log by the step at init time (logDetailed), and printing the raw credential
 * would leak it into log files.
 */
@Override
public String toString() {
return "DorisOptions{" +
"fenodes='" + fenodes + '\'' +
", username='" + username + '\'' +
", password='******'" +
", database='" + database + '\'' +
", table='" + table + '\'' +
", bufferFlushMaxRows=" + bufferFlushMaxRows +
", bufferFlushMaxBytes=" + bufferFlushMaxBytes +
", streamLoadProp=" + streamLoadProp +
", maxRetries=" + maxRetries +
", deletable=" + deletable +
'}';
}

/** Creates a new {@link Builder} for assembling a {@code DorisOptions} instance. */
public static Builder builder() {
return new Builder();
}
Expand All @@ -98,6 +120,7 @@ public static class Builder {
private long bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
private int maxRetries = DEFAULT_MAX_RETRIES;
private Properties streamLoadProp = new Properties();
private boolean deletable = false;

public Builder withFenodes(String fenodes) {
this.fenodes = fenodes;
Expand Down Expand Up @@ -144,6 +167,11 @@ public Builder withMaxRetries(int maxRetries) {
return this;
}

/** Enables or disables delete mode for the load (defaults to false). */
public Builder withDeletable(boolean deletable) {
this.deletable = deletable;
return this;
}

public DorisOptions build() {
Preconditions.checkArgument(fenodes != null, "Fenodes must not be null");
Preconditions.checkArgument(username != null, "Username must not be null");
Expand All @@ -153,7 +181,7 @@ public DorisOptions build() {
Preconditions.checkArgument(bufferFlushMaxRows >= 10000, "BufferFlushMaxRows must be greater than 10000");
Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 1024, "BufferFlushMaxBytes must be greater than 10MB");
Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be greater than 0");
return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries);
return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries, deletable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.StringJoiner;

import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.CSV;
import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.DORIS_DELETE_SIGN;
import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.JSON;
import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.NULL_VALUE;

Expand All @@ -47,13 +48,15 @@ public class DorisRecordSerializer {
private final String fieldDelimiter;
private final ValueMetaInterface[] formatMeta;
private LogChannelInterface log;
private final boolean deletable;

private DorisRecordSerializer(
String[] fieldNames,
ValueMetaInterface[] formatMeta,
String type,
String fieldDelimiter,
LogChannelInterface log) {
LogChannelInterface log,
boolean deletable) {
this.fieldNames = fieldNames;
this.type = type;
this.fieldDelimiter = fieldDelimiter;
Expand All @@ -62,6 +65,7 @@ private DorisRecordSerializer(
}
this.formatMeta = formatMeta;
this.log = log;
this.deletable = deletable;
}


Expand Down Expand Up @@ -89,6 +93,10 @@ public String buildJsonString(Object[] record, int maxIndex) throws IOException,
valueMap.put(fieldNames[fieldIndex], value);
fieldIndex++;
}
if (deletable) {
// All load data will be deleted
valueMap.put(DORIS_DELETE_SIGN, "1");
}
return objectMapper.writeValueAsString(valueMap);
}

Expand All @@ -101,6 +109,10 @@ public String buildCSVString(Object[] record, int maxIndex) throws IOException,
joiner.add(value);
fieldIndex++;
}
if (deletable) {
// All load data will be deleted
joiner.add("1");
}
return joiner.toString();
}

Expand Down Expand Up @@ -147,6 +159,7 @@ public static class Builder {
private String type;
private String fieldDelimiter;
private LogChannelInterface log;
private boolean deletable;

public Builder setFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
Expand All @@ -173,14 +186,19 @@ public Builder setLogChannelInterface(LogChannelInterface log) {
return this;
}

/** When true, the serializer appends the Doris delete-sign column ("1") to every record. */
public Builder setDeletable(boolean deletable) {
this.deletable = deletable;
return this;
}

public DorisRecordSerializer build() {
Preconditions.checkState(
CSV.equals(type) && fieldDelimiter != null
|| JSON.equals(type));
Preconditions.checkNotNull(formatMeta);
Preconditions.checkNotNull(fieldNames);

return new DorisRecordSerializer(fieldNames, formatMeta, type, fieldDelimiter, log);
return new DorisRecordSerializer(fieldNames, formatMeta, type, fieldDelimiter, log, deletable);
}
}
}
Loading
Loading