Skip to content

Commit

Permalink
Merge pull request #195 from cubefs/flink-metadata-sync
Browse files Browse the repository at this point in the history
Flink metadata sync #194
  • Loading branch information
zebozhuang authored Dec 29, 2023
2 parents 49e9e28 + 4658feb commit d431fe7
Show file tree
Hide file tree
Showing 11 changed files with 636 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 OPPO.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oppo.cloud.flink.config;

import com.oppo.cloud.common.domain.cluster.hadoop.HadoopConf;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
* hadoop config
*/
@Configuration
@ConfigurationProperties(prefix = "hadoop")
@Data
public class HadoopConfig extends HadoopConf {
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,36 +62,4 @@ public void receiveDsTaskApplication(@Payload String message,
consumer.commitSync();
}

/**
* Consume task app metadata
*/
@KafkaListener(topics = "${spring.kafka.flinkTaskApp}", containerFactory = "kafkaListenerContainerFactory")
public void receiveFlinkTaskApp(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
log.debug(String.format("%d, From partition %d: %s", consumer.hashCode(), partition, message));
// Parsing message
FlinkTaskApp flinkTaskApp = JSON.parseObject(message, FlinkTaskApp.class);
try {
FlinkTaskAppExample flinkTaskAppExample = new FlinkTaskAppExample();
flinkTaskAppExample.createCriteria()
.andApplicationIdEqualTo(flinkTaskApp.getApplicationId());
List<FlinkTaskApp> flinkTaskApps = flinkTaskAppMapper.selectByExample(flinkTaskAppExample);
if (flinkTaskApps == null || flinkTaskApps.size() == 0) {
flinkTaskAppMapper.insert(flinkTaskApp);
} else if (flinkTaskApps.size() == 1) {
FlinkTaskApp pre = flinkTaskApps.get(0);
pre.setTaskState(flinkTaskApp.getTaskState());
flinkTaskAppMapper.updateByPrimaryKeySelective(pre);
} else {
log.error("realtimeTaskApps size > 1 , appid:{}", flinkTaskApp.getApplicationId());
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
consumer.commitSync();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.oppo.cloud.common.domain.flink.enums.DiagnosisFrom;
import com.oppo.cloud.flink.service.DiagnosisService;
import com.oppo.cloud.flink.service.ITaskSyncerMetaService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -33,6 +34,9 @@ public class TaskDetectScheduler {
@Autowired
DiagnosisService diagnosisService;

@Autowired
ITaskSyncerMetaService taskSyncerMetaService;

/**
* TODO: Custom schedule time
*/
Expand All @@ -57,7 +61,7 @@ public void DetectDaily() {
/**
* Hourly level scheduled diagnosis
*/
@Scheduled(cron = "0 0/1 * * * ?") // debug for 1 minutes
@Scheduled(cron = "0 0/1 * * * ?") // debug for 1 minutes
//@Scheduled(cron = "1 0 * * * ?")
public void detectHourly() {
log.info("Start executing scheduled diagnostic tasks");
Expand All @@ -69,4 +73,13 @@ public void detectHourly() {
diagnosisService.diagnosisAppHourly(start, end, DiagnosisFrom.JobUptime);
log.info("End execution of scheduled diagnostic tasks");
}


/**
* Hourly scheduled sync yarn metadata
*/
@Scheduled(cron = "* */5 * * * ?")
public void syncYarnMetadata() {
taskSyncerMetaService.syncer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 OPPO.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oppo.cloud.flink.domain;

import lombok.Data;

@Data
public class Properties {

private String key;
private String value;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 OPPO.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oppo.cloud.flink.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.util.List;

@Data
public class YarnConfProperties {

@JsonProperty("properties")
private List<Properties> properties;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2023 OPPO.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oppo.cloud.flink.domain;

import lombok.Data;

@Data
public class YarnPathInfo {
/**
* fs.defaultFS
*/
private String defaultFS;

/**
* yarn.nodemanager.remote-app-log-dir
*/
private String remoteDir;

/**
* yarn.app.mapreduce.am.staging-dir
*/
private String mapreduceStagingDir;

/**
* mapreduce.jobhistory.done-dir
*/
private String mapreduceDoneDir;

/**
* mapreduce.jobhistory.intermediate-done-dir
*/
private String mapreduceIntermediateDoneDir;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 OPPO.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oppo.cloud.flink.service;

import java.util.List;
import java.util.Map;

/**
* YARN and Spark Cluster Address Configuration Information
*/
public interface IClusterConfigService {

/**
* Obtaining the Spark History Server List
*/
List<String> getSparkHistoryServers();

/**
* Obtaining the YARN Resource Manager List
*/
Map<String,String> getYarnClusters();

/**
* Updating Cluster Information
*/
void updateClusterConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 OPPO.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oppo.cloud.flink.service;

/**
* Synchronization of Cluster Task Metadata
*/
public interface ITaskSyncerMetaService {

/**
* Data Synchronization
*/
void syncer();
}
Loading

0 comments on commit d431fe7

Please sign in to comment.