diff --git a/flink-platform-alert/src/main/java/com/flink/platform/alert/AlertSendingService.java b/flink-platform-alert/src/main/java/com/flink/platform/alert/AlertSendingService.java index 532c7be8..79b20c84 100644 --- a/flink-platform-alert/src/main/java/com/flink/platform/alert/AlertSendingService.java +++ b/flink-platform-alert/src/main/java/com/flink/platform/alert/AlertSendingService.java @@ -14,7 +14,7 @@ import java.util.List; import static com.flink.platform.common.constants.Constant.EMPTY; -import static com.flink.platform.common.enums.ExecutionStatus.FAILURE; +import static com.flink.platform.common.enums.ExecutionStatus.ERROR; /** Alert sending service. */ @Slf4j @@ -45,7 +45,7 @@ public void sendAlerts(JobFlowRun jobFlowRun, String alertMsg) { public void sendErrAlerts(JobFlow jobFlow, String alertMag) { JobFlowRun jobFlowRun = jobFlowService.copyToJobFlowRun(jobFlow); - jobFlowRun.setStatus(FAILURE); + jobFlowRun.setStatus(ERROR); sendAlerts(jobFlowRun, alertMag); } } diff --git a/flink-platform-dao/src/main/java/com/flink/platform/dao/service/JobFlowService.java b/flink-platform-dao/src/main/java/com/flink/platform/dao/service/JobFlowService.java index 62b68280..b6a8b3ca 100644 --- a/flink-platform-dao/src/main/java/com/flink/platform/dao/service/JobFlowService.java +++ b/flink-platform-dao/src/main/java/com/flink/platform/dao/service/JobFlowService.java @@ -13,6 +13,7 @@ import com.flink.platform.dao.entity.JobInfo; import com.flink.platform.dao.entity.JobRunInfo; import com.flink.platform.dao.mapper.JobFlowMapper; +import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -21,6 +22,7 @@ import java.util.List; import static com.flink.platform.common.enums.JobFlowStatus.OFFLINE; +import static com.flink.platform.common.enums.JobFlowStatus.ONLINE; import static com.flink.platform.common.enums.JobFlowType.JOB_LIST; import static java.lang.String.format; import static java.util.stream.Collectors.toList; @@ -29,16 +31,14 @@ /** job config info. */ @Service @DS("master_platform") +@RequiredArgsConstructor(onConstructor_ = @Autowired) public class JobFlowService extends ServiceImpl { - @Autowired - private JobInfoService jobInfoService; + private final JobInfoService jobInfoService; - @Autowired - private JobFlowRunService jobFlowRunService; + private final JobFlowRunService jobFlowRunService; - @Autowired - private JobRunInfoService jobRunInfoService; + private final JobRunInfoService jobRunInfoService; @Transactional public JobFlow cloneJobFlow(long flowId) { @@ -165,4 +165,12 @@ public JobFlowRun copyToJobFlowRun(JobFlow jobFlow) { jobFlowRun.setAlerts(jobFlow.getAlerts()); return jobFlowRun; } + + public List getUnscheduledJobFlows() { + return list(new QueryWrapper() + .lambda() + .eq(JobFlow::getStatus, ONLINE) + .isNotNull(JobFlow::getCronExpr) + .ne(JobFlow::getCronExpr, "")); + } } diff --git a/flink-platform-monitor/src/main/java/com/flink/platform/monitor/UnscheduledJobFlowChecker.java b/flink-platform-monitor/src/main/java/com/flink/platform/monitor/UnscheduledJobFlowChecker.java new file mode 100644 index 00000000..a7f7b011 --- /dev/null +++ b/flink-platform-monitor/src/main/java/com/flink/platform/monitor/UnscheduledJobFlowChecker.java @@ -0,0 +1,43 @@ +package com.flink.platform.monitor; + +import com.flink.platform.alert.AlertSendingService; +import com.flink.platform.dao.entity.Worker; +import com.flink.platform.dao.service.JobFlowService; +import com.flink.platform.dao.service.WorkerService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import static com.flink.platform.common.enums.WorkerStatus.LEADER; + +@Slf4j +@Component +@RequiredArgsConstructor(onConstructor_ = @Autowired) +public class UnscheduledJobFlowChecker { + + private static final String ALERT_TEMPLATE = + "\\nWorkflow's crontab is set but not scheduled: \\nName: %s, Status: %s."; + + private final WorkerService workerService; + + private final JobFlowService jobFlowService; + + private final AlertSendingService alertSendingService; + + @Scheduled(initialDelay = 2 * 60 * 1000, fixedDelay = 60 * 60 * 1000) + public void checkUnscheduledWorkflow() { + Worker worker = workerService.getCurrentWorker(); + if (worker == null || !LEADER.equals(worker.getRole())) { + log.info("Current worker is not leader, skip checkUnscheduledWorkflow."); + return; + } + + jobFlowService.getUnscheduledJobFlows().forEach(jobFlow -> { + String content = String.format( + ALERT_TEMPLATE, jobFlow.getName(), jobFlow.getStatus().name()); + alertSendingService.sendErrAlerts(jobFlow, content); + }); + } +}