Skip to content

Commit 92a7beb

Browse files
committed
Pass an additional to the run check service to control if we want to run the rules.
1 parent d6a463f commit 92a7beb

19 files changed

+194
-15
lines changed

dqops/src/integration-test/java/com/dqops/postgresql/checks/comparison/PostgresqlTableComparisonIntegrationTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.dqops.data.storage.ParquetPartitionStorageServiceObjectMother;
3939
import com.dqops.execution.ExecutionContext;
4040
import com.dqops.execution.checks.CheckExecutionSummary;
41+
import com.dqops.execution.checks.RunChecksTarget;
4142
import com.dqops.execution.checks.TableCheckExecutionServiceImpl;
4243
import com.dqops.execution.checks.TableCheckExecutionServiceObjectMother;
4344
import com.dqops.execution.checks.progress.CheckExecutionProgressListenerStub;
@@ -143,6 +144,7 @@ void profiling_whenComparingRowCountNoGroupingSameTable_thenValuesMatch() {
143144
false,
144145
new CheckExecutionProgressListenerStub(),
145146
false,
147+
RunChecksTarget.sensors_and_rules,
146148
JobCancellationToken.createDummyJobCancellationToken());
147149

148150
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -182,6 +184,7 @@ void profiling_whenComparingColumnCountNoGroupingSameTable_thenValuesMatch() {
182184
false,
183185
new CheckExecutionProgressListenerStub(),
184186
false,
187+
RunChecksTarget.sensors_and_rules,
185188
JobCancellationToken.createDummyJobCancellationToken());
186189

187190
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -224,6 +227,7 @@ void profiling_whenComparingRowCountGroupingOnBooleanAndComparingToSameTable_the
224227
false,
225228
new CheckExecutionProgressListenerStub(),
226229
false,
230+
RunChecksTarget.sensors_and_rules,
227231
JobCancellationToken.createDummyJobCancellationToken());
228232

229233
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -259,6 +263,7 @@ void profiling_whenComparingColumnCountGroupingOnBooleanAndComparingToSameTable_
259263
false,
260264
new CheckExecutionProgressListenerStub(),
261265
false,
266+
RunChecksTarget.sensors_and_rules,
262267
JobCancellationToken.createDummyJobCancellationToken());
263268

264269
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -293,6 +298,7 @@ void profiling_whenComparingRowCountGroupingOnBooleanAndComparingToSameTableButG
293298
false,
294299
new CheckExecutionProgressListenerStub(),
295300
false,
301+
RunChecksTarget.sensors_and_rules,
296302
JobCancellationToken.createDummyJobCancellationToken());
297303

298304
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -332,6 +338,7 @@ void profiling_whenComparingRowCountGroupingAndGroupingUsesCalculatedColumnThatM
332338
false,
333339
new CheckExecutionProgressListenerStub(),
334340
false,
341+
RunChecksTarget.sensors_and_rules,
335342
JobCancellationToken.createDummyJobCancellationToken());
336343

337344
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -367,6 +374,7 @@ void profiling_whenComparingRowCountGroupingOnNotMatchingColumns_thenValuesNotMa
367374
false,
368375
new CheckExecutionProgressListenerStub(),
369376
false,
377+
RunChecksTarget.sensors_and_rules,
370378
JobCancellationToken.createDummyJobCancellationToken());
371379

372380
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -401,6 +409,7 @@ void profiling_whenComparingSumOnColumnNoGroupingSameTable_thenValuesMatch() {
401409
false,
402410
new CheckExecutionProgressListenerStub(),
403411
false,
412+
RunChecksTarget.sensors_and_rules,
404413
JobCancellationToken.createDummyJobCancellationToken());
405414

406415
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),
@@ -433,6 +442,7 @@ void partitionedDaily_whenComparingRowCountNoGroupingSameTable_thenValuesMatch()
433442
false,
434443
new CheckExecutionProgressListenerStub(),
435444
false,
445+
RunChecksTarget.sensors_and_rules,
436446
JobCancellationToken.createDummyJobCancellationToken());
437447

438448
Assertions.assertNull(checkExecutionSummary.getCheckExecutionErrorSummary(),

dqops/src/main/java/com/dqops/cli/commands/check/CheckRunCliCommand.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.dqops.cli.terminal.TerminalTableWritter;
3131
import com.dqops.execution.checks.CheckExecutionErrorSummary;
3232
import com.dqops.execution.checks.CheckExecutionSummary;
33+
import com.dqops.execution.checks.RunChecksTarget;
3334
import com.dqops.execution.checks.progress.CheckExecutionProgressListener;
3435
import com.dqops.execution.checks.progress.CheckExecutionProgressListenerProvider;
3536
import com.dqops.execution.checks.progress.CheckRunReportingMode;
@@ -119,6 +120,10 @@ public CheckRunCliCommand(TerminalFactory terminalFactory,
119120
@CommandLine.Option(names = {"-d", "--dummy"}, description = "Runs data quality check in a dummy mode, sensors are not executed on the target database, but the rest of the process is performed", defaultValue = "false")
120121
private boolean dummyRun;
121122

123+
124+
@CommandLine.Option(names = {"-tr", "--target"}, description = "Configures the data quality execution mode. The default option is to run both the sensor to collect metrics, and validate the results with data quality rules. Alternatively, it is possible to only run the sensors to collect metrics, or only run rules on existing data.", defaultValue = "sensors_and_rules")
125+
private RunChecksTarget executionTarget = RunChecksTarget.sensors_and_rules;
126+
122127
@CommandLine.Option(names = {"-ces", "--collect-error-samples"}, description = "Collects error samples for failed data quality checks", defaultValue = "false")
123128
private Boolean collectErrorSamples;
124129

@@ -333,6 +338,22 @@ public void setDummyRun(boolean dummyRun) {
333338
this.dummyRun = dummyRun;
334339
}
335340

341+
/**
342+
* Returns the target executor (sensor, rule or both) to run.
343+
* @return Mode of running checks.
344+
*/
345+
public RunChecksTarget getExecutionTarget() {
346+
return executionTarget;
347+
}
348+
349+
/**
350+
* Sets the mode of running the checks.
351+
* @param executionTarget Target.
352+
*/
353+
public void setExecutionTarget(RunChecksTarget executionTarget) {
354+
this.executionTarget = executionTarget;
355+
}
356+
336357
/**
337358
* Returns true if error samples should be collected while running checks.
338359
* @return Collect error samples.
@@ -403,7 +424,8 @@ public Integer call() throws Exception {
403424
filters.setLabels(this.labels);
404425

405426
CheckExecutionProgressListener progressListener = this.checkExecutionProgressListenerProvider.getProgressListener(this.mode, false);
406-
CheckExecutionSummary checkExecutionSummary = this.checkService.runChecks(filters, this.timeWindowFilterParameters, this.collectErrorSamples, progressListener, this.dummyRun);
427+
CheckExecutionSummary checkExecutionSummary = this.checkService.runChecks(filters, this.timeWindowFilterParameters, this.collectErrorSamples,
428+
progressListener, this.dummyRun, this.executionTarget);
407429

408430
if (checkExecutionSummary.getTotalChecksExecutedCount() == 0) {
409431
this.terminalFactory.getWriter().writeLine("No checks with these filters were found.");

dqops/src/main/java/com/dqops/cli/commands/check/impl/CheckCliService.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.dqops.cli.commands.check.impl.models.AllChecksModelCliPatchParameters;
1919
import com.dqops.data.errorsamples.factory.ErrorSamplesDataScope;
2020
import com.dqops.execution.checks.CheckExecutionSummary;
21+
import com.dqops.execution.checks.RunChecksTarget;
2122
import com.dqops.execution.checks.progress.CheckExecutionProgressListener;
2223
import com.dqops.execution.errorsampling.ErrorSamplerExecutionSummary;
2324
import com.dqops.execution.errorsampling.progress.ErrorSamplerExecutionProgressListener;
@@ -38,13 +39,15 @@ public interface CheckCliService {
3839
* @param collectErrorSamples Collect error samples for failed checks.
3940
* @param checkExecutionProgressListener Progress listener that will report the progress.
4041
* @param dummyRun Run the sensors in a dummy mode (sensors are not executed).
42+
* @param executionTarget Execution target (sensor, rule, both).
4143
* @return Check execution summary.
4244
*/
4345
CheckExecutionSummary runChecks(CheckSearchFilters checkSearchFilters,
4446
TimeWindowFilterParameters timeWindowFilterParameters,
4547
boolean collectErrorSamples,
4648
CheckExecutionProgressListener checkExecutionProgressListener,
47-
boolean dummyRun);
49+
boolean dummyRun,
50+
RunChecksTarget executionTarget);
4851

4952
/**
5053
* Collects error samples for data quality checks identified by filters.

dqops/src/main/java/com/dqops/cli/commands/check/impl/CheckCliServiceImpl.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.dqops.core.principal.DqoUserPrincipal;
2121
import com.dqops.data.errorsamples.factory.ErrorSamplesDataScope;
2222
import com.dqops.execution.checks.CheckExecutionSummary;
23+
import com.dqops.execution.checks.RunChecksTarget;
2324
import com.dqops.execution.checks.progress.CheckExecutionProgressListener;
2425
import com.dqops.execution.errorsampling.ErrorSamplerExecutionSummary;
2526
import com.dqops.execution.errorsampling.progress.ErrorSamplerExecutionProgressListener;
@@ -68,16 +69,19 @@ public CheckCliServiceImpl(CheckService checkService,
6869
* @param collectErrorSamples Collect error samples for failed checks.
6970
* @param checkExecutionProgressListener Progress listener that will report the progress.
7071
* @param dummyRun Run the sensors in a dummy mode (sensors are not executed).
72+
* @param executionTarget Execution target (sensor, rule, both).
7173
* @return Check execution summary.
7274
*/
7375
@Override
7476
public CheckExecutionSummary runChecks(CheckSearchFilters checkSearchFilters,
7577
TimeWindowFilterParameters timeWindowFilterParameters,
7678
boolean collectErrorSamples,
7779
CheckExecutionProgressListener checkExecutionProgressListener,
78-
boolean dummyRun) {
80+
boolean dummyRun,
81+
RunChecksTarget executionTarget) {
7982
DqoUserPrincipal principal = this.apiKeyPrincipalProvider.getLocalUserPrincipal();
80-
return this.checkService.runChecks(checkSearchFilters, timeWindowFilterParameters, collectErrorSamples, checkExecutionProgressListener, dummyRun, principal);
83+
return this.checkService.runChecks(checkSearchFilters, timeWindowFilterParameters, collectErrorSamples,
84+
checkExecutionProgressListener, dummyRun, executionTarget, principal);
8185
}
8286

8387
/**

dqops/src/main/java/com/dqops/cli/commands/run/RunCliCommand.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.dqops.execution.checks.progress.CheckRunReportingMode;
2828
import com.dqops.utils.datetime.DurationParseUtility;
2929
import com.dqops.utils.datetime.InvalidDurationFormatException;
30-
import io.swagger.models.auth.In;
3130
import lombok.extern.slf4j.Slf4j;
3231
import org.springframework.beans.factory.annotation.Autowired;
3332
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -53,6 +52,7 @@ public class RunCliCommand extends BaseCommand implements ICommand {
5352
private TerminalReader terminalReader;
5453
private TerminalWriter terminalWriter;
5554
private DqoSchedulerConfigurationProperties dqoSchedulerConfigurationProperties;
55+
private RootConfigurationProperties rootConfigurationProperties;
5656

5757
public RunCliCommand() {
5858
}
@@ -86,7 +86,6 @@ public RunCliCommand(JobSchedulerService jobSchedulerService,
8686

8787
@CommandLine.Option(names = {"-m", "--mode"}, description = "Check execution reporting mode (silent, summary, info, debug)", defaultValue = "silent")
8888
private CheckRunReportingMode checkRunMode = CheckRunReportingMode.silent;
89-
private RootConfigurationProperties rootConfigurationProperties;
9089

9190
@CommandLine.Option(names = {"-t", "--time-limit"}, description = "Optional execution time limit. DQOps will run for the given duration and gracefully shut down. " +
9291
"Supported values are in the following format: 300s (300 seconds), 10m (10 minutes), 2h (run for up to 2 hours) or just a number that is the time limit in seconds.")

dqops/src/main/java/com/dqops/execution/checks/CheckExecutionService.java

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public interface CheckExecutionService {
3737
* @param collectErrorSamples Collect error samples for failed checks. Can disable or enable sample collection independent of other parameters.
3838
* @param progressListener Progress listener that receives progress calls.
3939
* @param dummySensorExecution When true, the sensor is not executed and dummy results are returned. Dummy run will report progress and show a rendered template, but will not touch the target system.
40+
* @param executionTarget Check execution mode (sensors, rules, or both).
4041
* @param startChildJobsPerTable True - starts parallel jobs per table, false - runs all checks without starting additional jobs.
4142
* @param parentJobId Parent job id.
4243
* @param jobCancellationToken Job cancellation token.
@@ -49,6 +50,7 @@ CheckExecutionSummary executeChecks(ExecutionContext executionContext,
4950
Boolean collectErrorSamples,
5051
CheckExecutionProgressListener progressListener,
5152
boolean dummySensorExecution,
53+
RunChecksTarget executionTarget,
5254
boolean startChildJobsPerTable,
5355
DqoQueueJobId parentJobId,
5456
JobCancellationToken jobCancellationToken,
@@ -82,6 +84,7 @@ CheckExecutionSummary executeChecksForSchedule(ExecutionContext executionContext
8284
* @param collectErrorSamples Collect error samples for failed checks.
8385
* @param progressListener Progress listener that receives progress calls.
8486
* @param dummySensorExecution When true, the sensor is not executed and dummy results are returned. Dummy run will report progress and show a rendered template, but will not touch the target system.
87+
* @param executionTarget Check execution mode (sensors, rules, or both).
8588
* @param jobCancellationToken Job cancellation token.
8689
* @return Check summary table with the count of alerts, checks and rules for each table, but containing only one row for the target table. The result may be empty if the table was not found.
8790
*/
@@ -93,5 +96,6 @@ CheckExecutionSummary executeSelectedChecksOnTable(ExecutionContext executionCon
9396
Boolean collectErrorSamples,
9497
CheckExecutionProgressListener progressListener,
9598
boolean dummySensorExecution,
99+
RunChecksTarget executionTarget,
96100
JobCancellationToken jobCancellationToken);
97101
}

dqops/src/main/java/com/dqops/execution/checks/CheckExecutionServiceImpl.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public CheckExecutionServiceImpl(HierarchyNodeTreeSearcher hierarchyNodeTreeSear
7777
* @param collectErrorSamples Collect error samples for failed checks. Can disable or enable sample collection independent of other parameters.
7878
* @param progressListener Progress listener that receives progress calls.
7979
* @param dummySensorExecution When true, the sensor is not executed and dummy results are returned. Dummy run will report progress and show a rendered template, but will not touch the target system.
80+
* @param executionTarget Check execution mode (sensors, rules, or both).
8081
* @param startChildJobsPerTable True - starts parallel jobs per table, false - runs all checks without starting additional jobs.
8182
* @param parentJobId Parent job id for the parent job.
8283
* @param jobCancellationToken Job cancellation token.
@@ -90,6 +91,7 @@ public CheckExecutionSummary executeChecks(ExecutionContext executionContext,
9091
Boolean collectErrorSamples,
9192
CheckExecutionProgressListener progressListener,
9293
boolean dummySensorExecution,
94+
RunChecksTarget executionTarget,
9395
boolean startChildJobsPerTable,
9496
DqoQueueJobId parentJobId,
9597
JobCancellationToken jobCancellationToken,
@@ -114,6 +116,7 @@ public CheckExecutionSummary executeChecks(ExecutionContext executionContext,
114116
setProgressListener(progressListener);
115117
setDummyExecution(dummySensorExecution);
116118
setCollectErrorSamples(collectErrorSamples);
119+
setExecutionTarget(executionTarget);
117120
}};
118121
RunChecksOnTableQueueJob runChecksOnTableJob = this.dqoQueueJobFactory.createRunChecksOnTableJob();
119122
runChecksOnTableJob.setParameters(runChecksOnTableParameters);
@@ -130,7 +133,7 @@ public CheckExecutionSummary executeChecks(ExecutionContext executionContext,
130133
CheckExecutionSummary tableCheckExecutionSummary = this.tableCheckExecutionService.executeChecksOnTable(
131134
executionContext, userHome, connectionWrapper, targetTable,
132135
checkSearchFilters, userTimeWindowFilters, collectErrorSamples, progressListener,
133-
dummySensorExecution, jobCancellationToken);
136+
dummySensorExecution, executionTarget, jobCancellationToken);
134137
checkExecutionSummary.append(tableCheckExecutionSummary);
135138
}
136139
}
@@ -180,6 +183,7 @@ public CheckExecutionSummary executeChecksForSchedule(ExecutionContext execution
180183
setCheckSearchFilters(checkSearchFilters);
181184
setTimeWindowFilter(null);
182185
setProgressListener(progressListener);
186+
setExecutionTarget(RunChecksTarget.sensors_and_rules);
183187
}};
184188
RunChecksOnTableQueueJob runChecksOnTableJob = this.dqoQueueJobFactory.createRunChecksOnTableJob();
185189
runChecksOnTableJob.setParameters(runChecksOnTableParameters);
@@ -206,6 +210,7 @@ public CheckExecutionSummary executeChecksForSchedule(ExecutionContext execution
206210
* @param collectErrorSamples Collect error samples for failed checks.
207211
* @param progressListener Progress listener that receives progress calls.
208212
* @param dummySensorExecution When true, the sensor is not executed and dummy results are returned. Dummy run will report progress and show a rendered template, but will not touch the target system.
213+
* @param executionTarget Check execution mode (sensors, rules, or both).
209214
* @param jobCancellationToken Job cancellation token.
210215
* @return Check summary table with the count of alerts, checks and rules for each table, but containing only one row for the target table. The result may be empty if the table was not found.
211216
*/
@@ -218,6 +223,7 @@ public CheckExecutionSummary executeSelectedChecksOnTable(ExecutionContext execu
218223
Boolean collectErrorSamples,
219224
CheckExecutionProgressListener progressListener,
220225
boolean dummySensorExecution,
226+
RunChecksTarget executionTarget,
221227
JobCancellationToken jobCancellationToken) {
222228
UserHome userHome = executionContext.getUserHomeContext().getUserHome();
223229

@@ -234,7 +240,7 @@ public CheckExecutionSummary executeSelectedChecksOnTable(ExecutionContext execu
234240
CheckExecutionSummary checkExecutionSummary = this.tableCheckExecutionService.executeChecksOnTable(
235241
executionContext, userHome, connectionWrapper, tableWrapper,
236242
checkSearchFilters, userTimeWindowFilters, collectErrorSamples, progressListener,
237-
dummySensorExecution, jobCancellationToken);
243+
dummySensorExecution, executionTarget, jobCancellationToken);
238244

239245
return checkExecutionSummary;
240246
}

0 commit comments

Comments
 (0)