Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] mybatis lambdaQueryWrapper improvements #4139

Merged
merged 10 commits into from
Dec 1, 2024
6 changes: 6 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ jobs:
class: org.apache.streampark.e2e.cases.Flink117OnYarnClusterDeployTest
- name: Flink118OnYarnClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink118OnYarnClusterDeployTest
- name: FlinkSQL116OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest
- name: FlinkSQL117OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest
- name: FlinkSQL118OnYarnTest
class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
env:
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.streampark.console.base.util;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.SystemPropertyUtils;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -90,4 +91,9 @@ public static File getAppClientDir() {
public static File getPluginDir() {
return getAppDir(PLUGINS);
}

public static boolean isHaEnable() {
return SystemPropertyUtils.getBoolean("high-availability.enable", false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

import org.apache.streampark.console.core.entity.AlertConfig;

import org.apache.ibatis.annotations.Param;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface AlertConfigMapper extends BaseMapper<AlertConfig> {

AlertConfig selectAlertConfByName(@Param("alertConfig") AlertConfig alertConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ private void initConfig() {
}

private void initRegistryService() {
boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
if (enable) {
if (WebUtils.isHaEnable()) {
RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
registryService.registry();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public interface DistributedTaskService extends IService<DistributedTask> {
* @param appId Long
* @return boolean
*/
public boolean isLocalProcessing(Long appId);
boolean isLocalProcessing(Long appId);

/**
* Save Distributed Task.
Expand All @@ -70,5 +70,5 @@ public interface DistributedTaskService extends IService<DistributedTask> {
* @param autoStart boolean
* @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT
*/
public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,9 @@ public class AlertConfigServiceImpl extends ServiceImpl<AlertConfigMapper, Alert
@Override
public IPage<AlertConfigParams> page(Long userId, RestRequest request) {
// build query conditions
LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(userId != null, AlertConfig::getUserId, userId);

Page<AlertConfig> page = MybatisPager.getPage(request);
IPage<AlertConfig> resultPage = getBaseMapper().selectPage(page, wrapper);

IPage<AlertConfig> resultPage =
this.lambdaQuery().eq(userId != null, AlertConfig::getUserId, userId).page(page);
Page<AlertConfigParams> result = new Page<>();
if (CollectionUtils.isNotEmpty(resultPage.getRecords())) {
result.setRecords(
Expand All @@ -71,8 +68,7 @@ public IPage<AlertConfigParams> page(Long userId, RestRequest request) {

@Override
public boolean exist(AlertConfig alertConfig) {
AlertConfig confByName = this.baseMapper.selectAlertConfByName(alertConfig);
return confByName != null;
return this.lambdaQuery().eq(AlertConfig::getAlertName, alertConfig.getAlertName()).exists();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.application.FlinkApplicationManageService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
Expand Down Expand Up @@ -70,9 +69,7 @@ public class FlinkApplicationBackupServiceImpl
@Override
public IPage<FlinkApplicationBackup> getPage(FlinkApplicationBackup bakParam, RestRequest request) {
Page<FlinkApplicationBackup> page = MybatisPager.getPage(request);
LambdaQueryWrapper<FlinkApplicationBackup> queryWrapper = new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, bakParam.getAppId());
return this.baseMapper.selectPage(page, queryWrapper);
return this.lambdaQuery().eq(FlinkApplicationBackup::getAppId, bakParam.getAppId()).page(page);
}

@Override
Expand Down Expand Up @@ -135,11 +132,12 @@ public void rollback(FlinkApplicationBackup bakParam) {
public void revoke(FlinkApplication appParam) {
Page<FlinkApplicationBackup> page = new Page<>();
page.setCurrent(0).setSize(1).setSearchCount(false);
LambdaQueryWrapper<FlinkApplicationBackup> queryWrapper = new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, appParam.getId())
.orderByDesc(FlinkApplicationBackup::getCreateTime);

Page<FlinkApplicationBackup> backUpPages = baseMapper.selectPage(page, queryWrapper);
Page<FlinkApplicationBackup> backUpPages = this.lambdaQuery().eq(
FlinkApplicationBackup::getAppId,
appParam.getId())
.orderByDesc(FlinkApplicationBackup::getCreateTime).page(page);

if (!backUpPages.getRecords().isEmpty()) {
FlinkApplicationBackup backup = backUpPages.getRecords().get(0);
String path = backup.getPath();
Expand All @@ -151,9 +149,7 @@ public void revoke(FlinkApplication appParam) {
@Override
public void remove(FlinkApplication appParam) {
try {
baseMapper.delete(
new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, appParam.getId()));
this.lambdaUpdate().eq(FlinkApplicationBackup::getAppId, appParam.getId()).remove();
appParam
.getFsOperator()
.delete(
Expand All @@ -169,10 +165,8 @@ public void remove(FlinkApplication appParam) {

@Override
public void rollbackFlinkSql(FlinkApplication appParam, FlinkSql flinkSqlParam) {
LambdaQueryWrapper<FlinkApplicationBackup> queryWrapper = new LambdaQueryWrapper<FlinkApplicationBackup>()
.eq(FlinkApplicationBackup::getAppId, appParam.getId())
.eq(FlinkApplicationBackup::getSqlId, flinkSqlParam.getId());
FlinkApplicationBackup backUp = baseMapper.selectOne(queryWrapper);
FlinkApplicationBackup backUp = this.lambdaQuery().eq(FlinkApplicationBackup::getAppId, appParam.getId())
.eq(FlinkApplicationBackup::getSqlId, flinkSqlParam.getId()).one();
ApiAlertException.throwIfNull(
backUp, "Application backup can't be null. Rollback flink sql failed.");
// rollback config and sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.benmanes.caffeine.cache.Cache;
Expand Down Expand Up @@ -631,10 +630,8 @@ public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> appId
if (CollectionUtils.isEmpty(appIds)) {
return new HashMap<>();
}
LambdaQueryWrapper<ApplicationBuildPipeline> queryWrapper = new LambdaQueryWrapper<ApplicationBuildPipeline>()
.in(ApplicationBuildPipeline::getAppId, appIds);

List<ApplicationBuildPipeline> appBuildPipelines = baseMapper.selectList(queryWrapper);
List<ApplicationBuildPipeline> appBuildPipelines =
this.lambdaQuery().in(ApplicationBuildPipeline::getAppId, appIds).list();
if (CollectionUtils.isEmpty(appBuildPipelines)) {
return new HashMap<>();
}
Expand All @@ -644,8 +641,7 @@ public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> appId

@Override
public void removeByAppId(Long appId) {
baseMapper.delete(
new LambdaQueryWrapper<ApplicationBuildPipeline>().eq(ApplicationBuildPipeline::getAppId, appId));
this.lambdaUpdate().eq(ApplicationBuildPipeline::getAppId, appId).remove();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@
import org.apache.streampark.console.core.service.FlinkEffectiveService;
import org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -92,13 +89,10 @@ public synchronized void create(FlinkApplication appParam, Boolean latest) {
}

public void setLatest(Long appId, Long configId) {
LambdaUpdateWrapper<FlinkApplicationConfig> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.set(FlinkApplicationConfig::getLatest, false).eq(FlinkApplicationConfig::getAppId, appId);
this.update(updateWrapper);

updateWrapper.clear();
updateWrapper.set(FlinkApplicationConfig::getLatest, true).eq(FlinkApplicationConfig::getId, configId);
this.update(updateWrapper);
this.lambdaUpdate().set(FlinkApplicationConfig::getLatest, false).eq(FlinkApplicationConfig::getAppId, appId)
.update();
this.lambdaUpdate().set(FlinkApplicationConfig::getLatest, true).eq(FlinkApplicationConfig::getId, configId)
.update();
}

@Override
Expand Down Expand Up @@ -187,9 +181,8 @@ public void setLatestOrEffective(Boolean latest, Long configId, Long appId) {

@Override
public void toEffective(Long appId, Long configId) {
LambdaUpdateWrapper<FlinkApplicationConfig> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkApplicationConfig::getAppId, appId).set(FlinkApplicationConfig::getLatest, false);
this.update(updateWrapper);
this.lambdaUpdate().eq(FlinkApplicationConfig::getAppId, appId).set(FlinkApplicationConfig::getLatest, false)
.update();
effectiveService.saveOrUpdate(appId, EffectiveTypeEnum.CONFIG, configId);
}

Expand Down Expand Up @@ -225,21 +218,17 @@ public IPage<FlinkApplicationConfig> getPage(FlinkApplicationConfig config, Rest

@Override
public List<FlinkApplicationConfig> list(Long appId) {
LambdaQueryWrapper<FlinkApplicationConfig> queryWrapper = new LambdaQueryWrapper<FlinkApplicationConfig>()
.eq(FlinkApplicationConfig::getAppId, appId)
.orderByDesc(FlinkApplicationConfig::getVersion);

List<FlinkApplicationConfig> configList = this.baseMapper.selectList(queryWrapper);
List<FlinkApplicationConfig> configList = this.lambdaQuery().eq(FlinkApplicationConfig::getAppId, appId)
.orderByDesc(FlinkApplicationConfig::getVersion).list();
fillEffectiveField(appId, configList);
return configList;
}

@Override
public synchronized String readTemplate() {
if (flinkConfTemplate == null) {
try {
Resource resource = resourceLoader.getResource("classpath:flink-application.conf");
Scanner scanner = new Scanner(resource.getInputStream());
Resource resource = resourceLoader.getResource("classpath:flink-application.conf");
try (Scanner scanner = new Scanner(resource.getInputStream())) {
StringBuilder stringBuffer = new StringBuilder();
while (scanner.hasNextLine()) {
stringBuffer.append(scanner.nextLine()).append(System.lineSeparator());
Expand All @@ -257,8 +246,7 @@ public synchronized String readTemplate() {

@Override
public void removeByAppId(Long appId) {
baseMapper.delete(
new LambdaQueryWrapper<FlinkApplicationConfig>().eq(FlinkApplicationConfig::getAppId, appId));
this.lambdaUpdate().eq(FlinkApplicationConfig::getAppId, appId).remove();
}

private void fillEffectiveField(Long id, List<FlinkApplicationConfig> configList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -251,14 +250,12 @@ public boolean checkAlter(FlinkApplication appParam) {

@Override
public boolean existsByTeamId(Long teamId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getTeamId, teamId));
return this.lambdaQuery().eq(FlinkApplication::getTeamId, teamId).exists();
}

@Override
public boolean existsByUserId(Long userId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getUserId, userId));
return this.lambdaQuery().eq(FlinkApplication::getUserId, userId).exists();
}

@Override
Expand All @@ -273,17 +270,13 @@ public boolean existsRunningByClusterId(Long clusterId) {

@Override
public boolean existsByClusterId(Long clusterId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getFlinkClusterId, clusterId));
return this.lambdaQuery().eq(FlinkApplication::getFlinkClusterId, clusterId).exists();
}

@Override
public Integer countByClusterId(Long clusterId) {
return baseMapper
.selectCount(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getFlinkClusterId,
clusterId))
.intValue();
return this.lambdaQuery().eq(FlinkApplication::getFlinkClusterId,
clusterId).count().intValue();
}

@Override
Expand All @@ -293,8 +286,7 @@ public Integer countAffectedByClusterId(Long clusterId, String dbType) {

@Override
public boolean existsByFlinkEnvId(Long flinkEnvId) {
return baseMapper.exists(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getVersionId, flinkEnvId));
return this.lambdaQuery().eq(FlinkApplication::getVersionId, flinkEnvId).exists();
}

@Override
Expand Down Expand Up @@ -434,8 +426,8 @@ public AppExistsStateEnum checkExists(FlinkApplication appParam) {
return AppExistsStateEnum.INVALID;
}

FlinkApplication application = baseMapper.selectOne(
new LambdaQueryWrapper<FlinkApplication>().eq(FlinkApplication::getJobName, jobName));
FlinkApplication application = this.lambdaQuery().eq(FlinkApplication::getJobName, jobName).one();

if (application != null && !application.getId().equals(appParamId)) {
return AppExistsStateEnum.IN_DB;
}
Expand Down
Loading
Loading