Skip to content

Commit 06b28df

Browse files
committed
Table controller supports paging and filtering table names, and returns the most recent table data quality status.
1 parent 19b590a commit 06b28df

18 files changed

+421
-34
lines changed

dqops/src/main/java/com/dqops/core/filesystem/localfiles/LocalFileStorageServiceImpl.java

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.nio.file.Path;
3535
import java.nio.file.attribute.FileTime;
3636
import java.time.Instant;
37+
import java.util.Comparator;
3738
import java.util.List;
3839
import java.util.stream.Collectors;
3940
import java.util.stream.Stream;
@@ -295,6 +296,7 @@ public List<HomeFolderPath> listFoldersDirect(HomeFolderPath folderPath, Path ab
295296
try (Stream<Path> fileList = Files.list(absolutePath)) {
296297
List<HomeFolderPath> folders = fileList
297298
.filter(path -> Files.isDirectory(path))
299+
.sorted()
298300
.map(path -> {
299301
String fileSystemName = path.getFileName().toString();
300302
FolderName subFolder = FolderName.fromFileSystemName(fileSystemName);
@@ -339,6 +341,7 @@ public List<HomeFilePath> listFilesDirect(HomeFolderPath folderPath, Path absolu
339341
try (Stream<Path> pathStream = Files.list(absolutePath)) {
340342
List<HomeFilePath> filePathsList = pathStream
341343
.filter(path -> !Files.isDirectory(path))
344+
.sorted()
342345
.map(path -> {
343346
String fileSystemName = path.getFileName().toString();
344347
return folderPath.resolveFile(fileSystemName);

dqops/src/main/java/com/dqops/data/checkresults/models/currentstatus/ColumnCurrentDataQualityStatusModel.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.time.ZoneId;
3636
import java.time.temporal.ChronoUnit;
3737
import java.util.*;
38+
import java.util.function.Predicate;
3839

3940
/**
4041
* The column validity status. It is a summary of the results of the most recently executed data quality checks on the column.
@@ -189,10 +190,10 @@ protected ColumnCurrentDataQualityStatusModel clone() {
189190
/**
190191
* Creates a deep clone of the table status model, preserving only the checks for an expected check type.
191192
* All scores and the data quality KPI is recalculated for the checks that left.
192-
* @param checkType Data quality check type to copy, the results of the other check types are ignored.
193+
* @param checkFilter Check filter that filters the checks that should be preserved.
193194
* @return A deep clone of the object with results only for that check type.
194195
*/
195-
public ColumnCurrentDataQualityStatusModel cloneFilteredByCheckType(CheckType checkType) {
196+
public ColumnCurrentDataQualityStatusModel cloneFilteredByCheckType(Predicate<CheckCurrentDataQualityStatusModel> checkFilter) {
196197
ColumnCurrentDataQualityStatusModel tableStatusClone = this.clone();
197198
tableStatusClone.currentSeverity = null;
198199
tableStatusClone.highestHistoricalSeverity = null;
@@ -207,7 +208,7 @@ public ColumnCurrentDataQualityStatusModel cloneFilteredByCheckType(CheckType ch
207208
tableStatusClone.checks = new LinkedHashMap<>();
208209

209210
for (Map.Entry<String, CheckCurrentDataQualityStatusModel> keyValue : this.checks.entrySet()) {
210-
if (keyValue.getValue().getCheckType() == checkType) {
211+
if (checkFilter == null || checkFilter.test(keyValue.getValue())) {
211212
tableStatusClone.checks.put(keyValue.getKey(), keyValue.getValue());
212213
}
213214
}

dqops/src/main/java/com/dqops/data/checkresults/models/currentstatus/TableCurrentDataQualityStatusModel.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.time.Instant;
3535
import java.time.temporal.ChronoUnit;
3636
import java.util.*;
37+
import java.util.function.Function;
38+
import java.util.function.Predicate;
3739
import java.util.stream.Collectors;
3840
import java.util.stream.Stream;
3941

@@ -308,11 +310,12 @@ protected TableCurrentDataQualityStatusModel clone() {
308310
/**
309311
* Creates a deep clone of the table status model, preserving only the checks for an expected check type.
310312
* All scores and the data quality KPI is recalculated for the checks that left.
311-
* @param checkType Data quality check type to copy, the results of the other check types are ignored.
312-
* @param includeColumns Include columns. When this parameter is false, the dictionary of columns is removed.
313+
* @param checkFilter Check filter that filters the checks that should be preserved.
314+
* @param includeColumnsAndChecks Include columns and checks. When this parameter is false, the dictionary of columns is removed.
313315
* @return A deep clone of the object with results only for that check type.
314316
*/
315-
public TableCurrentDataQualityStatusModel cloneFilteredByCheckType(CheckType checkType, boolean includeColumns) {
317+
public TableCurrentDataQualityStatusModel cloneFilteredByCheckType(
318+
Predicate<CheckCurrentDataQualityStatusModel> checkFilter, boolean includeColumnsAndChecks) {
316319
TableCurrentDataQualityStatusModel tableStatusClone = this.clone();
317320
tableStatusClone.currentSeverity = null;
318321
tableStatusClone.highestHistoricalSeverity = null;
@@ -321,14 +324,14 @@ public TableCurrentDataQualityStatusModel cloneFilteredByCheckType(CheckType che
321324
tableStatusClone.checks = new LinkedHashMap<>();
322325

323326
for (Map.Entry<String, CheckCurrentDataQualityStatusModel> keyValue : this.checks.entrySet()) {
324-
if (keyValue.getValue().getCheckType() == checkType) {
327+
if (checkFilter == null || checkFilter.test(keyValue.getValue())) {
325328
tableStatusClone.checks.put(keyValue.getKey(), keyValue.getValue());
326329
}
327330
}
328331

329332
tableStatusClone.columns = new LinkedHashMap<>();
330333
for (Map.Entry<String, ColumnCurrentDataQualityStatusModel> columnKeyValue : this.columns.entrySet()) {
331-
ColumnCurrentDataQualityStatusModel columnModelFiltered = columnKeyValue.getValue().cloneFilteredByCheckType(checkType);
334+
ColumnCurrentDataQualityStatusModel columnModelFiltered = columnKeyValue.getValue().cloneFilteredByCheckType(checkFilter);
332335
if (!columnModelFiltered.getChecks().isEmpty()) {
333336
tableStatusClone.columns.put(columnKeyValue.getKey(), columnModelFiltered);
334337
}
@@ -339,7 +342,8 @@ public TableCurrentDataQualityStatusModel cloneFilteredByCheckType(CheckType che
339342
tableStatusClone.calculateDataQualityKpiScore();
340343
tableStatusClone.calculateStatusesForDataQualityDimensions();
341344

342-
if (!includeColumns) {
345+
if (!includeColumnsAndChecks) {
346+
tableStatusClone.checks = null; // detaching checks
343347
tableStatusClone.columns = null; // detaching columns
344348
}
345349

dqops/src/main/java/com/dqops/data/checkresults/statuscache/CurrentTableStatusCacheEntry.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class CurrentTableStatusCacheEntry {
2727
private final Object lock = new Object();
2828
private CurrentTableStatusEntryStatus status;
2929
private TableCurrentDataQualityStatusModel allCheckTypesWithColumns;
30+
private TableCurrentDataQualityStatusModel monitoringAndPartitionedTableOnly;
3031
private TableCurrentDataQualityStatusModel profilingTableOnly;
3132
private TableCurrentDataQualityStatusModel monitoringTableOnly;
3233
private TableCurrentDataQualityStatusModel partitionedTableOnly;
@@ -77,6 +78,14 @@ public TableCurrentDataQualityStatusModel getAllCheckTypesWithColumns() {
7778
return allCheckTypesWithColumns;
7879
}
7980

81+
/**
82+
* Returns a table data quality status that covers only monitoring and partitioned checks, but excludes the profiling status.
83+
* @return The quality status for monitoring and partitioned checks only.
84+
*/
85+
public TableCurrentDataQualityStatusModel getMonitoringAndPartitionedTableOnly() {
86+
return monitoringAndPartitionedTableOnly;
87+
}
88+
8089
/**
8190
* Returns the status for the table related only to profiling checks. The status does not contain a list of column.
8291
* @return The status of the table, for profiling checks only.
@@ -103,15 +112,21 @@ public TableCurrentDataQualityStatusModel getPartitionedTableOnly() {
103112

104113
/**
105114
* Sets the current model with a detailed status for all check types. If it is not empty, the status is also changed to loaded.
106-
* @param allCheckTypesWithColumns Status model.
115+
* @param allCheckTypesWithColumns Status model with all checks and columns.
116+
* @param monitoringAndPartitionedTableOnly Status model with monitoring and partitioned checks, without columns.
117+
* @param profilingTableOnly Status model with only profiling checks and no columns.
118+
* @param monitoringTableOnly Status model with only monitoring checks and no columns.
119+
* @param partitionedTableOnly Status model with only partitioned checks and no columns.
107120
*/
108121
public void setStatusModels(
109122
TableCurrentDataQualityStatusModel allCheckTypesWithColumns,
123+
TableCurrentDataQualityStatusModel monitoringAndPartitionedTableOnly,
110124
TableCurrentDataQualityStatusModel profilingTableOnly,
111125
TableCurrentDataQualityStatusModel monitoringTableOnly,
112126
TableCurrentDataQualityStatusModel partitionedTableOnly) {
113127
synchronized (this.lock) {
114128
this.allCheckTypesWithColumns = allCheckTypesWithColumns;
129+
this.monitoringAndPartitionedTableOnly = monitoringAndPartitionedTableOnly;
115130
this.profilingTableOnly = profilingTableOnly;
116131
this.monitoringTableOnly = monitoringTableOnly;
117132
this.partitionedTableOnly = partitionedTableOnly;

dqops/src/main/java/com/dqops/data/checkresults/statuscache/TableStatusCache.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,19 @@ public interface TableStatusCache {
3131
*
3232
* @param tableStatusKey Table status key.
3333
* @param checkType Check type. When a check type is given, the operation returns the status model only for one check type. The returned model does not contain columns.
34-
* When the <code>checkType</code> is null, this method returns a full model for all checks, including all columns.
34+
* When the <code>checkType</code> is null, this method returns a model for monitoring and partitioned checks combined.
3535
* @return Table status model or null when it is not yet loaded.
3636
*/
3737
TableCurrentDataQualityStatusModel getCurrentTableStatus(CurrentTableStatusKey tableStatusKey, CheckType checkType);
3838

39+
/**
40+
* Retrieves the current table status for a requested table including all columns.
41+
*
42+
* @param tableStatusKey Table status key.
43+
* @return Table status model or null when it is not yet loaded.
44+
*/
45+
TableCurrentDataQualityStatusModel getCurrentTableStatusWithColumns(CurrentTableStatusKey tableStatusKey);
46+
3947
/**
4048
* Notifies the table status cache that the table result were updated and should be invalidated.
4149
*
@@ -47,9 +55,10 @@ public interface TableStatusCache {
4755

4856
/**
4957
* Returns a future that is completed when there are no queued table status reload operations.
58+
* @param waitTimeoutMilliseconds Optional timeout to wait for the completion of the future. If the timeout elapses, the future is completed with a value <code>false</code>.
5059
* @return Future that is completed when the status of all requested tables was loaded.
5160
*/
52-
CompletableFuture<Integer> getQueueEmptyFuture();
61+
CompletableFuture<Boolean> getQueueEmptyFuture(Long waitTimeoutMilliseconds);
5362

5463
/**
5564
* Starts a service that loads table statuses of requested tables.

dqops/src/main/java/com/dqops/data/checkresults/statuscache/TableStatusCacheImpl.java

+35-8
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,30 @@ protected CurrentTableStatusCacheEntry loadEntryCore(CurrentTableStatusKey table
101101
return currentTableStatusCacheEntry;
102102
}
103103

104+
/**
105+
* Retrieves the current table status for a requested table including all columns.
106+
*
107+
* @param tableStatusKey Table status key.
108+
* @return Table status model or null when it is not yet loaded.
109+
*/
110+
@Override
111+
public TableCurrentDataQualityStatusModel getCurrentTableStatusWithColumns(CurrentTableStatusKey tableStatusKey) {
112+
CurrentTableStatusCacheEntry currentTableStatusCacheEntry = this.tableStatusCache.get(tableStatusKey, this::loadEntryCore);
113+
return currentTableStatusCacheEntry.getAllCheckTypesWithColumns();
114+
}
115+
104116
/**
105117
* Retrieves the current table status for a requested table.
106118
* @param tableStatusKey Table status key.
107119
* @param checkType Check type. When a check type is given, the operation returns the status model only for one check type. The returned model does not contain columns.
108-
* When the <code>checkType</code> is null, this method returns a full model for all checks, including all columns.
120+
* When the <code>checkType</code> is null, this method returns a model for monitoring and partitioned checks combined.
109121
* @return Table status model or null when it is not yet loaded.
110122
*/
111123
@Override
112124
public TableCurrentDataQualityStatusModel getCurrentTableStatus(CurrentTableStatusKey tableStatusKey, CheckType checkType) {
113125
CurrentTableStatusCacheEntry currentTableStatusCacheEntry = this.tableStatusCache.get(tableStatusKey, this::loadEntryCore);
114126
if (checkType == null) {
115-
return currentTableStatusCacheEntry.getAllCheckTypesWithColumns();
127+
return currentTableStatusCacheEntry.getMonitoringAndPartitionedTableOnly();
116128
}
117129

118130
switch (checkType) {
@@ -153,12 +165,20 @@ public void invalidateTableStatus(CurrentTableStatusKey tableStatusKey, boolean
153165

154166
/**
155167
* Returns a future that is completed when there are no queued table status reload operations.
168+
* @param waitTimeoutMilliseconds Optional timeout to wait for the completion of the future. If the timeout elapses, the future is completed with a value <code>false</code>.
156169
* @return Future that is completed when the status of all requested tables was loaded.
157170
*/
158171
@Override
159-
public CompletableFuture<Integer> getQueueEmptyFuture() {
172+
public CompletableFuture<Boolean> getQueueEmptyFuture(Long waitTimeoutMilliseconds) {
160173
synchronized (this.lock) {
161-
return this.queueEmptyFuture;
174+
CompletableFuture<Boolean> booleanCompletableFuture = this.queueEmptyFuture
175+
.thenApply(result -> true);
176+
177+
if (waitTimeoutMilliseconds != null) {
178+
booleanCompletableFuture = booleanCompletableFuture.completeOnTimeout(false, waitTimeoutMilliseconds, TimeUnit.MILLISECONDS);
179+
}
180+
181+
return booleanCompletableFuture;
162182
}
163183
}
164184

@@ -210,11 +230,18 @@ public void onRequestLoadTableStatus(CurrentTableStatusKey tableStatusKey) {
210230

211231
TableCurrentDataQualityStatusModel fullStatusModel =
212232
this.checkResultsDataService.analyzeTableMostRecentQualityStatus(filterParameters, userDomainIdentity);
213-
TableCurrentDataQualityStatusModel profilingStatus = fullStatusModel.cloneFilteredByCheckType(CheckType.profiling, false);
214-
TableCurrentDataQualityStatusModel monitoringStatus = fullStatusModel.cloneFilteredByCheckType(CheckType.monitoring, false);
215-
TableCurrentDataQualityStatusModel partitionedStatus = fullStatusModel.cloneFilteredByCheckType(CheckType.partitioned, false);
216233

217-
currentTableStatusCacheEntry.setStatusModels(fullStatusModel, profilingStatus, monitoringStatus, partitionedStatus); // also sets the status
234+
TableCurrentDataQualityStatusModel monitoringAndPartitionedStatus = fullStatusModel.cloneFilteredByCheckType(
235+
checkResultsModel -> checkResultsModel.getCheckType() == CheckType.monitoring || checkResultsModel.getCheckType() == CheckType.partitioned, false);
236+
TableCurrentDataQualityStatusModel profilingStatus = fullStatusModel.cloneFilteredByCheckType(
237+
checkResultsModel -> checkResultsModel.getCheckType() == CheckType.profiling, false);
238+
TableCurrentDataQualityStatusModel monitoringStatus = fullStatusModel.cloneFilteredByCheckType(
239+
checkResultsModel -> checkResultsModel.getCheckType() == CheckType.monitoring, false);
240+
TableCurrentDataQualityStatusModel partitionedStatus = fullStatusModel.cloneFilteredByCheckType(
241+
checkResultsModel -> checkResultsModel.getCheckType() == CheckType.partitioned, false);
242+
243+
currentTableStatusCacheEntry.setStatusModels(fullStatusModel, monitoringAndPartitionedStatus,
244+
profilingStatus, monitoringStatus, partitionedStatus); // also sets the status
218245
}
219246
catch (Exception ex) {
220247
currentTableStatusCacheEntry.setStatus(CurrentTableStatusEntryStatus.LOADED);

dqops/src/main/java/com/dqops/metadata/search/CheckSearchFiltersVisitor.java

+4
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,10 @@ public TreeNodeTraversalResult accept(AbstractCheckSpec<?,?,?,?> abstractCheckSp
358358

359359
parameter.getNodes().add(abstractCheckSpec);
360360

361+
if (this.filters.getMaxResults() != null && parameter.getNodes().size() >= this.filters.getMaxResults()) {
362+
return TreeNodeTraversalResult.STOP_TRAVERSAL;
363+
}
364+
361365
return TreeNodeTraversalResult.SKIP_CHILDREN; // no need to search any deeper, we have found what we were looking for
362366
}
363367

dqops/src/main/java/com/dqops/metadata/search/ColumnSearchFilters.java

+19
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public class ColumnSearchFilters {
5555
"The labels are assigned on the labels screen and stored in the *labels* node in the *.dqotable.yaml* file.")
5656
private String[] labels;
5757

58+
@JsonPropertyDescription("Optional limit for the maximum number of results to return.")
59+
private Integer maxResults;
60+
5861
@JsonIgnore
5962
private SearchPattern connectionNameSearchPattern;
6063
@JsonIgnore
@@ -204,6 +207,22 @@ public void setLabels(String[] labels) {
204207
this.labels = labels;
205208
}
206209

210+
/**
211+
* Sets the limit for the maximum number of results to return.
212+
* @return Limit or null.
213+
*/
214+
public Integer getMaxResults() {
215+
return maxResults;
216+
}
217+
218+
/**
219+
* Sets the limit for the maximum number of results to return.
220+
* @param maxResults New limit of rows or null to disable limiting the results.
221+
*/
222+
public void setMaxResults(Integer maxResults) {
223+
this.maxResults = maxResults;
224+
}
225+
207226
/**
208227
* Returns the {@link SearchPattern} related to <code>connectionName</code>.
209228
* Lazy getter, parses <code>connectionName</code> as a search pattern and returns parsed object.

dqops/src/main/java/com/dqops/metadata/search/ColumnSearchFiltersVisitor.java

+10
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,22 @@ public TreeNodeTraversalResult accept(ColumnSpec columnSpec, SearchParameterObje
275275
String columnNameFilter = this.filters.getColumnName();
276276
if (Strings.isNullOrEmpty(columnNameFilter)) {
277277
parameter.getNodes().add(columnSpec);
278+
279+
if (this.filters.getMaxResults() != null && parameter.getNodes().size() >= this.filters.getMaxResults()) {
280+
return TreeNodeTraversalResult.STOP_TRAVERSAL;
281+
}
282+
278283
return TreeNodeTraversalResult.SKIP_CHILDREN;
279284
}
280285

281286
String columnName = columnSpec.getHierarchyId().getLast().toString();
282287
if (StringPatternComparer.matchSearchPattern(columnName, columnNameFilter)) {
283288
parameter.getNodes().add(columnSpec);
289+
290+
if (this.filters.getMaxResults() != null && parameter.getNodes().size() >= this.filters.getMaxResults()) {
291+
return TreeNodeTraversalResult.STOP_TRAVERSAL;
292+
}
293+
284294
return TreeNodeTraversalResult.SKIP_CHILDREN;
285295
}
286296

dqops/src/main/java/com/dqops/metadata/search/StatisticsCollectorSearchFiltersVisitor.java

+4
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,10 @@ public TreeNodeTraversalResult accept(AbstractStatisticsCollectorSpec<?> abstrac
362362

363363
parameter.getNodes().add(abstractStatisticsCollectorSpec);
364364

365+
if (this.filters.getMaxResults() != null && parameter.getNodes().size() >= this.filters.getMaxResults()) {
366+
return TreeNodeTraversalResult.STOP_TRAVERSAL;
367+
}
368+
365369
return TreeNodeTraversalResult.SKIP_CHILDREN; // no need to search any deeper, we have found what we were looking for
366370
}
367371

0 commit comments

Comments
 (0)