Commit b8e08b8

Auto import tables on a cron scheduler.

1 parent 6a5cf07 · commit b8e08b8
24 files changed: +732 -42 lines

dqops/src/main/java/com/dqops/core/jobqueue/DqoJobType.java (+1)

@@ -34,6 +34,7 @@ public enum DqoJobType {
     run_scheduled_checks_cron,
     import_schema,
     import_tables,
+    auto_import_tables,
     delete_stored_data,
     repair_stored_data,
 }

dqops/src/main/java/com/dqops/core/jobqueue/DqoQueueJobFactory.java (+8 -1)

@@ -20,6 +20,7 @@
 import com.dqops.core.jobqueue.jobs.schema.ImportSchemaQueueJob;
 import com.dqops.core.jobqueue.jobs.table.ImportTablesQueueJob;
 import com.dqops.core.scheduler.collectstatistics.CollectScheduledStatisticsDqoJob;
+import com.dqops.core.scheduler.importtables.AutoImportTablesDqoJob;
 import com.dqops.core.scheduler.runcheck.RunScheduledChecksDqoJob;
 import com.dqops.core.synchronization.jobs.SynchronizeMultipleFoldersDqoQueueJob;
 import com.dqops.core.synchronization.jobs.SynchronizeRootFolderDqoQueueJob;

@@ -57,7 +58,13 @@ public interface DqoQueueJobFactory {
      * Creates a job that runs the data profiler and collects statistics for a given schedule.
      * @return Job that collects statistics when triggered by a scheduler.
      */
-    CollectScheduledStatisticsDqoJob collectScheduledStatisticsJob();
+    CollectScheduledStatisticsDqoJob createCollectScheduledStatisticsJob();
+
+    /**
+     * Creates a job that automatically imports tables for all connections scheduled for a given schedule.
+     * @return Job that automatically imports tables when triggered by a scheduler.
+     */
+    AutoImportTablesDqoJob createAutoImportTablesJob();

     /**
      * Creates a child job that runs profilers (collects statistics) on a single table.

dqops/src/main/java/com/dqops/core/jobqueue/DqoQueueJobFactoryImpl.java (+12 -1)

@@ -20,6 +20,7 @@
 import com.dqops.core.jobqueue.jobs.schema.ImportSchemaQueueJob;
 import com.dqops.core.jobqueue.jobs.table.ImportTablesQueueJob;
 import com.dqops.core.scheduler.collectstatistics.CollectScheduledStatisticsDqoJob;
+import com.dqops.core.scheduler.importtables.AutoImportTablesDqoJob;
 import com.dqops.core.scheduler.runcheck.RunScheduledChecksDqoJob;
 import com.dqops.core.synchronization.jobs.SynchronizeMultipleFoldersDqoQueueJob;
 import com.dqops.core.synchronization.jobs.SynchronizeRootFolderDqoQueueJob;

@@ -85,7 +86,7 @@ public CollectStatisticsQueueJob createCollectStatisticsJob() {
      * @return Job that collects statistics when triggered by a scheduler.
      */
     @Override
-    public CollectScheduledStatisticsDqoJob collectScheduledStatisticsJob() {
+    public CollectScheduledStatisticsDqoJob createCollectScheduledStatisticsJob() {
         return this.beanFactory.getBean(CollectScheduledStatisticsDqoJob.class);
     }

@@ -169,6 +170,16 @@ public ImportTablesQueueJob createImportTablesJob() {
         return this.beanFactory.getBean(ImportTablesQueueJob.class);
     }

+    /**
+     * Creates a job that automatically imports tables for all connections scheduled for a given schedule.
+     *
+     * @return Job that automatically imports tables when triggered by a scheduler.
+     */
+    @Override
+    public AutoImportTablesDqoJob createAutoImportTablesJob() {
+        return this.beanFactory.getBean(AutoImportTablesDqoJob.class);
+    }
+
     /**
      * Creates a job that data from user's ".data" directory.
      *
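The factory methods above rely on Spring prototype scoping: each beanFactory.getBean() call must hand back a fresh job instance, because a job object carries per-run state such as the cron schedule that triggered it. A minimal, self-contained sketch of that pattern follows; the class names are illustrative stand-ins, not the DQOps ones.

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // a fresh instance per getBean() call
class ExampleDqoJob {
    // per-run mutable state, e.g. the schedule that triggered this run
    private String cronExpression;

    public void setCronExpression(String cronExpression) {
        this.cronExpression = cronExpression;
    }
}

@Component
class ExampleJobFactory {
    private final BeanFactory beanFactory;

    ExampleJobFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    // Mirrors createAutoImportTablesJob(): resolve a new prototype bean per call,
    // so concurrent or repeated schedules never share job state.
    ExampleDqoJob createJob() {
        return this.beanFactory.getBean(ExampleDqoJob.class);
    }
}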

dqops/src/main/java/com/dqops/core/jobqueue/jobs/table/ImportTablesQueueJob.java (+80 -28)

@@ -15,10 +15,7 @@
  */
 package com.dqops.core.jobqueue.jobs.table;

-import com.dqops.connectors.ConnectionProvider;
-import com.dqops.connectors.ConnectionProviderRegistry;
-import com.dqops.connectors.ProviderType;
-import com.dqops.connectors.SourceConnection;
+import com.dqops.connectors.*;
 import com.dqops.core.configuration.DqoMetadataImportConfigurationProperties;
 import com.dqops.core.jobqueue.DqoJobExecutionContext;
 import com.dqops.core.jobqueue.DqoJobType;

@@ -32,10 +29,12 @@
 import com.dqops.core.principal.UserDomainIdentity;
 import com.dqops.core.secrets.SecretValueLookupContext;
 import com.dqops.core.secrets.SecretValueProvider;
+import com.dqops.metadata.search.pattern.SearchPattern;
 import com.dqops.metadata.sources.*;
 import com.dqops.metadata.storage.localfiles.userhome.UserHomeContext;
 import com.dqops.metadata.storage.localfiles.userhome.UserHomeContextFactory;
 import com.dqops.metadata.userhome.UserHome;
+import org.apache.parquet.Strings;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
 import org.springframework.context.annotation.Scope;

@@ -45,6 +44,7 @@
 import tech.tablesaw.api.Table;
 import tech.tablesaw.api.TextColumn;

+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;

@@ -94,10 +94,7 @@ public void setImportParameters(ImportTablesQueueJobParameters importParameters)
      */
     public Table createDatasetTableFromTableSpecs(List<TableSpec> sourceTableSpecs) {
         // TODO: move method to tablesaw utils (repeated in ImportSchemaQueueJob).
-        Table resultTable = Table.create().addColumns(
-                TextColumn.create("Schema name"),
-                TextColumn.create("Table name"),
-                IntColumn.create("Column count"));
+        Table resultTable = createEmptyOutputResult();

         for(TableSpec sourceTableSpec : sourceTableSpecs) {
             Row row = resultTable.appendRow();

@@ -108,6 +105,17 @@ public Table createDatasetTableFromTableSpecs(List<TableSpec> sourceTableSpecs)
         return resultTable;
     }

+    /**
+     * Creates an empty result table.
+     * @return Empty result table.
+     */
+    private Table createEmptyOutputResult() {
+        return Table.create().addColumns(
+                TextColumn.create("Schema name"),
+                TextColumn.create("Table name"),
+                IntColumn.create("Column count"));
+    }
+
     /**
      * Job internal implementation method that should be implemented by derived jobs.
      *

@@ -134,27 +142,71 @@ public ImportTablesResult onExecute(DqoJobExecutionContext jobExecutionContext)

         ProviderType providerType = expandedConnectionSpec.getProviderType();
         ConnectionProvider connectionProvider = this.connectionProviderRegistry.getConnectionProvider(providerType);
+
+        String schemaNameFilter = Strings.isNullOrEmpty(this.importParameters.getSchemaName()) ? "*" : this.importParameters.getSchemaName();
+        SearchPattern schemaSearchPattern = SearchPattern.create(false, schemaNameFilter);
+
         try (SourceConnection sourceConnection = connectionProvider.createConnection(expandedConnectionSpec, true, secretValueLookupContext)) {
-            List<TableSpec> sourceTableSpecs = sourceConnection.retrieveTableMetadata(
-                    this.importParameters.getSchemaName(),
-                    this.importParameters.getTableNameContains(),
-                    this.metadataImportConfigurationProperties.getTablesImportLimit(),
-                    this.importParameters.getTableNames(),
-                    connectionWrapper,
-                    secretValueLookupContext);
-
-            List<TableSpec> importedTablesSpecs = sourceTableSpecs
-                    .stream()
-                    .map(tableSpec -> tableSpec.deepClone())
-                    .collect(Collectors.toList());
-
-            TableList currentTablesColl = connectionWrapper.getTables();
-            currentTablesColl.importTables(importedTablesSpecs, connectionSpec.getDefaultGroupingConfiguration());
-            userHomeContext.flush();
-
-            Table resultTable = createDatasetTableFromTableSpecs(importedTablesSpecs);
-
-            return new ImportTablesResult(resultTable, sourceTableSpecs);
+            if (schemaSearchPattern.isWildcardSearchPattern()) {
+                // must iterate over schemas
+                List<SourceSchemaModel> sourceSchemaModels = sourceConnection.listSchemas();
+                jobExecutionContext.getCancellationToken().throwIfCancelled();
+                List<TableSpec> fullTableList = new ArrayList<>();
+                Table fullTableResult = createEmptyOutputResult();
+
+                for (SourceSchemaModel sourceSchemaModel : sourceSchemaModels) {
+                    jobExecutionContext.getCancellationToken().throwIfCancelled();
+
+                    if (!schemaSearchPattern.match(sourceSchemaModel.getSchemaName())) {
+                        continue;
+                    }
+
+                    List<TableSpec> sourceTableSpecs = sourceConnection.retrieveTableMetadata(
+                            sourceSchemaModel.getSchemaName(),
+                            this.importParameters.getTableNameContains(),
+                            this.metadataImportConfigurationProperties.getTablesImportLimit(),
+                            this.importParameters.getTableNames(),
+                            connectionWrapper,
+                            secretValueLookupContext);
+
+                    List<TableSpec> importedTablesSpecs = sourceTableSpecs
+                            .stream()
+                            .map(tableSpec -> tableSpec.deepClone())
+                            .collect(Collectors.toList());
+
+                    TableList currentTablesColl = connectionWrapper.getTables();
+                    currentTablesColl.importTables(importedTablesSpecs, connectionSpec.getDefaultGroupingConfiguration());
+                    fullTableList.addAll(importedTablesSpecs);
+
+                    Table resultTable = createDatasetTableFromTableSpecs(importedTablesSpecs);
+                    fullTableResult.append(resultTable);
+                }
+
+                userHomeContext.flush();
+                return new ImportTablesResult(fullTableResult, fullTableList);
+            } else {
+                // only one schema
+                List<TableSpec> sourceTableSpecs = sourceConnection.retrieveTableMetadata(
+                        this.importParameters.getSchemaName(),
+                        this.importParameters.getTableNameContains(),
+                        this.metadataImportConfigurationProperties.getTablesImportLimit(),
+                        this.importParameters.getTableNames(),
+                        connectionWrapper,
+                        secretValueLookupContext);
+
+                List<TableSpec> importedTablesSpecs = sourceTableSpecs
+                        .stream()
+                        .map(tableSpec -> tableSpec.deepClone())
+                        .collect(Collectors.toList());
+
+                TableList currentTablesColl = connectionWrapper.getTables();
+                currentTablesColl.importTables(importedTablesSpecs, connectionSpec.getDefaultGroupingConfiguration());
+                userHomeContext.flush();
+
+                Table resultTable = createDatasetTableFromTableSpecs(importedTablesSpecs);
+
+                return new ImportTablesResult(resultTable, sourceTableSpecs);
+            }
         }
     }
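The behavioral change here is in onExecute(): a missing schema name now falls back to "*", and any wildcard pattern switches the job from a single retrieveTableMetadata() call to a list-schemas-then-filter loop, merging per-schema results with Tablesaw's Table.append(). The sketch below reimplements just that filtering decision with a plain glob-to-regex conversion so it runs stand-alone; in DQOps the real matching is done by the internal SearchPattern class shown in the diff.

import java.util.List;
import java.util.regex.Pattern;

public class SchemaFilterSketch {
    // Rough stand-in for SearchPattern: turn a "*" glob into a regex.
    static Pattern globToRegex(String glob) {
        return Pattern.compile(Pattern.quote(glob).replace("*", "\\E.*\\Q"));
    }

    public static void main(String[] args) {
        String schemaName = null; // auto-import jobs may have no schema configured
        String filter = (schemaName == null || schemaName.isEmpty()) ? "*" : schemaName;

        List<String> schemas = List.of("public", "sales", "staging_tmp");
        if (filter.contains("*")) {
            // wildcard: iterate all schemas and keep only the matching ones
            Pattern pattern = globToRegex(filter);
            for (String schema : schemas) {
                if (!pattern.matcher(schema).matches()) {
                    continue; // schema filtered out, like the continue in the loop above
                }
                System.out.println("would import tables from schema: " + schema);
            }
        } else {
            // exact name: single-schema import, the pre-existing code path
            System.out.println("single-schema import: " + filter);
        }
    }
}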

dqops/src/main/java/com/dqops/core/jobqueue/monitoring/DqoJobEntryParametersModel.java (+6)

@@ -62,6 +62,12 @@ public class DqoJobEntryParametersModel {
     @JsonPropertyDescription("The job parameters for the \"collect scheduled statistics\" cron queue job.")
     private CronScheduleSpec collectScheduledStatisticsParameters;

+    /**
+     * The job parameters for the "auto import tables" queue job.
+     */
+    @JsonPropertyDescription("The job parameters for the \"auto import tables\" cron queue job.")
+    private CronScheduleSpec autoImportTablesParameters;
+
     /**
      * The job parameters for the "run checks" queue job.
      */

dqops/src/main/java/com/dqops/core/scheduler/JobSchedulerServiceImpl.java (+27)

@@ -26,6 +26,7 @@
 import com.dqops.core.principal.DqoUserPrincipal;
 import com.dqops.core.principal.DqoUserPrincipalProvider;
 import com.dqops.core.scheduler.collectstatistics.CollectScheduledStatisticsSchedulerJob;
+import com.dqops.core.scheduler.importtables.AutoImportTablesSchedulerJob;
 import com.dqops.core.scheduler.quartz.*;
 import com.dqops.core.scheduler.runcheck.RunScheduledChecksSchedulerJob;
 import com.dqops.core.scheduler.synchronize.JobSchedulesDelta;

@@ -77,6 +78,7 @@ public class JobSchedulerServiceImpl implements JobSchedulerService {
     private JobDetail runChecksJob;
     private JobDetail synchronizeMetadataJob;
    private JobDetail collectStatisticsJob;
+    private JobDetail importTablesJob;
     private FileSystemSynchronizationReportingMode synchronizationMode = FileSystemSynchronizationReportingMode.silent;
     private CheckRunReportingMode checkRunReportingMode = CheckRunReportingMode.silent;
     private StatisticsCollectorExecutionReportingMode collectStatisticsReportingMode = StatisticsCollectorExecutionReportingMode.silent;

@@ -247,6 +249,14 @@ public void defineDefaultJobs() {
             this.scheduler.addJob(this.collectStatisticsJob, true);
         }

+        if (!this.scheduler.checkExists(JobKeys.IMPORT_TABLES)) {
+            this.importTablesJob = newJob(AutoImportTablesSchedulerJob.class)
+                    .withIdentity(JobKeys.IMPORT_TABLES)
+                    .storeDurably()
+                    .build();
+            this.scheduler.addJob(this.importTablesJob, true);
+        }
+
         String scanMetadataCronSchedule = this.schedulerConfigurationProperties.getSynchronizeCronSchedule();
         DqoUserPrincipal userPrincipalForAdministrator = this.principalProvider.createLocalInstanceAdminPrincipal();
         DqoCloudApiKey dqoCloudApiKey = this.dqoCloudApiKeyProvider.getApiKey(userPrincipalForAdministrator.getDataDomainIdentity());

@@ -354,6 +364,16 @@ public void reconcileScheduledDomains() {
                         }
                     }
                 }
+
+                List<? extends Trigger> triggersOfImportTables = this.scheduler.getTriggersOfJob(JobKeys.IMPORT_TABLES);
+                if (triggersOfImportTables != null) {
+                    for (Trigger trigger : triggersOfImportTables) {
+                        String dataDomainInJob = this.jobDataMapAdapter.getDataDomain(trigger.getJobDataMap());
+                        if (Objects.equals(dataDomainInJob, existingDomainName)) {
+                            this.scheduler.unscheduleJob(trigger.getKey());
+                        }
+                    }
+                }
             }
             catch (SchedulerException ex) {
                 log.error("Failed to unschedule jobs for an unloaded data domain because " + ex.getMessage(), ex);

@@ -420,6 +440,13 @@ public void shutdown() {
                 }
             }

+            List<? extends Trigger> triggersOfImportTables = this.scheduler.getTriggersOfJob(JobKeys.IMPORT_TABLES);
+            if (triggersOfImportTables != null) {
+                for (Trigger trigger : triggersOfImportTables) {
+                    this.scheduler.unscheduleJob(trigger.getKey());
+                }
+            }
+
             this.scheduler.shutdown();
             this.schedulerFactory.stop();
         }
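defineDefaultJobs() registers a durable Quartz job with no trigger attached; storeDurably() is what lets the job definition sit in the scheduler until reconciliation later attaches one cron trigger per active schedule and data domain, and unscheduleJob() removes only triggers while the job definition survives. A generic, self-contained Quartz sketch of that durable-job-plus-cron-trigger lifecycle, using illustrative names rather than the DQOps classes:

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

public class DurableJobSketch implements Job {
    @Override
    public void execute(JobExecutionContext context) {
        System.out.println("auto import fired at " + context.getFireTime());
    }

    public static void main(String[] args) throws SchedulerException {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();

        // Durable job: stays registered in the scheduler even with zero triggers.
        JobDetail job = newJob(DurableJobSketch.class)
                .withIdentity("autoImportTables")
                .storeDurably()
                .build();
        scheduler.addJob(job, true); // true = replace an existing definition

        // Later, one cron trigger per schedule is attached to the same job.
        Trigger trigger = newTrigger()
                .withIdentity("autoImportTables-trigger")
                .forJob(job)
                .withSchedule(cronSchedule("0 0 2 * * ?")) // every day at 02:00
                .build();
        scheduler.scheduleJob(trigger);

        // Shutdown mirrors the diff: drop the triggers, then stop the scheduler.
        for (Trigger t : scheduler.getTriggersOfJob(job.getKey())) {
            scheduler.unscheduleJob(t.getKey());
        }
        scheduler.shutdown();
    }
}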

dqops/src/main/java/com/dqops/core/scheduler/collectstatistics/CollectScheduledStatisticsSchedulerJob.java (+1 -1)

@@ -69,7 +69,7 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
         final CronScheduleSpec runChecksCronSchedule = this.jobDataMapAdapter.getSchedule(jobExecutionContext.getMergedJobDataMap());

         try {
-            this.collectScheduledStatisticsJob = this.dqoQueueJobFactory.collectScheduledStatisticsJob();
+            this.collectScheduledStatisticsJob = this.dqoQueueJobFactory.createCollectScheduledStatisticsJob();
             this.collectScheduledStatisticsJob.setCronSchedule(runChecksCronSchedule);
             String dataDomain = this.jobDataMapAdapter.getDataDomain(jobExecutionContext.getTrigger().getJobDataMap());
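Only some of the 24 changed files are shown in this excerpt; notably absent are AutoImportTablesSchedulerJob and AutoImportTablesDqoJob themselves, which the imports above reference. Judging purely by the CollectScheduledStatisticsSchedulerJob pattern they are modeled on, the Quartz-side execute() plausibly looks like the following; every line here is an assumption, not the committed code.

    // Hypothetical reconstruction of AutoImportTablesSchedulerJob.execute(),
    // modeled on CollectScheduledStatisticsSchedulerJob; the real file is not
    // part of this excerpt.
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        final CronScheduleSpec cronSchedule =
                this.jobDataMapAdapter.getSchedule(jobExecutionContext.getMergedJobDataMap());

        try {
            AutoImportTablesDqoJob autoImportTablesJob = this.dqoQueueJobFactory.createAutoImportTablesJob();
            autoImportTablesJob.setCronSchedule(cronSchedule); // assumed setter, by analogy
            String dataDomain = this.jobDataMapAdapter.getDataDomain(jobExecutionContext.getTrigger().getJobDataMap());
            // ...push autoImportTablesJob onto the DQOps job queue for that data domain...
        }
        catch (Exception ex) {
            throw new JobExecutionException(ex);
        }
    }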
