Skip to content

Commit

Permalink
feat: Alert on unscheduled workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
itinycheng committed Dec 2, 2024
1 parent 802a59a commit e7b6788
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.List;

import static com.flink.platform.common.constants.Constant.EMPTY;
import static com.flink.platform.common.enums.ExecutionStatus.FAILURE;
import static com.flink.platform.common.enums.ExecutionStatus.ERROR;

/** Alert sending service. */
@Slf4j
Expand Down Expand Up @@ -45,7 +45,7 @@ public void sendAlerts(JobFlowRun jobFlowRun, String alertMsg) {

public void sendErrAlerts(JobFlow jobFlow, String alertMag) {
JobFlowRun jobFlowRun = jobFlowService.copyToJobFlowRun(jobFlow);
jobFlowRun.setStatus(FAILURE);
jobFlowRun.setStatus(ERROR);
sendAlerts(jobFlowRun, alertMag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.flink.platform.dao.entity.JobInfo;
import com.flink.platform.dao.entity.JobRunInfo;
import com.flink.platform.dao.mapper.JobFlowMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -21,6 +22,7 @@
import java.util.List;

import static com.flink.platform.common.enums.JobFlowStatus.OFFLINE;
import static com.flink.platform.common.enums.JobFlowStatus.ONLINE;
import static com.flink.platform.common.enums.JobFlowType.JOB_LIST;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
Expand All @@ -29,16 +31,14 @@
/** job config info. */
@Service
@DS("master_platform")
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class JobFlowService extends ServiceImpl<JobFlowMapper, JobFlow> {

@Autowired
private JobInfoService jobInfoService;
private final JobInfoService jobInfoService;

@Autowired
private JobFlowRunService jobFlowRunService;
private final JobFlowRunService jobFlowRunService;

@Autowired
private JobRunInfoService jobRunInfoService;
private final JobRunInfoService jobRunInfoService;

@Transactional
public JobFlow cloneJobFlow(long flowId) {
Expand Down Expand Up @@ -165,4 +165,12 @@ public JobFlowRun copyToJobFlowRun(JobFlow jobFlow) {
jobFlowRun.setAlerts(jobFlow.getAlerts());
return jobFlowRun;
}

public List<JobFlow> getUnscheduledJobFlows() {
return list(new QueryWrapper<JobFlow>()
.lambda()
.eq(JobFlow::getStatus, ONLINE)
.isNotNull(JobFlow::getCronExpr)
.ne(JobFlow::getCronExpr, ""));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.flink.platform.monitor;

import com.flink.platform.alert.AlertSendingService;
import com.flink.platform.dao.entity.Worker;
import com.flink.platform.dao.service.JobFlowService;
import com.flink.platform.dao.service.WorkerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import static com.flink.platform.common.enums.WorkerStatus.LEADER;

@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class UnscheduledJobFlowChecker {

private static final String ALERT_TEMPLATE =
"\\nWorkflow's crontab is set but not scheduled: \\nName: %s, Status: %s.";

private final WorkerService workerService;

private final JobFlowService jobFlowService;

private final AlertSendingService alertSendingService;

@Scheduled(initialDelay = 2 * 60 * 1000, fixedDelay = 60 * 60 * 1000)
public void checkUnscheduledWorkflow() {
Worker worker = workerService.getCurrentWorker();
if (worker == null || !LEADER.equals(worker.getRole())) {
log.info("Current worker is not leader, skip checkUnscheduledWorkflow.");
return;
}

jobFlowService.getUnscheduledJobFlows().forEach(jobFlow -> {
String content = String.format(
ALERT_TEMPLATE, jobFlow.getName(), jobFlow.getStatus().name());
alertSendingService.sendErrAlerts(jobFlow, content);
});
}
}

0 comments on commit e7b6788

Please sign in to comment.