Skip to content

Commit

Permalink
perf: flush relay repositories more efficiently, improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
JuliusHenke committed Nov 26, 2023
1 parent 99a69bc commit 73b9410
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class RelayLocationController(
* Update the cache for available relay location days and the given [day]
*/
@Async
fun cacheNewDay(day: String) {
fun updateCache(day: String) {
cacheManager.getCache(CacheName.RELAY_LOCATION_DAYS)?.invalidate()
getDays()
cacheManager.getCache(CacheName.RELAY_LOCATION_DAY)?.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ fun DescriptorType.isRecent() =

fun DescriptorType.isRelayServerType() =
this === DescriptorType.ARCHIVE_RELAY_SERVER || this === DescriptorType.RECENT_RELAY_SERVER

fun DescriptorType.isRelayConsensusType() =
this === DescriptorType.ARCHIVE_RELAY_CONSENSUS || this === DescriptorType.RECENT_RELAY_CONSENSUS
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import org.springframework.stereotype.Service
import org.tormap.config.value.DescriptorConfig
import org.tormap.database.entity.DescriptorType
import org.tormap.database.entity.isRecent
import org.tormap.database.entity.isRelayConsensusType
import org.tormap.database.entity.isRelayServerType
import org.tormap.database.repository.RelayDetailsRepository
import org.tormap.database.repository.RelayLocationRepository
import org.tormap.util.logger
import org.torproject.descriptor.DescriptorCollector
import org.torproject.descriptor.index.DescriptorIndexCollector
Expand All @@ -20,7 +23,9 @@ class DescriptorCoordinationService(
private val descriptorConfig: DescriptorConfig,
private val relayDetailsUpdateService: RelayDetailsUpdateService,
private val descriptorFileService: DescriptorFileService,
private val descriptorProcessingService: DescriptorProcessingService
private val descriptorProcessingService: DescriptorProcessingService,
private val relayDetailsRepository: RelayDetailsRepository,
private val relayLocationRepository: RelayLocationRepository,
) {
private val logger = logger()
private val descriptorCollector: DescriptorCollector = DescriptorIndexCollector()
Expand Down Expand Up @@ -56,23 +61,47 @@ class DescriptorCoordinationService(
var errorCount = 0
val processedMonths = mutableSetOf<String>()
descriptorFileService.getDescriptorDiskReader(apiPath, descriptorType).forEach { descriptor ->
if (lastProcessedFile != null && lastProcessedFile != descriptor.descriptorFile) {
logger.info("Finished processing descriptors file ${lastProcessedFile?.name} with $errorCount errors.")
if (errorCount == 0) {
lastProcessedFile?.let {
descriptorFileService.saveProcessedFileReference(it, descriptorType)
lastProcessedFile?.let {
if (it != descriptor.descriptorFile) {
flushRelayRepositoryAndSaveProcessedFile(it, descriptorType, errorCount)
if (descriptorType.isRelayServerType()) {
relayDetailsUpdateService.updateFamilies(processedMonths)
}
processedMonths.clear()
errorCount = 0
}
if (descriptorType.isRelayServerType()) {
relayDetailsUpdateService.updateFamilies(processedMonths)
}
processedMonths.clear()
errorCount = 0
}
val descriptorInfo = descriptorProcessingService.processDescriptor(descriptor)
descriptorInfo.yearMonth?.let { processedMonths.add(it) }
descriptorInfo.error?.let { errorCount++ }
lastProcessedFile = descriptor.descriptorFile
}
}

private fun flushRelayRepositoryAndSaveProcessedFile(file: File, descriptorType: DescriptorType, errorCount: Int) {
try {
when {
descriptorType.isRelayServerType() -> relayDetailsRepository.flush()
descriptorType.isRelayConsensusType() -> relayLocationRepository.flush()
else -> throw Exception("Descriptor type ${descriptorType.name} is not yet supported!")
}
if (errorCount == 0) {
descriptorFileService.saveProcessedFileReference(file, descriptorType)
}
logFinishedProcessingDescriptorFile(file.name, errorCount)
} catch (exception: Exception) {
logger.error("Could not flush relay repository for ${descriptorType.name}! ${exception.message}")
}
}

private fun logFinishedProcessingDescriptorFile(
filename: String,
errorCount: Int,
) {
if (errorCount == 0) {
logger.info("Finished $filename with 0 errors")
} else {
logger.error("Finished $filename with $errorCount errors")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class DescriptorProcessingService(
}
}
relayLocationRepositoryImpl.flush()
relayLocationController.cacheNewDay(descriptorDay.toString())
relayLocationController.updateCache(descriptorDay.toString())
return ProcessedDescriptorInfo(YearMonth.from(descriptorDay).toString())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class RelayDetailsUpdateService(
*/
fun updateAutonomousSystems() {
try {
relayDetailsRepositoryImpl.flush()
val monthsToProcess = relayDetailsRepositoryImpl.findDistinctMonthsAndAutonomousSystemNumberNull()
logger.info("... Updating Autonomous Systems for months: ${monthsToProcess.joinToString(", ")}")
monthsToProcess.forEach {
Expand Down Expand Up @@ -87,7 +86,6 @@ class RelayDetailsUpdateService(
fun updateFamilies(months: Set<String>) {
try {
logger.info("... Updating relay families for months: ${months.joinToString(", ")}")
relayDetailsRepositoryImpl.flush()
months.forEach { month ->
try {
updateFamiliesForMonth(month)
Expand Down

0 comments on commit 73b9410

Please sign in to comment.