diff --git a/backend/src/main/kotlin/org/tormap/adapter/controller/RelayLocationController.kt b/backend/src/main/kotlin/org/tormap/adapter/controller/RelayLocationController.kt index 548e18b7..57e1c797 100644 --- a/backend/src/main/kotlin/org/tormap/adapter/controller/RelayLocationController.kt +++ b/backend/src/main/kotlin/org/tormap/adapter/controller/RelayLocationController.kt @@ -36,7 +36,7 @@ class RelayLocationController( * Update the cache for available relay location days and the given [day] */ @Async - fun cacheNewDay(day: String) { + fun updateCache(day: String) { cacheManager.getCache(CacheName.RELAY_LOCATION_DAYS)?.invalidate() getDays() cacheManager.getCache(CacheName.RELAY_LOCATION_DAY)?.put( diff --git a/backend/src/main/kotlin/org/tormap/database/entity/ProcessedFile.kt b/backend/src/main/kotlin/org/tormap/database/entity/ProcessedFile.kt index b80d4172..8cb5a8f2 100644 --- a/backend/src/main/kotlin/org/tormap/database/entity/ProcessedFile.kt +++ b/backend/src/main/kotlin/org/tormap/database/entity/ProcessedFile.kt @@ -58,3 +58,6 @@ fun DescriptorType.isRecent() = fun DescriptorType.isRelayServerType() = this === DescriptorType.ARCHIVE_RELAY_SERVER || this === DescriptorType.RECENT_RELAY_SERVER + +fun DescriptorType.isRelayConsensusType() = + this === DescriptorType.ARCHIVE_RELAY_CONSENSUS || this === DescriptorType.RECENT_RELAY_CONSENSUS diff --git a/backend/src/main/kotlin/org/tormap/service/DescriptorCoordinationService.kt b/backend/src/main/kotlin/org/tormap/service/DescriptorCoordinationService.kt index 59db6322..77791718 100644 --- a/backend/src/main/kotlin/org/tormap/service/DescriptorCoordinationService.kt +++ b/backend/src/main/kotlin/org/tormap/service/DescriptorCoordinationService.kt @@ -4,7 +4,10 @@ import org.springframework.stereotype.Service import org.tormap.config.value.DescriptorConfig import org.tormap.database.entity.DescriptorType import org.tormap.database.entity.isRecent +import org.tormap.database.entity.isRelayConsensusType import org.tormap.database.entity.isRelayServerType +import org.tormap.database.repository.RelayDetailsRepository +import org.tormap.database.repository.RelayLocationRepository import org.tormap.util.logger import org.torproject.descriptor.DescriptorCollector import org.torproject.descriptor.index.DescriptorIndexCollector @@ -20,7 +23,9 @@ class DescriptorCoordinationService( private val descriptorConfig: DescriptorConfig, private val relayDetailsUpdateService: RelayDetailsUpdateService, private val descriptorFileService: DescriptorFileService, - private val descriptorProcessingService: DescriptorProcessingService + private val descriptorProcessingService: DescriptorProcessingService, + private val relayDetailsRepository: RelayDetailsRepository, + private val relayLocationRepository: RelayLocationRepository, ) { private val logger = logger() private val descriptorCollector: DescriptorCollector = DescriptorIndexCollector() @@ -56,18 +61,15 @@ class DescriptorCoordinationService( var errorCount = 0 val processedMonths = mutableSetOf() descriptorFileService.getDescriptorDiskReader(apiPath, descriptorType).forEach { descriptor -> - if (lastProcessedFile != null && lastProcessedFile != descriptor.descriptorFile) { - logger.info("Finished processing descriptors file ${lastProcessedFile?.name} with $errorCount errors.") - if (errorCount == 0) { - lastProcessedFile?.let { - descriptorFileService.saveProcessedFileReference(it, descriptorType) + lastProcessedFile?.let { + if (it != descriptor.descriptorFile) { + flushRelayRepositoryAndSaveProcessedFile(it, descriptorType, errorCount) + if (descriptorType.isRelayServerType()) { + relayDetailsUpdateService.updateFamilies(processedMonths) } + processedMonths.clear() + errorCount = 0 } - if (descriptorType.isRelayServerType()) { - relayDetailsUpdateService.updateFamilies(processedMonths) - } - processedMonths.clear() - errorCount = 0 } val descriptorInfo = descriptorProcessingService.processDescriptor(descriptor) descriptorInfo.yearMonth?.let { processedMonths.add(it) } @@ -75,4 +77,31 @@ class DescriptorCoordinationService( lastProcessedFile = descriptor.descriptorFile } } + + private fun flushRelayRepositoryAndSaveProcessedFile(file: File, descriptorType: DescriptorType, errorCount: Int) { + try { + when { + descriptorType.isRelayServerType() -> relayDetailsRepository.flush() + descriptorType.isRelayConsensusType() -> relayLocationRepository.flush() + else -> throw Exception("Descriptor type ${descriptorType.name} is not yet supported!") + } + if (errorCount == 0) { + descriptorFileService.saveProcessedFileReference(file, descriptorType) + } + logFinishedProcessingDescriptorFile(file.name, errorCount) + } catch (exception: Exception) { + logger.error("Could not flush relay repository for ${descriptorType.name}! ${exception.message}") + } + } + + private fun logFinishedProcessingDescriptorFile( + filename: String, + errorCount: Int, + ) { + if (errorCount == 0) { + logger.info("Finished $filename with 0 errors") + } else { + logger.error("Finished $filename with $errorCount errors") + } + } } diff --git a/backend/src/main/kotlin/org/tormap/service/DescriptorProcessingService.kt b/backend/src/main/kotlin/org/tormap/service/DescriptorProcessingService.kt index 05192f87..b8b56947 100644 --- a/backend/src/main/kotlin/org/tormap/service/DescriptorProcessingService.kt +++ b/backend/src/main/kotlin/org/tormap/service/DescriptorProcessingService.kt @@ -64,7 +64,7 @@ class DescriptorProcessingService( } } relayLocationRepositoryImpl.flush() - relayLocationController.cacheNewDay(descriptorDay.toString()) + relayLocationController.updateCache(descriptorDay.toString()) return ProcessedDescriptorInfo(YearMonth.from(descriptorDay).toString()) } diff --git a/backend/src/main/kotlin/org/tormap/service/RelayDetailsUpdateService.kt b/backend/src/main/kotlin/org/tormap/service/RelayDetailsUpdateService.kt index 47b9d297..b8b46f36 100644 --- a/backend/src/main/kotlin/org/tormap/service/RelayDetailsUpdateService.kt +++ b/backend/src/main/kotlin/org/tormap/service/RelayDetailsUpdateService.kt @@ -31,7 +31,6 @@ class RelayDetailsUpdateService( */ fun updateAutonomousSystems() { try { - relayDetailsRepositoryImpl.flush() val monthsToProcess = relayDetailsRepositoryImpl.findDistinctMonthsAndAutonomousSystemNumberNull() logger.info("... Updating Autonomous Systems for months: ${monthsToProcess.joinToString(", ")}") monthsToProcess.forEach { @@ -87,7 +86,6 @@ class RelayDetailsUpdateService( fun updateFamilies(months: Set) { try { logger.info("... Updating relay families for months: ${months.joinToString(", ")}") - relayDetailsRepositoryImpl.flush() months.forEach { month -> try { updateFamiliesForMonth(month)