26
26
import com .dqops .data .checkresults .statuscache .CurrentTableStatusKey ;
27
27
import com .dqops .data .checkresults .statuscache .TableStatusCache ;
28
28
import com .dqops .data .checkresults .statuscache .TableStatusCacheProvider ;
29
- import com .dqops .data .storage .HivePartitionPathUtility ;
30
29
import com .dqops .data .storage .LoadedMonthlyPartition ;
31
30
import com .dqops .data .storage .ParquetPartitioningKeys ;
31
+ import com .dqops .metadata .labels .labelloader .LabelRefreshKey ;
32
+ import com .dqops .metadata .labels .labelloader .LabelRefreshTarget ;
33
+ import com .dqops .metadata .labels .labelloader .LabelsIndexer ;
34
+ import com .dqops .metadata .labels .labelloader .LabelsIndexerProvider ;
32
35
import com .dqops .metadata .sources .PhysicalTableName ;
36
+ import com .dqops .metadata .storage .localfiles .SpecFileNames ;
33
37
import com .dqops .utils .exceptions .DqoRuntimeException ;
34
38
import com .dqops .utils .reflection .ObjectMemorySizeUtility ;
35
39
import com .github .benmanes .caffeine .cache .Cache ;
@@ -67,6 +71,7 @@ public class LocalFileSystemCacheImpl implements LocalFileSystemCache, Disposabl
67
71
private final Object directoryWatchersLock = new Object ();
68
72
private final DqoCacheConfigurationProperties dqoCacheConfigurationProperties ;
69
73
private final TableStatusCacheProvider tableStatusCacheProvider ;
74
+ private final LabelsIndexerProvider labelsIndexerProvider ;
70
75
private final HomeLocationFindService homeLocationFindService ;
71
76
private final Path userHomeRootPath ;
72
77
private Instant nextFileChangeDetectionAt = Instant .now ().minus (100L , ChronoUnit .MILLIS );
@@ -76,14 +81,17 @@ public class LocalFileSystemCacheImpl implements LocalFileSystemCache, Disposabl
76
81
* Dependency injection constructor.
77
82
* @param dqoCacheConfigurationProperties Cache configuration parameters.
78
83
* @param tableStatusCacheProvider Table status cache provider.
84
+ * @param labelsIndexerProvider Labels indexer provider. The labels indexer that it returns is notified to update labels for loaded or modified connections and tables.
79
85
* @param homeLocationFindService Service that finds the location of the user home, used to translate file paths of updated files to relative file paths in the user home.
80
86
*/
81
87
@ Autowired
82
88
public LocalFileSystemCacheImpl (DqoCacheConfigurationProperties dqoCacheConfigurationProperties ,
83
89
TableStatusCacheProvider tableStatusCacheProvider ,
90
+ LabelsIndexerProvider labelsIndexerProvider ,
84
91
HomeLocationFindService homeLocationFindService ) {
85
92
this .dqoCacheConfigurationProperties = dqoCacheConfigurationProperties ;
86
93
this .tableStatusCacheProvider = tableStatusCacheProvider ;
94
+ this .labelsIndexerProvider = labelsIndexerProvider ;
87
95
this .homeLocationFindService = homeLocationFindService ;
88
96
this .userHomeRootPath = homeLocationFindService .getUserHomePath () != null ? Path .of (homeLocationFindService .getUserHomePath ()) : Path .of ("." );
89
97
@@ -280,10 +288,19 @@ public FileContent loadFileContent(Path filePath, Function<Path, FileContent> re
280
288
if (this .dqoCacheConfigurationProperties .isEnable ()) {
281
289
this .startFolderWatcher (filePath .getParent ());
282
290
this .processFileChanges (false );
283
- return this .fileContentsCache .get (filePath , readFunction );
291
+ FileContent fileContent = this .fileContentsCache .getIfPresent (filePath );
292
+ if (fileContent != null ) {
293
+ return fileContent ;
294
+ }
295
+
296
+ fileContent = this .fileContentsCache .get (filePath , readFunction );
297
+ this .invalidateTableStatusCacheAndLabelsIndexer (filePath , false );
298
+ return fileContent ;
284
299
}
285
300
286
- return readFunction .apply (filePath );
301
+ FileContent fileContentWithoutCache = readFunction .apply (filePath );
302
+ this .invalidateTableStatusCacheAndLabelsIndexer (filePath , false );
303
+ return fileContentWithoutCache ;
287
304
}
288
305
289
306
/**
@@ -294,6 +311,7 @@ public FileContent loadFileContent(Path filePath, Function<Path, FileContent> re
294
311
@ Override
295
312
public void storeFile (Path filePath , FileContent fileContent ) {
296
313
if (!this .dqoCacheConfigurationProperties .isEnable ()) {
314
+ this .invalidateTableStatusCacheAndLabelsIndexer (filePath , false );
297
315
return ;
298
316
}
299
317
@@ -303,7 +321,9 @@ public void storeFile(Path filePath, FileContent fileContent) {
303
321
processFileChanges (true );
304
322
}
305
323
324
+ boolean replacingCachedFile = this .fileContentsCache .getIfPresent (filePath ) != null ;
306
325
this .fileContentsCache .put (filePath , fileContent );
326
+ this .invalidateTableStatusCacheAndLabelsIndexer (filePath , replacingCachedFile );
307
327
308
328
Path parentFolderPath = filePath .getParent ();
309
329
this .fileListsCache .invalidate (parentFolderPath );
@@ -336,6 +356,7 @@ public LoadedMonthlyPartition getParquetFile(Path filePath) {
336
356
@ Override
337
357
public void storeParquetFile (Path filePath , LoadedMonthlyPartition table ) {
338
358
if (!this .dqoCacheConfigurationProperties .isEnable ()) {
359
+ this .invalidateTableStatusCacheAndLabelsIndexer (filePath , false );
339
360
return ;
340
361
}
341
362
@@ -347,7 +368,7 @@ public void storeParquetFile(Path filePath, LoadedMonthlyPartition table) {
347
368
348
369
boolean replacingCachedFile = this .parquetFilesCache .getIfPresent (filePath ) != null ;
349
370
this .parquetFilesCache .put (filePath , table );
350
- invalidateTableStatusCache (filePath , replacingCachedFile );
371
+ this . invalidateTableStatusCacheAndLabelsIndexer (filePath , replacingCachedFile );
351
372
352
373
Path parentFolderPath = filePath .getParent ();
353
374
this .fileListsCache .invalidate (parentFolderPath );
@@ -369,7 +390,7 @@ public void removeFile(Path filePath) {
369
390
this .fileContentsCache .invalidate (filePath );
370
391
this .parquetFilesCache .invalidate (filePath );
371
392
this .wasRecentlyInvalidated = true ;
372
- invalidateTableStatusCache (filePath , true );
393
+ this . invalidateTableStatusCacheAndLabelsIndexer (filePath , true );
373
394
374
395
375
396
Path parentFolderPath = filePath .getParent ();
@@ -392,7 +413,7 @@ public void removeFolder(Path folderPath) {
392
413
393
414
this .folderListsCache .invalidate (folderPath );
394
415
this .fileListsCache .invalidate (folderPath );
395
- invalidateTableStatusCache (folderPath , true );
416
+ this . invalidateTableStatusCacheAndLabelsIndexer (folderPath , true );
396
417
397
418
this .stopFolderWatcher (folderPath );
398
419
@@ -414,7 +435,7 @@ public void invalidateFolder(Path folderPath) {
414
435
415
436
this .folderListsCache .invalidate (folderPath );
416
437
this .fileListsCache .invalidate (folderPath );
417
- invalidateTableStatusCache (folderPath , true );
438
+ this . invalidateTableStatusCacheAndLabelsIndexer (folderPath , true );
418
439
}
419
440
420
441
/**
@@ -430,7 +451,7 @@ public void invalidateFile(Path filePath) {
430
451
this .fileContentsCache .invalidate (filePath );
431
452
this .parquetFilesCache .invalidate (filePath );
432
453
this .wasRecentlyInvalidated = true ;
433
- invalidateTableStatusCache (filePath , true );
454
+ this . invalidateTableStatusCacheAndLabelsIndexer (filePath , true );
434
455
435
456
Path folderPath = filePath .getParent ();
436
457
if (folderPath != null ) {
@@ -442,11 +463,12 @@ public void invalidateFile(Path filePath) {
442
463
/**
443
464
* Matches the path of an updated or deleted file to a parquet file for tables that are cached: check_results and errors.
444
465
* If a table is detected whose data quality results were updated, triggers an invalidation of the current table status cache.
466
+ * Also detects when a connection or table yaml file was modified, and notifies the labels indexer so that the labels of that object are rescanned.
445
467
* @param filePath File path to a file that should be updated.
446
468
* @param replacingCachedFile True when we are replacing a file that was already in a cache, false when a file is just placed into a cache,
447
469
* and it is not a real invalidation, but just a notification that a file was just cached.
448
470
*/
449
- public void invalidateTableStatusCache (Path filePath , boolean replacingCachedFile ) {
471
+ public void invalidateTableStatusCacheAndLabelsIndexer (Path filePath , boolean replacingCachedFile ) {
450
472
if (!filePath .startsWith (this .userHomeRootPath )) {
451
473
return ;
452
474
}
@@ -462,21 +484,39 @@ public void invalidateTableStatusCache(Path filePath, boolean replacingCachedFil
462
484
}
463
485
464
486
HomeFolderPath folder = homeFilePath .getFolder ();
465
- if (folder .size () < 4 || ! Objects .equals (BuiltInFolderNames .DATA , folder .get (0 ).getFileSystemName ()) ||
466
- ! (Objects .equals (BuiltInFolderNames .CHECK_RESULTS , folder .get (1 ).getFileSystemName ()) ||
487
+ if (folder .size () >= 4 && Objects .equals (BuiltInFolderNames .DATA , folder .get (0 ).getFileSystemName ()) &&
488
+ (Objects .equals (BuiltInFolderNames .CHECK_RESULTS , folder .get (1 ).getFileSystemName ()) ||
467
489
Objects .equals (BuiltInFolderNames .ERRORS , folder .get (1 ).getFileSystemName ()))) {
468
- return ; // not a parquet folder
469
- }
490
+ // parquet file updated
470
491
471
- String connectionNameFolder = folder .get (2 ).getFileSystemName ();
472
- String schemaTableNameFolder = folder .get (3 ).getFileSystemName ();
492
+ String connectionNameFolder = folder .get (2 ).getFileSystemName ();
493
+ String schemaTableNameFolder = folder .get (3 ).getFileSystemName ();
473
494
474
- if (connectionNameFolder .startsWith (ParquetPartitioningKeys .CONNECTION + "=" ) && connectionNameFolder .length () > 2 &&
475
- schemaTableNameFolder .startsWith (ParquetPartitioningKeys .SCHEMA_TABLE + "=" ) && schemaTableNameFolder .length () > 2 ) {
476
- String decodedConnectionName = FileNameSanitizer .decodeFileSystemName (connectionNameFolder .substring (2 ));
477
- PhysicalTableName physicalTableName = PhysicalTableName .fromBaseFileName (schemaTableNameFolder .substring (2 ));
478
- TableStatusCache tableStatusCache = this .tableStatusCacheProvider .getTableStatusCache ();
479
- tableStatusCache .invalidateTableStatus (new CurrentTableStatusKey (folder .getDataDomain (), decodedConnectionName , physicalTableName ), replacingCachedFile );
495
+ if (connectionNameFolder .startsWith (ParquetPartitioningKeys .CONNECTION + "=" ) && connectionNameFolder .length () > 2 &&
496
+ schemaTableNameFolder .startsWith (ParquetPartitioningKeys .SCHEMA_TABLE + "=" ) && schemaTableNameFolder .length () > 2 ) {
497
+ String decodedConnectionName = FileNameSanitizer .decodeFileSystemName (connectionNameFolder .substring (2 ));
498
+ PhysicalTableName physicalTableName = PhysicalTableName .fromBaseFileName (schemaTableNameFolder .substring (2 ));
499
+ TableStatusCache tableStatusCache = this .tableStatusCacheProvider .getTableStatusCache ();
500
+ tableStatusCache .invalidateTableStatus (new CurrentTableStatusKey (folder .getDataDomain (), decodedConnectionName , physicalTableName ), replacingCachedFile );
501
+ }
502
+ }
503
+
504
+ if (folder .size () >= 2 && Objects .equals (BuiltInFolderNames .SOURCES , folder .get (0 ).getFileSystemName ())) {
505
+ String connectionName = folder .get (1 ).getObjectName ();
506
+ String fileName = homeFilePath .getFileName ();
507
+ LabelsIndexer labelsIndexer = this .labelsIndexerProvider .getLabelsIndexer ();
508
+
509
+ if (Objects .equals (fileName , SpecFileNames .CONNECTION_SPEC_FILE_NAME_YAML )) {
510
+ labelsIndexer .invalidateObject (
511
+ new LabelRefreshKey (LabelRefreshTarget .CONNECTION , folder .getDataDomain (), connectionName , null ),
512
+ replacingCachedFile );
513
+ } else if (fileName != null && fileName .endsWith (SpecFileNames .TABLE_SPEC_FILE_EXT_YAML )) {
514
+ String bareFileName = fileName .substring (0 , fileName .length () - SpecFileNames .TABLE_SPEC_FILE_EXT_YAML .length ());
515
+ PhysicalTableName physicalTableName = PhysicalTableName .fromBaseFileName (bareFileName );
516
+ labelsIndexer .invalidateObject (
517
+ new LabelRefreshKey (LabelRefreshTarget .TABLE , folder .getDataDomain (), connectionName , physicalTableName ),
518
+ replacingCachedFile );
519
+ }
480
520
}
481
521
}
482
522
0 commit comments