Skip to content

Commit

Permalink
[Enhancement] (nereids)implement showSyncJobCommand in nereids
Browse files Browse the repository at this point in the history
  • Loading branch information
msridhar78 committed Nov 30, 2024
1 parent 42a7734 commit 2316a16
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ supportedShowStatement
| SHOW DELETE ((FROM | IN) database=multipartIdentifier)? #showDelete
| SHOW ALL? GRANTS #showGrants
| SHOW GRANTS FOR userIdentify #showGrantsForUser
| SHOW SYNC JOB ((FROM | IN) database=multipartIdentifier)? #showSyncJob
| SHOW LOAD PROFILE loadIdPath=STRING_LITERAL #showLoadProfile
| SHOW VIEW
(FROM |IN) tableName=multipartIdentifier
Expand Down Expand Up @@ -345,7 +346,6 @@ unsupportedShowStatement
| SHOW QUERY PROFILE queryIdPath=STRING_LITERAL #showQueryProfile
| SHOW CACHE HOTSPOT tablePath=STRING_LITERAL #showCacheHotSpot
| SHOW ENCRYPTKEYS ((FROM | IN) database=multipartIdentifier)? wildWhere? #showEncryptKeys
| SHOW SYNC JOB ((FROM | IN) database=multipartIdentifier)? #showSyncJob
| SHOW TABLE CREATION ((FROM | IN) database=multipartIdentifier)? wildWhere? #showTableCreation
| SHOW CATALOG RECYCLE BIN wildWhere? #showCatalogRecycleBin
| SHOW QUERY STATS ((FOR database=identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
import org.apache.doris.nereids.DorisParser.ShowSmallFilesContext;
import org.apache.doris.nereids.DorisParser.ShowSqlBlockRuleContext;
import org.apache.doris.nereids.DorisParser.ShowStorageEnginesContext;
import org.apache.doris.nereids.DorisParser.ShowSyncJobContext;
import org.apache.doris.nereids.DorisParser.ShowTableIdContext;
import org.apache.doris.nereids.DorisParser.ShowTabletsBelongContext;
import org.apache.doris.nereids.DorisParser.ShowTriggersContext;
Expand Down Expand Up @@ -536,6 +537,7 @@
import org.apache.doris.nereids.trees.plans.commands.ShowSmallFilesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowStorageEnginesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTableIdCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTabletsBelongCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTriggersCommand;
Expand Down Expand Up @@ -4680,6 +4682,16 @@ public LogicalPlan visitDropEncryptkey(DropEncryptkeyContext ctx) {
return new DropEncryptkeyCommand(new EncryptKeyName(nameParts), ctx.EXISTS() != null);
}

@Override
public LogicalPlan visitShowSyncJob(ShowSyncJobContext ctx) {
String databaseName = null;
if (ctx.multipartIdentifier() != null) {
List<String> databaseParts = visitMultipartIdentifier(ctx.multipartIdentifier());
databaseName = databaseParts.get(0);
}
return new ShowSyncJobCommand(databaseName);
}

@Override
public LogicalPlan visitDropFile(DropFileContext ctx) {
String dbName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public enum PlanType {
SHOW_ROLE_COMMAND,
SHOW_SMALL_FILES_COMMAND,
SHOW_STORAGE_ENGINES_COMMAND,
SHOW_SYNC_JOB_COMMAND,
SHOW_TABLE_ID_COMMAND,
SHOW_TRIGGERS_COMMAND,
SHOW_VARIABLES_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Represents the command for SHOW SYNC JOB.
*/
public class ShowSyncJobCommand extends ShowCommand {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("JobName").add("Type").add("State").add("Channel").add("Status")
.add("JobConfig").add("CreateTime").add("LastStartTime").add("LastStopTime").add("FinishTime").add("Msg")
.build();

private String databaseName;

public ShowSyncJobCommand(String databaseName) {
super(PlanType.SHOW_SYNC_JOB_COMMAND);
this.databaseName = databaseName;
}

private void validate(ConnectContext ctx) throws AnalysisException {
if (Strings.isNullOrEmpty(databaseName)) {
databaseName = ctx.getDatabase();
if (Strings.isNullOrEmpty(databaseName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
}
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME,
databaseName, PrivPredicate.SHOW)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED_ERROR,
PrivPredicate.SHOW.getPrivs().toString(), databaseName);
}
}

@Override
public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
Env env = Env.getCurrentEnv();
DatabaseIf db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(databaseName);

List<List<Comparable>> syncInfos = env.getSyncJobManager().getSyncJobsInfoByDbId(db.getId());
Collections.sort(syncInfos, new ListComparator<List<Comparable>>(0));

List<List<String>> rows = Lists.newArrayList();
for (List<Comparable> syncInfo : syncInfos) {
List<String> row = new ArrayList<String>(syncInfo.size());

for (Comparable element : syncInfo) {
row.add(element.toString());
}
rows.add(row);
}
return new ShowResultSet(this.getMetaData(), rows);
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitShowSyncJobCommand(this, context);
}

public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : TITLE_NAMES) {
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.doris.nereids.trees.plans.commands.ShowSmallFilesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowStorageEnginesCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTableIdCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTabletsBelongCommand;
import org.apache.doris.nereids.trees.plans.commands.ShowTriggersCommand;
Expand Down Expand Up @@ -252,6 +253,10 @@ default R visitCallCommand(CallCommand callCommand, C context) {
return visitCommand(callCommand, context);
}

default R visitShowSyncJobCommand(ShowSyncJobCommand showSyncJobCommand, C context) {
return visitCommand(showSyncJobCommand, context);
}

default R visitCreateProcedureCommand(CreateProcedureCommand createProcedureCommand, C context) {
return visitCommand(createProcedureCommand, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_show_sync_job_command", "query,sync_job") {
try {
sql """CREATE DATABASE IF NOT EXISTS test_db;"""

sql """CREATE TABLE IF NOT EXISTS test_db.test_tbl1 (
id INT,
name STRING
) UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");"""

// Create the sync job
sql """CREATE SYNC test_db.job1
(
FROM mysql_db1.tbl1 INTO test_tbl1
)
FROM BINLOG
(
"type" = "canal",
"canal.server.ip" = "127.0.0.1",
"canal.server.port" = "11111",
"canal.destination" = "example",
"canal.username" = "",
"canal.password" = ""
);"""

checkNereidsExecute("SHOW SYNC JOB FROM test_db")

} catch (Exception e) {
// Log any exceptions that occur during testing
log.error("Failed to execute CREATE SYNC JOB command", e)
throw e
} finally {
// Cleanup
try_sql("STOP SYNC JOB IF EXISTS job1;")
try_sql("DROP TABLE IF EXISTS test_db.test_tbl1;")
try_sql("DROP DATABASE IF EXISTS test_db;")
}
}

0 comments on commit 2316a16

Please sign in to comment.