Skip to content

Commit

Permalink
[Feature] Introduce log cleaner interface (apache#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
s7monk authored Jun 26, 2024
1 parent 5ee275c commit e9b49ab
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public R<String> getLogs() {
return R.succeed(jobService.getLogsByUserId(StpUtil.getLoginIdAsString()));
}

@SaIgnore
@GetMapping("/logs/clear")
public R<String> clearLogs() {
return R.succeed(jobService.clearLog(StpUtil.getLoginIdAsString()));
}

@SaCheckPermission("playground:job:stop")
@PostMapping("/stop")
public R<Void> stop(@RequestBody StopJobDTO stopJobDTO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,12 @@ public interface JobService extends IService<JobInfo> {
* @return log entries as a string
*/
String getLogsByUserId(String userId);

/**
* Clears the log associated with the specified user.
*
* @param userId the user's ID
* @return log entries as a string
*/
String clearLog(String userId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,18 @@ public void refreshJobStatus(String taskType) {

@Override
public String getLogsByUserId(String userId) {
String user = userId.toString();
if (LogReadPool.getInstance().exist(user)) {
return LogReadPool.getInstance().get(user).toString();
if (LogReadPool.getInstance().exist(userId)) {
return LogReadPool.getInstance().get(userId).toString();
} else {
return "";
}
}

@Override
public String clearLog(String userId) {
if (LogReadPool.getInstance().exist(userId)) {
LogReadPool.clear(userId);
return LogReadPool.getInstance().get(userId).toString();
} else {
return "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void testGetLogs() throws Exception {
queryWrapper.eq("cluster_name", "test_cluster");
ClusterInfo one = clusterService.getOne(queryWrapper);
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-get-job-status");
jobSubmitDTO.setJobName("flink-job-test-get-job-logs");
jobSubmitDTO.setTaskType("Flink");
jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
Expand All @@ -295,11 +295,11 @@ public void testGetLogs() throws Exception {
assertEquals(200, r.getCode());

String logsResponseStr = getLogs();
R<String> getJobStatusRes =
R<String> getJobLogsRes =
ObjectMapperUtils.fromJSON(logsResponseStr, new TypeReference<R<String>>() {});
assertEquals(200, getJobStatusRes.getCode());
assertEquals(200, getJobLogsRes.getCode());

assertNotNull(getJobStatusRes.getData());
assertNotNull(getJobLogsRes.getData());
}

@Test
Expand Down Expand Up @@ -404,6 +404,32 @@ public void testExecutionMode() {
assertTrue(history.getStatements().contains("SET 'execution.runtime-mode' = 'streaming';"));
}

@Test
@Order(9)
public void testClearLogs() throws Exception {
QueryWrapper<ClusterInfo> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("cluster_name", "test_cluster");
ClusterInfo one = clusterService.getOne(queryWrapper);
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-clear-job-logs");
jobSubmitDTO.setTaskType("Flink");
jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.selectStatement);

String responseString = submit(jobSubmitDTO);
R<JobVO> r = ObjectMapperUtils.fromJSON(responseString, new TypeReference<R<JobVO>>() {});
assertEquals(200, r.getCode());

String logsResponseStr = clearLogs();
R<String> getJobLogRes =
ObjectMapperUtils.fromJSON(logsResponseStr, new TypeReference<R<String>>() {});
assertEquals(200, getJobLogRes.getCode());

assertNotNull(getJobLogRes.getData());
assertEquals("Console:\n", getJobLogRes.getData());
}

private String submit(JobSubmitDTO jobSubmitDTO) throws Exception {
return mockMvc.perform(
MockMvcRequestBuilders.post(jobPath + "/submit")
Expand Down Expand Up @@ -444,6 +470,19 @@ private String getLogs() throws Exception {
.getContentAsString();
}

private String clearLogs() throws Exception {
return mockMvc.perform(
MockMvcRequestBuilders.get(jobPath + "/logs/clear")
.cookie(cookie)
.contentType(MediaType.APPLICATION_JSON_VALUE)
.accept(MediaType.APPLICATION_JSON_VALUE))
.andExpect(MockMvcResultMatchers.status().isOk())
.andDo(MockMvcResultHandlers.print())
.andReturn()
.getResponse()
.getContentAsString();
}

private void refreshJob() throws Exception {
mockMvc.perform(
MockMvcRequestBuilders.post(jobPath + "/refresh")
Expand Down
7 changes: 7 additions & 0 deletions paimon-web-server/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ interceptor:
exclude:
path:
patterns: /api/login, /ui/**

websocket:
pool:
size: 1
heartbeat:
send: 30000
receive: 30000

0 comments on commit e9b49ab

Please sign in to comment.